amphp/pipeline
==============

Asynchronous iterators and operators.

AMPHP is a collection of event-driven libraries for PHP designed with fibers and concurrency in mind. `amphp/pipeline` provides concurrent iterators and collection operators.

Installation
------------

This package can be installed as a [Composer](https://getcomposer.org/) dependency.

```
composer require amphp/pipeline
```

Requirements
------------

This package requires PHP 8.1 or later.

Usage
-----

With fiber-based coroutines, asynchronous sets can be created and consumed within a single fiber using PHP's built-in `Iterator`. Consuming an `Iterator` instance from multiple fibers is problematic, however, because one fiber may modify the state of the iterator while another is suspended.

This library provides a `ConcurrentIterator` interface, a fiber-safe iterator that may be consumed by multiple fibers concurrently, as well as tools for creating asynchronous sets.

### Concurrent Iterators

A `ConcurrentIterator` may be used in place of an `Iterator`, meaning it can be used with `foreach`, `yield from`, `iterator_to_array()`, argument unpacking, and more!
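
As a minimal sketch of that interchangeability, the snippet below passes a `ConcurrentIterator` to `iterator_to_array()` and to argument unpacking. It assumes Composer's autoloader lives at `vendor/autoload.php`; the variable names are illustrative only.

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed autoloader location

use Amp\Pipeline\Pipeline;

// Collect every value into an array, keyed by position.
$iterator = Pipeline::fromIterable([1, 2, 3])->getIterator();
$values = \iterator_to_array($iterator);

// Unpack the values of a second iterator as function arguments.
$iterator = Pipeline::fromIterable([4, 5, 6])->getIterator();
$max = \max(...$iterator);

\var_dump($values, $max);
```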

Like an `Iterator`, a `ConcurrentIterator` may also be iterated manually, with separate methods for advancing and retrieving the current value.

```
use Amp\Pipeline\Pipeline;

// Pipeline::getIterator() returns a ConcurrentIterator
$concurrentIterator = Pipeline::fromIterable([1, 2, 3])->getIterator();
while ($concurrentIterator->continue()) {
    $position = $concurrentIterator->getPosition();
    $value = $concurrentIterator->getValue();

    // ...
}

// Equivalently, multiple fibers may consume a single ConcurrentIterator
// instance using foreach.
$concurrentIterator = Pipeline::fromIterable([1, 2, 3])->getIterator();
foreach ($concurrentIterator as $position => $value) {
    // ...
}
```

`continue()` suspends the current fiber until a value becomes available or the iterator completes, returning `true` or `false` respectively. An exception is thrown from `continue()` if the source of the iterator throws an exception while generating the next value.

`getValue()` returns the last value emitted on the iterator within the current fiber. The return value of this function will not change within the current fiber until `continue()` is called again. `continue()` must be invoked and return before this method can be called.

`getPosition()` returns the current 0-indexed position within the current iterator. If consuming from multiple fibers, this value may not be sequential within a single fiber. Similar to `getValue()`, `continue()` must be invoked and return before this method can be called.
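
To illustrate consumption from multiple fibers, here is a small sketch in which two fibers share one `ConcurrentIterator`. Each value should be delivered to exactly one of the fibers, and the positions seen by a single fiber may have gaps. The fiber names `A` and `B`, the `$seen` array, and the autoloader path are assumptions made for this example.

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed autoloader location

use Amp\Pipeline\Pipeline;
use function Amp\async;
use function Amp\delay;
use function Amp\Future\await;

$iterator = Pipeline::fromIterable(\range(1, 6))->getIterator();
$seen = [];

// Both fibers run this closure against the same iterator instance.
$consume = function (string $name) use ($iterator, &$seen): void {
    foreach ($iterator as $position => $value) {
        \printf("Fiber %s got value %d at position %d\n", $name, $value, $position);
        $seen[] = $value;
        delay(0.1); // Simulate work so the other fiber gets a turn
    }
};

// Wait until both consumers have finished.
await([
    async($consume, 'A'),
    async($consume, 'B'),
]);

\sort($seen);
```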

> **Note** In general, it is not necessary to call these methods directly within application code. Concurrent iterators typically should be used with `foreach`.

### Queue

A `Queue` creates an asynchronous set of values and lets the producer await consumption of each value it produces. This applies back-pressure to production, so consumption and production stay synchronized.

Values may be added to a `Queue` in two ways.

- `push()` adds the value to the queue, only returning once the value has been consumed from the queue.
- `pushAsync()` adds the value to the queue, returning a `Future` immediately which is completed only once the value has been consumed from the queue.

```
use Amp\Pipeline\Queue;
use function Amp\async;
use function Amp\delay;

$queue = new Queue();

$start = \microtime(true);
$elapsed = fn () => \microtime(true) - $start;

// Generate values in a separate fiber
async(function () use ($queue, $elapsed): void {
    printf("Starting production loop at %.3fs\n", $elapsed());

    foreach (range(1, 10) as $value) {
        delay(0.1); // Production of a value takes 100 ms
        $queue->push($value);
    }

    printf("Completing production loop at %.3fs\n", $elapsed());

    // Queue must be completed, otherwise foreach loop below will never exit!
    $queue->complete();
});

foreach ($queue->iterate() as $value) {
    printf("Iterator yielded %d at %.3fs\n", $value, $elapsed());
    delay(0.5); // Listener consumption takes 500 ms
}
```
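
The example above uses `push()`. A comparable sketch with `pushAsync()` might look like the following: the producer queues several values without suspending, then awaits the returned futures before completing the queue. The `$futures` and `$received` variables and the autoloader path are assumptions for illustration.

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed autoloader location

use Amp\Pipeline\Queue;
use function Amp\async;
use function Amp\Future\await;

$queue = new Queue();

async(function () use ($queue): void {
    $futures = [];
    foreach (\range(1, 3) as $value) {
        // Returns immediately; the future completes once the value is consumed.
        $futures[] = $queue->pushAsync($value);
    }

    await($futures); // Suspend until every value has been consumed
    $queue->complete();
});

$received = [];
foreach ($queue->iterate() as $value) {
    $received[] = $value;
}

\var_dump($received);
```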

Once all values have been pushed into a `Queue`, the producer must call `complete()` to end the concurrent iterator. Failure to do so will leave the consumer suspended indefinitely. Alternatively, to indicate an error, the producer may use `error()` to throw an exception to the concurrent iterator consumer and end the concurrent iterator.
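
A rough sketch of the error path: the producer pushes one value and then fails the queue with `error()`, and the consumer sees the exception surface from its `foreach` loop. The exception type and message are made up for this example, and the autoloader path is an assumption.

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed autoloader location

use Amp\Pipeline\Queue;
use function Amp\async;

$queue = new Queue();

async(function () use ($queue): void {
    $queue->push(1);
    // End the iterator with a failure instead of calling complete().
    $queue->error(new \RuntimeException('Producer failed'));
});

$message = null;

try {
    foreach ($queue->iterate() as $value) {
        echo 'Received ', $value, "\n";
    }
} catch (\Throwable $exception) {
    // The failure passed to error() reaches the consumer here.
    $message = $exception->getMessage();
}

echo 'Iteration ended with: ', $message, "\n";
```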

#### DisposedException

If the consumer of the concurrent iterator generated by the `Queue` is destroyed, `push()` will throw a `DisposedException` (or the future returned from `pushAsync()` will error with a `DisposedException`). This indicates that no additional values need to be generated since consumption of those values has ended. If for some reason the producer wishes to continue (e.g., to consume bytes from a buffer), either catch the exception or ignore the future. (The `DisposedException` instance is created only once for each queue.)
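
The following sketch shows one way a producer might react to disposal. It ends the consumer explicitly with `ConcurrentIterator::dispose()`, a method not covered in this section and used here instead of relying on destruction. The unbounded producer loop, the `$pushed` counter, and the autoloader path are assumptions for illustration.

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed autoloader location

use Amp\Pipeline\DisposedException;
use Amp\Pipeline\Queue;
use function Amp\async;

$queue = new Queue();

$producer = async(function () use ($queue): int {
    $pushed = 0;

    try {
        while (true) {
            $queue->push($pushed); // Throws once the consumer is gone
            ++$pushed;
        }
    } catch (DisposedException) {
        // Nobody is consuming anymore, so stop producing.
    }

    return $pushed;
});

$iterator = $queue->iterate();

foreach ($iterator as $value) {
    if ($value === 2) {
        break; // The consumer loses interest after a few values
    }
}

$iterator->dispose(); // Signal that no further values will be consumed

$pushed = $producer->await();
\printf("Producer stopped after %d completed pushes\n", $pushed);
```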

### Pipeline

A `Pipeline` represents an asynchronous set and provides operations which can be applied over the set.

```
use Amp\Pipeline\Pipeline;
use function Amp\delay;

$pipeline = Pipeline::fromIterable(function (): \Generator {
    for ($i = 0; $i < 100; ++$i) {
        yield $i;
    }
});

$pipeline = $pipeline
    ->concurrent(10) // Process up to 10 items concurrently
    ->unordered() // Results may be consumed eagerly and out of order
    ->tap(fn () => delay(random_int(1, 10) / 10)) // Delay each value by 0.1 to 1 second, simulating I/O
    ->map(fn (int $input) => $input * 10) // Apply an operation to each value
    ->filter(fn (int $input) => $input % 3 === 0); // Filter only values divisible by 3

foreach ($pipeline as $value) {
    echo $value, "\n";
}
```

`Pipeline` also has methods that consume the set, such as `forEach()` or `reduce()`. These return only once the set completes or an exception is thrown.

```
use Amp\Pipeline\Pipeline;

Pipeline::generate(function (): int { static $v = 0; return ++$v; })
    ->take(10) // Take only 10 values from the generation function.
    ->concurrent(3) // Process 3 values concurrently
    ->delay(1) // Delay for 1 second to simulate I/O
    ->forEach(function (int $value): void {
        echo $value, "\n";
    });
```
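
For `reduce()`, a minimal sketch could sum the mapped values of a small pipeline. The accumulator is assumed to receive the running result followed by the next value, with `0` as the initial result; the autoloader path is also an assumption.

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed autoloader location

use Amp\Pipeline\Pipeline;

$sum = Pipeline::fromIterable([1, 2, 3, 4])
    ->map(fn (int $value) => $value * 10) // 10, 20, 30, 40
    ->reduce(fn (int $carry, int $value) => $carry + $value, 0);

echo $sum, "\n"; // 100
```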

Versioning
----------

`amphp/pipeline` follows the [semver](http://semver.org/) semantic versioning specification like all other `amphp` packages.

Security
--------

If you discover any security-related issues, please email [`contact@amphp.org`](mailto:contact@amphp.org) instead of using the issue tracker.

License
-------

The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information.
