flyokai/amp-data-pipeline
=========================

Async data pipeline using amphp

Version 0.1.0 · MIT · PHP ^8.1 · [Source](https://github.com/flyokai/amp-data-pipeline) · [Packagist](https://packagist.org/packages/flyokai/amp-data-pipeline)

> User docs → [`README.md`](README.md) · Agent quick-ref → [`CLAUDE.md`](CLAUDE.md) · Agent deep dive → [`AGENTS.md`](AGENTS.md)

> Composable async data pipelines on AMPHP 3.x — sources, processors, batching, multicast, with explicit concurrency and backpressure controls.

A small toolkit for building pull-based concurrent processing pipelines. You wire together a `DataSource` and one or more `Processor`s; each processor controls its own fiber count and output queue size. Batch and multicast operators give you the rest.

Features
--------

- **`DataItem`** wrapper — `data` + `meta` arrays, immutable-ish
- **Sources** — `ArraySource`, `IteratorSource`, `QueueSource`
- **Processors** — `ProcessorAbstract` (override `processDataItem()`), `SkipProcessor`
- **Composition** — `ProcessorComposition` chains stages sequentially
- **Batching** — `Batch\BatchProcessor` groups items and creates a per-batch processor
- **Multicast** — `DataCast\MultiCastProcessor` fans out each item to parallel cast processors
- **Per-stage concurrency** — fiber count, buffer size, optional ordering
- **Cancellation** propagated through the chain

Installation
------------

```
composer require flyokai/amp-data-pipeline
```

Quick start
-----------

```
use Flyokai\AmpDataPipeline\{ArraySource, DataItem, ProcessorAbstract, ProcessorComposition};

final class Upper extends ProcessorAbstract
{
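    protected function processDataItem(DataItem $item): void
    {
        // Uppercase the payload value.
        $item->setData('value', strtoupper($item->getData('value')));
        // Push the item to this stage's output so the next consumer receives it.
        $this->releaseDataItem($item);
    }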
}

$source = new ArraySource([
    DataItem::fromArray(['value' => 'alice'], []),
    DataItem::fromArray(['value' => 'bob'],   []),
]);

$pipeline = new ProcessorComposition([new Upper()]);
$pipeline->setSource($source);

$pipeline->run(function (DataItem $item) {
    echo $item->getData('value'), "\n";   // ALICE, BOB
});
```

Concepts
--------

### `DataItem`

```
$item->getData('key');          // payload access
$item->setData('key', 'value'); // returns mutated
$item->getMeta();               // metadata bag
```

### Sources

| Class | Use case |
|---|---|
| `ArraySource` | wraps a PHP array (`ConcurrentArrayIterator`) |
| `IteratorSource` | wraps any `iterable` |
| `QueueSource` | wraps an AMPHP `Queue` for push-based input |

### Processors

`ProcessorAbstract` gives you:

- `setConcurrency(int)` — fiber count inside the stage
- `setBufferSize(int)` — output queue depth (0 = same as concurrency)
- `setCancellation(Cancellation)` — graceful shutdown
- `releaseDataItem(DataItem)` — push to output

```
(new MyProcessor())          // parentheses are required before PHP 8.4
    ->setConcurrency(8)
    ->setBufferSize(16);
```
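
To make the effect of `setConcurrency()` concrete, here is a minimal sketch that uses only PHP's native `Fiber` class (PHP 8.1+). It is not how `ProcessorAbstract` is implemented. The function `runStage()`, the round-robin loop standing in for the AMPHP event loop, and the `ticks` field that simulates I/O waits are all assumptions made for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-in for one pipeline stage: $concurrency fibers pull
 * jobs from a shared iterator and append results to a shared output list.
 */
function runStage(array $jobs, int $concurrency, callable $work): array
{
    $pending = new ArrayIterator($jobs);
    $output  = [];
    $fibers  = [];

    for ($i = 0; $i < $concurrency; $i++) {
        $fibers[] = new Fiber(function () use ($pending, &$output, $work): void {
            while ($pending->valid()) {
                $job = $pending->current();
                $pending->next();          // claim the job before doing any work
                $output[] = $work($job);   // may suspend while "waiting"
            }
        });
    }

    foreach ($fibers as $fiber) {
        $fiber->start();
    }

    // Toy scheduler: resume suspended fibers round-robin until all finish.
    do {
        $alive = false;
        foreach ($fibers as $fiber) {
            if ($fiber->isSuspended()) {
                $fiber->resume();
            }
            $alive = $alive || !$fiber->isTerminated();
        }
    } while ($alive);

    return $output;
}

// Each job "waits" for the given number of ticks before completing.
$work = function (array $job): string {
    for ($i = 0; $i < $job['ticks']; $i++) {
        Fiber::suspend();
    }
    return $job['name'];
};

$jobs = [
    ['name' => 'a', 'ticks' => 3],
    ['name' => 'b', 'ticks' => 1],
    ['name' => 'c', 'ticks' => 1],
];

echo implode(',', runStage($jobs, 1, $work)), "\n"; // a,b,c
echo implode(',', runStage($jobs, 2, $work)), "\n"; // b,c,a
```

With one fiber the results come out in input order. With two fibers the slow job `a` finishes last, which is why a stage with concurrency above 1 should not be assumed to preserve order.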

### Linear pipeline

```
$pipeline = new ProcessorComposition([
    new PrepareProcessor(),
    new ValidateProcessor(),
    new SaveProcessor(),
]);
$pipeline->setSource(new ArraySource($rows));
$pipeline->run(/* optional itemCallback */);
```

### Batching

```
use Flyokai\AmpDataPipeline\Batch\BatchProcessor;

$batcher = new BatchProcessor(
    batchProcessorFactory: fn() => new SaveBatchProcessor(),
    resultHandlerFactory:  fn() => new ResultRouter(),
    batchSize: 100,
    ordered: false,        // true → preserve order across batches
    groupResults: false,   // true → merge batch results into one DataItem
    throwIfUnhandled: true,
);
```

Items accumulate up to `batchSize`, a fresh processor is built for each batch, and results are routed through a `DataItemHandler` strategy.
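
The accumulate-then-process behaviour can be sketched in plain, synchronous PHP. This is an illustration of the idea only, not the `BatchProcessor` implementation: `batches()` and `processInBatches()` are hypothetical helper names, and the real class works asynchronously and routes results through handlers.

```php
<?php
declare(strict_types=1);

/**
 * Group an iterable into arrays of at most $batchSize items.
 * The final batch may be smaller than $batchSize.
 */
function batches(iterable $items, int $batchSize): Generator
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('batchSize must be at least 1');
    }

    $batch = [];
    foreach ($items as $item) {
        $batch[] = $item;
        if (count($batch) === $batchSize) {
            yield $batch;
            $batch = [];
        }
    }
    if ($batch !== []) {
        yield $batch; // flush the remainder
    }
}

/** Build a fresh processor from $factory for every batch and collect its result. */
function processInBatches(iterable $items, int $batchSize, callable $factory): array
{
    $results = [];
    foreach (batches($items, $batchSize) as $batch) {
        $processor = $factory();          // new instance per batch
        $results[] = $processor($batch);
    }
    return $results;
}

// Hypothetical "processor": sums the numbers in a batch.
$sums = processInBatches(
    [1, 2, 3, 4, 5],
    2,
    fn(): Closure => fn(array $batch): int => array_sum($batch),
);

echo implode(',', $sums), "\n"; // 3,7,5
```

Building the processor inside the loop is what keeps per-batch state from leaking between batches, which is the reason the constructor takes a factory rather than an instance.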

### Multicast

```
use Flyokai\AmpDataPipeline\DataCast\MultiCastProcessor;

$cast = new MultiCastProcessor(
    castProcessorFactories: [
        fn() => new IndexInOpensearch(),
        fn() => new WriteToCache(),
    ],
    groupResults: true,
    groupBufferSize: 10,
);
```

Each input item is delivered to every cast processor in parallel; outputs are aggregated by `MultiCastConsumer`.
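
As a rough mental model, the fan-out and the effect of `groupResults` can be sketched without any library code. The sketch below is sequential and uses a hypothetical `multicast()` function with plain callables as cast processors. The real `MultiCastProcessor` runs the casts in parallel and its cast processors consume a `ConcurrentIterator`, so treat this only as a picture of the data flow.

```php
<?php
declare(strict_types=1);

/**
 * Deliver every item to every cast callable.
 * Grouped: one entry per input item, keyed by cast name.
 * Ungrouped: one entry per individual cast result.
 */
function multicast(iterable $items, array $castFactories, bool $groupResults): array
{
    // Build each cast once from its factory (array_map keeps the string keys).
    $casts = array_map(fn(callable $factory): callable => $factory(), $castFactories);

    $output = [];
    foreach ($items as $item) {
        $results = [];
        foreach ($casts as $name => $cast) {
            $results[$name] = $cast($item);   // every cast sees every item
        }
        if ($groupResults) {
            $output[] = $results;
        } else {
            foreach ($results as $result) {
                $output[] = $result;
            }
        }
    }
    return $output;
}

// Hypothetical casts, named only for this example.
$factories = [
    'upper'  => fn(): Closure => fn(string $s): string => strtoupper($s),
    'length' => fn(): Closure => fn(string $s): int => strlen($s),
];

$grouped   = multicast(['ab', 'cde'], $factories, groupResults: true);
$ungrouped = multicast(['ab', 'cde'], $factories, groupResults: false);
```

Here `$grouped` holds two entries, one per input item, each containing both cast results, while `$ungrouped` holds four separate results.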

### Handler strategies

`DataItemHandler` is the strategy for handling specific items:

```
interface DataItemHandler {
    public function canHandle(DataItem $item): bool;
    public function handle(DataItem $item): Future;
}
```

`HandlerComposition` enforces **mutual exclusion** — exactly one handler per item; multiple matches throw. Pass `$ordered = true` to preserve item order via Mutex / Sequence.
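
The mutual-exclusion rule is easy to picture with a simplified, synchronous sketch. Everything here is hypothetical: `SketchHandler` works on plain arrays and returns a string instead of a `Future`, and `dispatch()` is a stand-in for whatever `HandlerComposition` does internally. How the library treats an item that no handler matches is not stated above, so the sketch simply assumes it is an error.

```php
<?php
declare(strict_types=1);

/** Simplified, synchronous stand-in for DataItemHandler (hypothetical). */
interface SketchHandler
{
    public function canHandle(array $item): bool;
    public function handle(array $item): string;
}

/** Builds a handler from two closures, purely to keep the example short. */
final class CallbackHandler implements SketchHandler
{
    public function __construct(private Closure $matches, private Closure $action) {}

    public function canHandle(array $item): bool
    {
        return ($this->matches)($item);
    }

    public function handle(array $item): string
    {
        return ($this->action)($item);
    }
}

/** Route $item to the single handler that accepts it. */
function dispatch(array $item, array $handlers): string
{
    $matching = array_values(array_filter(
        $handlers,
        fn(SketchHandler $h): bool => $h->canHandle($item),
    ));

    if (count($matching) > 1) {
        throw new LogicException('More than one handler matches this item');
    }
    if ($matching === []) {
        // Assumption: this sketch treats an unhandled item as an error.
        throw new RuntimeException('No handler matches this item');
    }
    return $matching[0]->handle($item);
}

$handlers = [
    new CallbackHandler(
        fn(array $i): bool => $i['status'] === 'ok',
        fn(array $i): string => "saved {$i['id']}",
    ),
    new CallbackHandler(
        fn(array $i): bool => $i['status'] === 'error',
        fn(array $i): string => "logged {$i['id']}",
    ),
];

echo dispatch(['id' => 7, 'status' => 'ok'], $handlers), "\n";    // saved 7
echo dispatch(['id' => 8, 'status' => 'error'], $handlers), "\n"; // logged 8
```

Writing `canHandle()` predicates so that they partition the items, as the two status checks do here, is what keeps the "multiple matches throw" rule from firing at runtime.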

Concurrency model
-----------------

- **Inter-stage**: a `ProcessorComposition` chains iterators (pull-based, demand-driven).
- **Intra-stage**: `concurrency` controls the number of fibers servicing the queue inside a processor.
- **Buffer**: `bufferSize` decouples producer/consumer (set to 0 to mirror concurrency).
- **Multicast**: every cast processor fires simultaneously per item.
- **Backpressure**: bounded stage queues (`bufferSize`) plus `groupBufferSize` cap memory growth.
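
The pull-based, demand-driven part of this model can be illustrated with ordinary PHP generators. This is an analogy rather than the library's mechanism: `source()` and `stage()` are hypothetical helpers, there is no concurrency or buffering here, and the log exists only to show when work happens.

```php
<?php
declare(strict_types=1);

/** Yields rows one at a time, logging each read. */
function source(array $rows, ArrayObject $log): Generator
{
    foreach ($rows as $row) {
        $log[] = "read $row";
        yield $row;
    }
}

/** Applies $fn to each value pulled from $input, logging each call. */
function stage(string $name, iterable $input, callable $fn, ArrayObject $log): Generator
{
    foreach ($input as $value) {
        $log[] = "$name $value";
        yield $fn($value);
    }
}

$log = new ArrayObject();

// Wiring the chain runs nothing: generators start only when pulled.
$pipeline = stage(
    'inc',
    stage('double', source([1, 2, 3], $log), fn(int $x): int => $x * 2, $log),
    fn(int $x): int => $x + 1,
    $log,
);
$entriesBeforePull = count($log); // 0

// Pulling one result drives exactly one row through every stage.
$first = $pipeline->current();    // 3

echo implode(' | ', $log->getArrayCopy()), "\n"; // read 1 | double 1 | inc 2
```

Rows 2 and 3 are never read because nothing asked for them. In the same way, a slow downstream consumer limits how fast upstream stages do work.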

Gotchas
-------

- **Order is not preserved by default.** Use `$ordered = true` on `BatchProcessor` or `HandlerComposition` if you need it.
- **`reset()` requires queue completion.** Resetting an incomplete queue throws `RuntimeException`.
- **Handler exclusivity** — a `HandlerComposition` enforces one handler per item. Two matching handlers throw.
- **`CastProcessor` ≠ `Processor`.** Cast processors receive a raw `ConcurrentIterator` and own their queues.
- **`groupBufferSize = 0` is unlimited.** With no cap, multicast with grouping can grow memory without bound.
- **Cancellation must propagate.** `ProcessorComposition` propagates to children automatically, but custom compositions need to do this explicitly.
- **Reflection in error handling** — `errorDisposeQueue()` reaches into `Queue` internals via reflection. AMPHP version updates may break it.

See also
--------

- [`flyokai/indexer`](../indexer/README.md) — full reindex uses pipelines
- Bulk data-import services typically use this as their processing core (`DataSource → BatchProcessor → Stage Pipeline`).

License
-------

MIT
