PHPackages                             react/promise-stream - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. react/promise-stream

ActiveLibrary[Queues &amp; Workers](/categories/queues)

react/promise-stream
====================

The missing link between Promise-land and Stream-land for ReactPHP

v1.7.0(2y ago)11512.9M↓19.7%14[1 issues](https://github.com/reactphp/promise-stream/issues)20MITPHPPHP &gt;=5.3CI passing

Since May 10Pushed 1y ago10 watchersCompare

[ Source](https://github.com/reactphp/promise-stream)[ Packagist](https://packagist.org/packages/react/promise-stream)[ Docs](https://github.com/reactphp/promise-stream)[ Fund](https://opencollective.com/reactphp)[ RSS](/packages/react-promise-stream/feed)WikiDiscussions 1.x Synced 1mo ago

READMEChangelog (10)Dependencies (3)Versions (13)Used By (20)

PromiseStream
=============

[](#promisestream)

[![CI status](https://github.com/reactphp/promise-stream/actions/workflows/ci.yml/badge.svg)](https://github.com/reactphp/promise-stream/actions)[![installs on Packagist](https://camo.githubusercontent.com/759d4e2b3c7ba0fc8930509b36925077dc90034374120261b13d57ba291f39bd/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f64742f72656163742f70726f6d6973652d73747265616d3f636f6c6f723d626c7565266c6162656c3d696e7374616c6c732532306f6e2532305061636b6167697374)](https://packagist.org/packages/react/promise-stream)

The missing link between Promise-land and Stream-land for [ReactPHP](https://reactphp.org/).

**Table of Contents**

- [Usage](#usage)
    - [buffer()](#buffer)
    - [first()](#first)
    - [all()](#all)
    - [unwrapReadable()](#unwrapreadable)
    - [unwrapWritable()](#unwrapwritable)
- [Install](#install)
- [Tests](#tests)
- [License](#license)

Usage
-----

[](#usage)

This lightweight library consists only of a few simple functions. All functions reside under the `React\Promise\Stream` namespace.

The below examples refer to all functions with their fully-qualified names like this:

```
React\Promise\Stream\buffer(…);
```

As of PHP 5.6+ you can also import each required function into your code like this:

```
use function React\Promise\Stream\buffer;

buffer(…);
```

Alternatively, you can also use an import statement similar to this:

```
use React\Promise\Stream;

Stream\buffer(…);
```

### buffer()

[](#buffer)

The `buffer(ReadableStreamInterface $stream, ?int $maxLength = null): PromiseInterface` function can be used to create a `Promise` which will be fulfilled with the stream data buffer.

```
$stream = accessSomeJsonStream();

React\Promise\Stream\buffer($stream)->then(function (string $contents) {
    var_dump(json_decode($contents));
});
```

The promise will be fulfilled with a `string` of all data chunks concatenated once the stream closes.

The promise will be fulfilled with an empty `string` if the stream is already closed.

The promise will be rejected with a `RuntimeException` if the stream emits an error.

The promise will be rejected with a `RuntimeException` if it is cancelled.

The optional `$maxLength` argument defaults to no limit. In case the maximum length is given and the stream emits more data before the end, the promise will be rejected with an `OverflowException`.

```
$stream = accessSomeToLargeStream();

React\Promise\Stream\buffer($stream, 1024)->then(function ($contents) {
    var_dump(json_decode($contents));
}, function ($error) {
    // Reaching here when the stream buffer goes above the max size,
    // in this example that is 1024 bytes,
    // or when the stream emits an error.
});
```

### first()

[](#first)

The `first(ReadableStreamInterface|WritableStreamInterface $stream, string $event = 'data'): PromiseInterface` function can be used to create a `Promise` which will be fulfilled once the given event triggers for the first time.

```
$stream = accessSomeJsonStream();

React\Promise\Stream\first($stream)->then(function (string $chunk) {
    echo 'The first chunk arrived: ' . $chunk;
});
```

The promise will be fulfilled with a `mixed` value of whatever the first event emitted or `null` if the event does not pass any data. If you do not pass a custom event name, then it will wait for the first "data" event. For common streams of type `ReadableStreamInterface`, this means it will be fulfilled with a `string` containing the first data chunk.

The promise will be rejected with a `RuntimeException` if the stream emits an error – unless you're waiting for the "error" event, in which case it will be fulfilled.

The promise will be rejected with a `RuntimeException` once the stream closes – unless you're waiting for the "close" event, in which case it will be fulfilled.

The promise will be rejected with a `RuntimeException` if the stream is already closed.

The promise will be rejected with a `RuntimeException` if it is cancelled.

### all()

[](#all)

The `all(ReadableStreamInterface|WritableStreamInterface $stream, string $event = 'data'): PromiseInterface` function can be used to create a `Promise` which will be fulfilled with an array of all the event data.

```
$stream = accessSomeJsonStream();

React\Promise\Stream\all($stream)->then(function (array $chunks) {
    echo 'The stream consists of ' . count($chunks) . ' chunk(s)';
});
```

The promise will be fulfilled with an `array` once the stream closes. The array will contain whatever all events emitted or `null` values if the events do not pass any data. If you do not pass a custom event name, then it will wait for all the "data" events. For common streams of type `ReadableStreamInterface`, this means it will be fulfilled with a `string[]` array containing all the data chunk.

The promise will be fulfilled with an empty `array` if the stream is already closed.

The promise will be rejected with a `RuntimeException` if the stream emits an error.

The promise will be rejected with a `RuntimeException` if it is cancelled.

### unwrapReadable()

[](#unwrapreadable)

The `unwrapReadable(PromiseInterface $promise): ReadableStreamInterface` function can be used to unwrap a `Promise` which will be fulfilled with a `ReadableStreamInterface`.

This function returns a readable stream instance (implementing `ReadableStreamInterface`) right away which acts as a proxy for the future promise resolution. Once the given Promise will be fulfilled with a `ReadableStreamInterface`, its data will be piped to the output stream.

```
//$promise = someFunctionWhichResolvesWithAStream();
$promise = startDownloadStream($uri);

$stream = React\Promise\Stream\unwrapReadable($promise);

$stream->on('data', function (string $data) {
    echo $data;
});

$stream->on('end', function () {
    echo 'DONE';
});
```

If the given promise is either rejected or fulfilled with anything but an instance of `ReadableStreamInterface`, then the output stream will emit an `error` event and close:

```
$promise = startDownloadStream($invalidUri);

$stream = React\Promise\Stream\unwrapReadable($promise);

$stream->on('error', function (Exception $error) {
    echo 'Error: ' . $error->getMessage();
});
```

The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected at the time of invoking this function. If the given promise is already settled and does not fulfill with an instance of `ReadableStreamInterface`, then you will not be able to receive the `error` event.

You can `close()` the resulting stream at any time, which will either try to `cancel()` the pending promise or try to `close()` the underlying stream.

```
$promise = startDownloadStream($uri);

$stream = React\Promise\Stream\unwrapReadable($promise);

$loop->addTimer(2.0, function () use ($stream) {
    $stream->close();
});
```

### unwrapWritable()

[](#unwrapwritable)

The `unwrapWritable(PromiseInterface $promise): WritableStreamInterface` function can be used to unwrap a `Promise` which will be fulfilled with a `WritableStreamInterface`.

This function returns a writable stream instance (implementing `WritableStreamInterface`) right away which acts as a proxy for the future promise resolution. Any writes to this instance will be buffered in memory for when the promise will be fulfilled. Once the given Promise will be fulfilled with a `WritableStreamInterface`, any data you have written to the proxy will be forwarded transparently to the inner stream.

```
//$promise = someFunctionWhichResolvesWithAStream();
$promise = startUploadStream($uri);

$stream = React\Promise\Stream\unwrapWritable($promise);

$stream->write('hello');
$stream->end('world');

$stream->on('close', function () {
    echo 'DONE';
});
```

If the given promise is either rejected or fulfilled with anything but an instance of `WritableStreamInterface`, then the output stream will emit an `error` event and close:

```
$promise = startUploadStream($invalidUri);

$stream = React\Promise\Stream\unwrapWritable($promise);

$stream->on('error', function (Exception $error) {
    echo 'Error: ' . $error->getMessage();
});
```

The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected at the time of invoking this function. If the given promise is already settled and does not fulfill with an instance of `WritableStreamInterface`, then you will not be able to receive the `error` event.

You can `close()` the resulting stream at any time, which will either try to `cancel()` the pending promise or try to `close()` the underlying stream.

```
$promise = startUploadStream($uri);

$stream = React\Promise\Stream\unwrapWritable($promise);

$loop->addTimer(2.0, function () use ($stream) {
    $stream->close();
});
```

Install
-------

[](#install)

The recommended way to install this library is [through Composer](https://getcomposer.org/). [New to Composer?](https://getcomposer.org/doc/00-intro.md)

This project follows [SemVer](https://semver.org/). This will install the latest supported version:

```
composer require react/promise-stream:^1.7
```

See also the [CHANGELOG](CHANGELOG.md) for details about version upgrades.

This project aims to run on any platform and thus does not require any PHP extensions and supports running on legacy PHP 5.3 through current PHP 8+ and HHVM. It's *highly recommended to use the latest supported PHP version* for this project.

Tests
-----

[](#tests)

To run the test suite, you first need to clone this repo and then install all dependencies [through Composer](https://getcomposer.org/):

```
composer install
```

To run the test suite, go to the project root and run:

```
vendor/bin/phpunit
```

License
-------

[](#license)

MIT, see [LICENSE file](LICENSE).

###  Health Score

52

—

FairBetter than 96% of packages

Maintenance34

Infrequent updates — may be unmaintained

Popularity61

Solid adoption and visibility

Community39

Small or concentrated contributor base

Maturity65

Established project with proven stability

 Bus Factor2

2 contributors hold 50%+ of commits

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~239 days

Recently: every ~253 days

Total

13

Last Release

416d ago

Major Versions

v0.1.2 → v1.0.02017-10-24

### Community

Maintainers

![](https://avatars.githubusercontent.com/u/147145?v=4)[Cees-Jan Kiewiet](/maintainers/WyriHaximus)[@WyriHaximus](https://github.com/WyriHaximus)

![](https://avatars.githubusercontent.com/u/776829?v=4)[Christian Lück](/maintainers/clue)[@clue](https://github.com/clue)

![](https://www.gravatar.com/avatar/5de14776bbddf901c6e24d35829fe66fe997c303d53aca83cc7d1a90bb0b7110?d=identicon)[jsor](/maintainers/jsor)

![](https://www.gravatar.com/avatar/abeb36727bb30a42ef26abe8e6c1152917648fa2ae7259e0038ac9644218e27a?d=identicon)[cboden](/maintainers/cboden)

---

Top Contributors

[![clue](https://avatars.githubusercontent.com/u/776829?v=4)](https://github.com/clue "clue (54 commits)")[![WyriHaximus](https://avatars.githubusercontent.com/u/147145?v=4)](https://github.com/WyriHaximus "WyriHaximus (38 commits)")[![jsor](https://avatars.githubusercontent.com/u/55574?v=4)](https://github.com/jsor "jsor (11 commits)")[![SimonFrings](https://avatars.githubusercontent.com/u/44357440?v=4)](https://github.com/SimonFrings "SimonFrings (11 commits)")[![PaulRotmann](https://avatars.githubusercontent.com/u/85174210?v=4)](https://github.com/PaulRotmann "PaulRotmann (1 commits)")[![reedy](https://avatars.githubusercontent.com/u/67615?v=4)](https://github.com/reedy "reedy (1 commits)")[![nhedger](https://avatars.githubusercontent.com/u/649677?v=4)](https://github.com/nhedger "nhedger (1 commits)")[![kelunik](https://avatars.githubusercontent.com/u/2743004?v=4)](https://github.com/kelunik "kelunik (1 commits)")[![lucasnetau](https://avatars.githubusercontent.com/u/9331242?v=4)](https://github.com/lucasnetau "lucasnetau (1 commits)")[![carusogabriel](https://avatars.githubusercontent.com/u/16328050?v=4)](https://github.com/carusogabriel "carusogabriel (1 commits)")

---

Tags

phppromisereactphpstreamstreamasyncpromisereactphpBufferunwrap

###  Code Quality

TestsPHPUnit

### Embed Badge

![Health badge](/badges/react-promise-stream/health.svg)

```
[![Health](https://phpackages.com/badges/react-promise-stream/health.svg)](https://phpackages.com/packages/react-promise-stream)
```

###  Alternatives

[react/socket

Async, streaming plaintext TCP/IP and secure TLS socket server and client connections for ReactPHP

1.3k116.9M402](/packages/react-socket)[react/promise-timer

A trivial implementation of timeouts for Promises, built on top of ReactPHP.

34141.9M96](/packages/react-promise-timer)[icicleio/icicle

Icicle is a PHP library for writing asynchronous code using synchronous coding techniques.

1.1k150.9k14](/packages/icicleio-icicle)[clue/docker-react

Async, event-driven access to the Docker Engine API, built on top of ReactPHP.

113154.9k1](/packages/clue-docker-react)[clue/reactphp-flux

Flux, the lightweight stream processor to concurrently do many (but not too many) things at once, built on top of ReactPHP.

59118.6k1](/packages/clue-reactphp-flux)[joshdifabio/resource-pool

Regulate the concurrency level of your async components

3321.1k1](/packages/joshdifabio-resource-pool)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
