
hiblaphp/stream
===============


[Source](https://github.com/hiblaphp/stream) | [Packagist](https://packagist.org/packages/hiblaphp/stream)

Hibla Stream
============


**Non-blocking, event-driven streams for PHP with promise-based I/O and automatic backpressure.**

`hiblaphp/stream` provides readable, writable, and duplex stream abstractions built on top of the Hibla event loop. Streams register I/O watchers with the event loop and let data flow cooperatively — a read or write never blocks the thread. Backpressure is tracked automatically: when a writable buffer fills, `write()` returns `false` and the `drain` event signals when it is safe to continue. Pipe chains wire all of this together without any manual coordination.

[![Latest Release](https://camo.githubusercontent.com/6fccbe4e5afdeb3f728cbe2189a5b68c1ae66a02416454cc5f6d242c3a6a3ba6/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f72656c656173652f6869626c617068702f73747265616d2e7376673f7374796c653d666c61742d737175617265)](https://github.com/hiblaphp/stream/releases)[![MIT License](https://camo.githubusercontent.com/942e017bf0672002dd32a857c95d66f28c5900ab541838c6c664442516309c8a/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f6c6963656e73652d4d49542d626c75652e7376673f7374796c653d666c61742d737175617265)](./LICENSE)

---

Contents
--------


**Getting started**

- [Installation](#installation)
- [Introduction](#introduction)
- [Stream Types](#stream-types)
- [Stream Events](#stream-events)
- [Creating Streams](#creating-streams)

**Stream types**

- [Readable Streams](#readable-streams)
    - [Pausing and resuming](#pausing-and-resuming)
    - [Seeking and telling](#seeking-and-telling)
- [Writable Streams](#writable-streams)
    - [Backpressure](#backpressure)
- [Piping Streams](#piping-streams)
    - [Through Streams](#through-streams)
- [Duplex Streams](#duplex-streams)
- [Composite Streams](#composite-streams)

**Promise-based API**

- [Promise-Based API](#promise-based-api)
    - [Reading data](#reading-data)
    - [Reading exact byte counts](#reading-exact-byte-counts)
    - [Writing data](#writing-data)
    - [Piping with `pipeAsync()`](#piping-with-pipeasync)
    - [Cancellation](#cancellation)

**I/O and platform**

- [Standard I/O](#standard-io)
- [Platform Notes](#platform-notes)

**Reference**

- [Stream Lifecycle and Events](#stream-lifecycle-and-events)
    - [Readable stream lifecycle](#readable-stream-lifecycle)
    - [Writable stream lifecycle](#writable-stream-lifecycle)
    - [Pipe event flow](#pipe-event-flow)
    - [ThroughStream event flow](#throughstream-event-flow)
    - [CompositeStream and DuplexResourceStream events](#compositestream-and-duplexresourcestream-events)
    - [Error event behaviour](#error-event-behaviour)
- [No-Op Behaviour](#no-op-behaviour)
- [Resource Cleanup and Destructors](#resource-cleanup-and-destructors)
- [Events Reference](#events-reference)
- [API Reference](#api-reference)

**Meta**

- [Development](#development)
- [Credits](#credits)
- [License](#license)

---

Installation
------------


```
composer require hiblaphp/stream
```

**Requirements:**

- PHP 8.3+
- `hiblaphp/event-loop`
- `hiblaphp/promise`
- `evenement/evenement`

---

Introduction
------------


PHP's built-in I/O is synchronous. `fread()` blocks until data arrives. `fwrite()` blocks until the kernel accepts the data. For a single file or socket this is fine, but the moment you are handling multiple streams concurrently, blocking on any one of them stalls everything else. The event loop cannot fire timers, resume Fibers, or process other I/O until the blocking call returns.

The solution is to hand stream I/O to the event loop entirely. Instead of calling `fread()` blindly and waiting, you register a read watcher and supply a callback. The event loop monitors all active streams at once, using `stream_select()` or `ext-uv` depending on the active driver, and wakes only when data is actually available. It then calls `fread()` immediately, knowing the call will return without blocking.
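To make the readiness idea concrete, here is a minimal sketch in plain PHP with no Hibla classes involved. It assumes a Unix-like platform where `STREAM_PF_UNIX` socket pairs are available, and it only illustrates the principle of "wait for readiness, then read"; it is not how the library's drivers are implemented.

```php
// Assumption: Unix-like platform (STREAM_PF_UNIX socket pairs available).
[$left, $right] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
stream_set_blocking($left, false);
stream_set_blocking($right, false);

// Make some data available on the other end of the pair.
fwrite($left, "hello");

// Ask the OS which of the watched streams are readable (wait at most 1 second).
$read   = [$right];
$write  = null;
$except = null;
$ready  = stream_select($read, $write, $except, 1);

// Only read once the stream is reported ready, so fread() returns immediately.
$data = ($ready !== false && $ready > 0) ? fread($right, 8192) : '';

fclose($left);
fclose($right);
```

An event loop repeats this select-then-read cycle for every registered stream and dispatches to the matching callback.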

Hibla streams are that abstraction. A `ReadableResourceStream` registers a read watcher with the event loop and emits `data` events when the underlying resource is ready. A `WritableResourceStream` buffers outgoing data and registers a write watcher to drain the buffer asynchronously without blocking. When you use the promise-based API — `readAsync()`, `writeAsync()`, `pipeAsync()` — the stream suspends the current Fiber at the `await()` point and resumes it exactly when the I/O completes, so the rest of your code reads top to bottom like ordinary synchronous PHP while the event loop handles everything underneath.
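The suspend-and-resume behaviour described above rests on PHP's native `Fiber` class. The following sketch uses only core PHP to show the mechanism; the "loop" here is just the surrounding script, and the string values are illustrative placeholders rather than anything the library passes around.

```php
$log = [];

$fiber = new Fiber(function () use (&$log): void {
    $log[] = 'before read';
    // Hand control back to the caller until "I/O" is ready.
    $chunk = Fiber::suspend('waiting-for-io');
    $log[] = "got: $chunk";
});

// Runs the fiber until its first suspend() and returns the suspended value.
$reason = $fiber->start();

// While the fiber is suspended, the caller is free to do other work.
$log[] = 'loop does other work';

// Deliver the result; execution continues right after Fiber::suspend().
$fiber->resume('hello');
```

Code inside the fiber reads top to bottom, while the caller decides when it continues.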

This library is the foundation that higher-level Hibla components build on. `hiblaphp/socket` exposes TCP and UDP connections as duplex streams. `hiblaphp/dns` reads and writes over DNS sockets using the same stream primitives. `hiblaphp/parallel` uses the promise-based stream API and Fibers to orchestrate IPC between parent and child processes. If you are using any of these packages you are already using `hiblaphp/stream`, and understanding it directly gives you full visibility into how data moves through the entire Hibla stack.

---

Stream Types
------------


| Class | Direction | Use case |
| --- | --- | --- |
| `ReadableResourceStream` | Read | Event-driven reads from any readable PHP resource |
| `WritableResourceStream` | Write | Buffered non-blocking writes to any writable PHP resource |
| `DuplexResourceStream` | Read + Write | Single resource opened in read/write mode (e.g. `r+`) |
| `CompositeStream` | Read + Write | Two independent streams combined into a single duplex interface |
| `ThroughStream` | Read + Write | In-line transformer: data written in is emitted out, optionally transformed |
| `PromiseReadableStream` | Read | `ReadableResourceStream` extended with `readAsync()`, `readLineAsync()`, `readAllAsync()`, `pipeAsync()` |
| `PromiseWritableStream` | Write | `WritableResourceStream` extended with `writeAsync()`, `writeLineAsync()`, `endAsync()` |

---

Stream Events
-------------


Every stream class emits a defined set of events at precise lifecycle points. Understanding what each event means before working with individual stream types makes the rest of this document easier to follow.

### Readable stream events


| Event | Arguments | When it fires |
| --- | --- | --- |
| `data` | `string $chunk` | A chunk of data is available from the resource |
| `end` | none | The stream has reached EOF; no more `data` events will follow |
| `close` | none | The underlying resource has been closed and freed |
| `error` | `\Throwable $e` | A read error occurred. The stream closes immediately after |
| `pause` | none | The stream has been paused; the read watcher is removed |
| `resume` | none | The stream has resumed; the read watcher is re-registered |

### Writable stream events


| Event | Arguments | When it fires |
| --- | --- | --- |
| `drain` | none | The write buffer has dropped below the soft limit; safe to write again |
| `finish` | none | `end()` was called and all buffered data has been fully flushed |
| `close` | none | The underlying resource has been closed and freed |
| `error` | `\Throwable $e` | A write error occurred, or `write()` was called on a closed stream |

### Key ordering guarantees


- On a readable stream, `end` always fires before `close` on a clean EOF. A read error skips `end` and goes directly to `close`.
- On a writable stream, `finish` always fires before `close` after a clean `end()`. Calling `close()` directly skips `finish` and discards any buffered data.
- After `close` fires, all listeners are removed — any listener attached after `close` will never fire.
- `error` is always followed by `close`. The stream closes itself after emitting `error`. You do not need to call `close()` inside an error handler.

> **Always attach an `error` listener** on any stream you open. An unhandled `error` event on an `EventEmitter` propagates and may terminate your process.

For the full event sequence diagrams showing exactly when each event fires relative to stream operations, see [Stream Lifecycle and Events](#stream-lifecycle-and-events) in the reference section.
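When debugging event order, it can help to record which events actually fire. The helper below is a hypothetical sketch: `traceLifecycle` is not part of the library, and it assumes only that the stream exposes the `on(string $event, callable $listener)` method used throughout this README.

```php
/**
 * Append the name of every lifecycle event emitted by $stream to $log.
 * Hypothetical helper; relies only on an on($event, $listener) method.
 */
function traceLifecycle(object $stream, array &$log): void
{
    foreach (['data', 'end', 'error', 'close'] as $event) {
        $stream->on($event, function (...$args) use (&$log, $event): void {
            $log[] = $event; // record the event name; arguments are ignored
        });
    }
}
```

Attach it before calling `resume()`. On a clean EOF the guarantees above imply the log ends with `end` followed by `close`; after a read error it should end with `error` followed by `close`.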

---

Creating Streams
----------------


Each stream class accepts a PHP resource directly in its constructor. Resources must be valid and opened in the appropriate mode — the constructor validates this and throws a `StreamException` if not. Non-blocking mode is set automatically.

> **Note:** The examples throughout this README use `fopen()` to create resources for brevity. `fopen()` is a blocking call — it blocks the event loop for the duration of the file open operation. In production code running inside the event loop you should obtain resources through non-blocking means and pass them to the stream constructors.

```
use Hibla\Stream\ReadableResourceStream;
use Hibla\Stream\WritableResourceStream;
use Hibla\Stream\DuplexResourceStream;
use Hibla\Stream\PromiseReadableStream;
use Hibla\Stream\PromiseWritableStream;
use Hibla\Stream\CompositeStream;
use Hibla\Stream\ThroughStream;

$readable  = new ReadableResourceStream(fopen('/path/to/input.log', 'rb'));
$writable  = new WritableResourceStream(fopen('/path/to/output.log', 'wb'));
$duplex    = new DuplexResourceStream(fopen('/path/to/data.bin', 'r+b'));
$composite = new CompositeStream($readable, $writable);
$through   = new ThroughStream(fn(string $data) => strtoupper($data));

// Promise-based variants
$readable = new PromiseReadableStream(fopen('/path/to/input.log', 'rb'));
$writable = new PromiseWritableStream(fopen('/path/to/output.log', 'wb'));

// Named constructor alternative for promise streams
$readable = PromiseReadableStream::fromResource(fopen('/path/to/input.log', 'rb'));
$writable = PromiseWritableStream::fromResource(fopen('/path/to/output.log', 'wb'));
```

A `Stream` static factory is also available as a convenience shortcut for common cases like opening files and wrapping standard I/O handles:

```
use Hibla\Stream\Stream;

$readable = Stream::readableFile('/path/to/input.log');
$writable = Stream::writableFile('/path/to/output.log');
$stdin    = Stream::stdin();
$stdout   = Stream::stdout();
```

---

Readable Streams
----------------


`ReadableResourceStream` starts paused by default — no data events fire, no bytes are read from the resource, and no event loop watcher is registered until you explicitly call `resume()` or attach a `data` listener. The pause-by-default design gives you time to attach all your event listeners, wire up error handlers, and set up pipe destinations before any data starts flowing.

```
use Hibla\Stream\ReadableResourceStream;

$stream = new ReadableResourceStream(fopen('/var/log/app.log', 'rb'));

// Attach all listeners first — no data flows yet
$stream->on('data', function (string $chunk) {
    echo $chunk;
});

$stream->on('end', function () {
    echo "Stream fully consumed\n";
});

$stream->on('error', function (\Throwable $e) {
    echo "Error: " . $e->getMessage() . "\n";
});

// Start the flow only after all listeners are in place
$stream->resume();
```

> **If no data seems to arrive**, the most likely cause is a missing `resume()` call. The stream is paused and waiting for you to start it. Alternatively, switch to the promise-based API which handles `resume()` internally.

### Pausing and resuming


Calling `pause()` stops `data` events from firing and removes the event loop watcher, so the stream truly idles between reads rather than spinning. Both `pause()` and `resume()` are idempotent — calling them on an already-paused or already-flowing stream is a no-op.

```
$stream->on('data', function (string $chunk) use ($stream) {
    $stream->pause();
    processChunk($chunk);
    $stream->resume();
});
```

### Seeking and telling


`ReadableResourceStream` exposes `seek()` and `tell()` for seekable resources such as files. `seek()` repositions the internal pointer, clears the read-ahead buffer, and resets the EOF flag — meaning data will flow again from the new position even if the stream had previously reached the end.

```
$stream = new ReadableResourceStream(fopen('/tmp/data.bin', 'rb'));

$stream->on('data', function (string $chunk) use ($stream) {
    echo "Read: " . strlen($chunk) . " bytes\n";
    $stream->pause();
});

$stream->resume();

// Rewind to the beginning and read again
$stream->seek(0);
$stream->resume();
```

`seek()` returns `false` on non-seekable resources — pipes, sockets, and STDIN — without throwing. Always check the return value if seekability matters:

```
// WRONG — seek() on a pipe silently returns false
$stream = new ReadableResourceStream(STDIN);
$stream->seek(0); // false — silently ignored

// CORRECT
if ($stream->seek(0) === false) {
    // resource is non-seekable — handle accordingly
}
```

`seek()` throws a `StreamException` in two cases: when the stream is closed, and when the underlying resource is no longer valid. `tell()` follows the same contract.

```
seek() return value
───────────────────────────────────────────────────
true            seek succeeded, buffer cleared, EOF reset
false           resource is non-seekable (pipe, socket, STDIN)
StreamException stream is closed or resource is invalid

```

```
$position = $stream->tell();
$stream->seek(512, SEEK_SET);   // seek to byte 512
$stream->seek(0, SEEK_END);     // seek to end of file
$stream->seek(-128, SEEK_CUR);  // seek relative to current position
```

> **Note:** `seek()` discards any internally buffered data. After a seek the buffer is stale and continuing from it would produce incorrect data, so it is always cleared.
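If you would rather know up front whether a resource supports seeking, PHP's own `stream_get_meta_data()` reports a `seekable` flag. The sketch below uses only core PHP; `isSeekable` is a hypothetical helper name, and whether the library consults this flag internally is not stated here.

```php
/**
 * Report whether a PHP stream resource supports fseek().
 * Hypothetical helper built on stream_get_meta_data().
 */
function isSeekable($resource): bool
{
    return is_resource($resource)
        && (stream_get_meta_data($resource)['seekable'] ?? false) === true;
}

$file     = tmpfile();          // a regular temporary file
$seekable = isSeekable($file);  // plain files are seekable
```

Checking the return value of `seek()` remains the authoritative test; a pre-check like this is mainly useful for choosing a code path early.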

---

Writable Streams
----------------


`WritableResourceStream` accepts data via `write()`, buffers it internally, and registers a write watcher with the event loop to drain the buffer non-blocking. Unlike readable streams, no `resume()` is needed — `write()` can be called immediately after construction. `end()` signals that no more data will be written. The buffer drains, then `finish` fires, then the stream closes.

`end()` and `close()` are both idempotent — calling them multiple times will not throw or emit duplicate events. This makes it safe to call them defensively without checking `isWritable()` first.

Calling `write()` on a closed stream emits an `error` event and returns `false` rather than throwing an exception. Always attach an `error` listener if there is any chance `write()` could be called after the stream closes.

```
use Hibla\Stream\WritableResourceStream;

$stream = new WritableResourceStream(fopen('/tmp/output.txt', 'wb'));

// print is an expression, so it is valid in an arrow function (echo is not)
$stream->on('finish', fn() => print "All data written\n");
$stream->on('error', fn(\Throwable $e) => print "Write error: " . $e->getMessage() . "\n");

$stream->write("First line\n");
$stream->write("Second line\n");
$stream->end("Final line\n");
```

### Backpressure


When a writable stream's internal buffer exceeds its soft limit, `write()` returns `false`. This is the backpressure signal — stop writing and wait for the `drain` event before continuing.

```
Consumer                 WritableResourceStream            Kernel buffer
   │                            │                               │
   │  write("chunk 1")          │                               │
   ├──────────────────────────► │  buffer: 10 KB (< 64 KB)     │
   │  ◄── true (keep going)     │                               │
   │                            │                               │
   │  write("chunk 2")          │                               │
   ├──────────────────────────► │  buffer: 70 KB (>= 64 KB)    │
   │  ◄── FALSE ◄───────────────│  STOP WRITING                 │
   │                            │                               │
   │  [wait for drain]          │ ─────────────────────────────►│ fwrite()
   │                            │  buffer: 15 KB (< 64 KB)      │
   │  ◄──── emit('drain') ◄─────│  RESUME                       │

```

```
use Hibla\Stream\WritableResourceStream;

$socket   = stream_socket_client('tcp://example.com:9000');
$writable = new WritableResourceStream($socket, softLimit: 65536);

function pump(string $data, WritableResourceStream $writable): void
{
    $canContinue = $writable->write($data);

    if ($canContinue === false) {
        $writable->once('drain', function () use ($writable) {
            pump(getNextChunk(), $writable);
        });
    }
}
```

`pipe()` and `pipeAsync()` handle all of this automatically. Manual backpressure management is only needed when calling `write()` directly.
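When you do call `write()` directly, a loop that writes until backpressure is signalled and then waits for `drain` covers the common case. The function below is a sketch under assumptions: `writeAll` is a hypothetical helper, and it relies only on the `write()`, `once('drain', ...)` and `end()` behaviour described in this section. The parameter is typed as `object` so the sketch stays self-contained.

```php
/**
 * Write every chunk from $chunks, pausing whenever write() reports
 * backpressure and continuing on the next 'drain' event.
 * Hypothetical helper; $writable is assumed to behave like a
 * WritableResourceStream (write(), once(), end()).
 */
function writeAll(object $writable, \Iterator $chunks): void
{
    while ($chunks->valid()) {
        $chunk = $chunks->current();
        $chunks->next();

        if ($writable->write($chunk) === false) {
            // The chunk was buffered, but the buffer is over the soft limit:
            // stop here and pick up the remaining chunks after 'drain'.
            $writable->once('drain', fn() => writeAll($writable, $chunks));
            return;
        }
    }

    // No chunks left: flush what is buffered and finish the stream.
    $writable->end();
}
```

A call might look like `writeAll($writable, new ArrayIterator($records))`, or pass a generator to produce chunks lazily.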

---

Piping Streams
--------------


`pipe()` wires a readable stream to a writable stream and handles backpressure automatically. When the writable buffer fills, the readable is paused. When the writable drains, the readable resumes. The destination stream is returned for chaining.

```
use Hibla\Stream\ReadableResourceStream;
use Hibla\Stream\WritableResourceStream;

$source      = new ReadableResourceStream(fopen('/tmp/input.bin', 'rb'));
$destination = new WritableResourceStream(fopen('/tmp/output.bin', 'wb'));

$destination->on('finish', fn() => print "Transfer complete\n");

// pipe() calls resume() internally — no need to call it yourself
$source->pipe($destination);
```

Pipe chains are composable — each `pipe()` call returns the destination:

```
use Hibla\Stream\ThroughStream;

$source
    ->pipe(new ThroughStream(fn($d) => gzencode($d)))
    ->pipe(new WritableResourceStream(fopen('/tmp/compressed.gz', 'wb')));
```

Pass `['end' => false]` to keep the destination open after the source ends — useful when piping multiple sources to the same destination sequentially:

```
// Source A's 'end' will NOT call dest->end()
$sourceA->pipe($dest, ['end' => false]);
$sourceA->on('end', function () use ($sourceB, $dest) {
    $sourceB->pipe($dest); // this one WILL close dest when finished
});
```
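The same pattern generalises to any number of sources. The following is a sketch only: `pipeInOrder` is a hypothetical helper, and it assumes sources expose `pipe($dest, $options)` and `on('end', ...)` exactly as used in the example above.

```php
/**
 * Pipe each source into $dest one after another.
 * Every source except the last is piped with ['end' => false],
 * so $dest is only ended when the final source finishes.
 * Hypothetical helper; not part of the library.
 */
function pipeInOrder(array $sources, object $dest): void
{
    $source = array_shift($sources);

    if ($source === null) {
        return; // nothing to pipe
    }

    if ($sources === []) {
        $source->pipe($dest); // last source: default behaviour ends $dest
        return;
    }

    $source->pipe($dest, ['end' => false]);
    $source->on('end', fn() => pipeInOrder($sources, $dest));
}
```

This keeps the destination open across sources without any manual bookkeeping in the calling code.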

Calling `pipe()` on a non-readable source or a non-writable destination is a no-op — no error is thrown, making it safe to call defensively in cleanup paths.

### Through Streams


`ThroughStream` is a duplex stream that sits in the middle of a pipe chain. Data written to it is emitted as `data` events on its readable side, optionally transformed by a callable. Without a transformer it acts as a transparent passthrough.

```
use Hibla\Stream\ThroughStream;

// Transform: compress mid-pipe
$source
    ->pipe(new ThroughStream(fn(string $data) => gzencode($data)))
    ->pipe($destination);

// Spy: inspect data mid-pipe without modifying it
$spy = new ThroughStream(function (string $data) {
    fwrite(STDERR, sprintf("[spy] %d bytes\n", strlen($data)));
    return $data; // must return data to pass it through
});

$source->pipe($spy)->pipe($destination);
```

`write()` on a closed `ThroughStream` emits an `error` event and returns `false`. `end()` and `close()` on an already-closed stream are no-ops. If the transformer throws, the `error` event fires and the stream closes.
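Because the transformer is an ordinary callable, it can carry state between chunks. The sketch below builds a pass-through transformer that keeps a running byte count; `makeByteCounter` is a hypothetical helper name, and the only library behaviour assumed is the one shown above, that `ThroughStream` accepts a callable which receives a string and returns a string.

```php
/**
 * Build a pass-through transformer that adds the size of every chunk
 * to $total. Hypothetical helper for use with new ThroughStream(...).
 */
function makeByteCounter(int &$total): \Closure
{
    return function (string $data) use (&$total): string {
        $total += strlen($data);
        return $data; // return the data unchanged so it continues down the pipe
    };
}

$total   = 0;
$counter = makeByteCounter($total);
```

It would then be wired in as `$source->pipe(new ThroughStream($counter))->pipe($destination)`, with `$total` readable at any point during the transfer.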

---

Duplex Streams
--------------


`DuplexResourceStream` wraps a single resource opened in read/write mode — such as a TCP socket or a file opened with `r+`. It presents a unified duplex interface while managing readable and writable sides on the same underlying resource internally.

Like all readable streams in Hibla, the readable side starts paused — attach your `data` and `error` listeners first, then call `resume()`. The writable side is always ready to accept `write()` calls immediately.

```
use Hibla\Stream\DuplexResourceStream;

$socket = stream_socket_client('tcp://api.example.com:80');
$duplex = new DuplexResourceStream($socket);

$duplex->on('data', function (string $response) use ($duplex) {
    echo $response;
    $duplex->close();
});

$duplex->on('error', fn(\Throwable $e) => print "Error: " . $e->getMessage() . "\n");

// Write is always available immediately
$duplex->write("GET / HTTP/1.0\r\nHost: api.example.com\r\n\r\n");

// Start receiving after listeners are in place
$duplex->resume();
```

The resource must be opened in a read/write mode (containing `+` in its mode string). Passing a read-only or write-only resource throws a `StreamException`.
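To check a resource before handing it to the constructor, you can inspect its mode string with core PHP. This is a sketch of the rule stated above, not the library's actual validation code; `isReadWrite` is a hypothetical helper name.

```php
/**
 * Return true when the resource was opened in a read/write mode,
 * i.e. its mode string contains '+'. Hypothetical helper.
 */
function isReadWrite($resource): bool
{
    $mode = stream_get_meta_data($resource)['mode'] ?? '';

    return str_contains($mode, '+');
}

$scratch   = tmpfile();              // tmpfile() opens its file in a read/write mode
$readWrite = isReadWrite($scratch);
```

A resource opened with `'rb'` or `'wb'` fails this check and, per the rule above, would be rejected by `DuplexResourceStream`.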

---

Composite Streams
-----------------


`CompositeStream` combines two independent, one-directional streams into a single duplex interface. This is useful when your readable and writable sides are separate resources — for example, a child process's stdout and stdin.

```
use Hibla\Stream\CompositeStream;
use Hibla\Stream\ReadableResourceStream;
use Hibla\Stream\WritableResourceStream;

$process = proc_open('ffmpeg -i pipe:0 -f mp3 pipe:1', [
    0 => ['pipe', 'r'],
    1 => ['pipe', 'w'],
], $pipes);

$composite = new CompositeStream(
    new ReadableResourceStream($pipes[1]), // process stdout → our readable
    new WritableResourceStream($pipes[0])  // our writable → process stdin
);

// Attach listeners before resuming
$composite->on('data', fn(string $chunk) => saveChunk($chunk));
$composite->on('end', fn() => proc_close($process));

$composite->resume();
// Assumes $inputStream is a readable stream created elsewhere (not shown here)
$inputStream->pipe($composite);
```

Events from each inner stream are forwarded onto the composite automatically. The composite `close` event fires only when both inner streams have closed. Calling `close()` on an already-closed composite is a no-op.

---

Promise-Based API
-----------------


The event-driven API — `on('data', ...)`, `pause()`, `resume()`, `drain` — is the right tool when you need maximum throughput and full control over flow. A TCP server handling hundreds of simultaneous connections, a proxy streaming bytes between two sockets, or a pipeline processing a continuous feed are all cases where the event-driven model pays off.

But not every use case needs that level of control. Reading a log file line by line, processing a CSV upload, and writing a sequence of records to a file are tasks where setting up `data` listeners, managing the `end` event, and wiring `drain` callbacks is pure boilerplate. `PromiseReadableStream` and `PromiseWritableStream` eliminate that boilerplate. They extend their base classes with promise-returning methods that let you express sequential I/O as straight-line code.

The promise-based API manages pausing and resuming internally. `readAsync()` resumes the stream to fetch the next chunk and pauses it again once the chunk is delivered — data only flows when you ask for it. When writing, `writeAsync()` waits for the `drain` event automatically if the buffer is full — you never need to check `write()`'s return value or attach a `drain` listener yourself.

### Reading data


```
use Hibla\Stream\PromiseReadableStream;
use function Hibla\await;

$stream = new PromiseReadableStream(fopen('/tmp/data.txt', 'rb'));

// Read the next chunk (up to $length bytes)
$chunk = await($stream->readAsync(1024));

// Read a full line including the trailing newline character
$line = await($stream->readLineAsync());

// Read the entire stream into a single string
$contents = await($stream->readAllAsync());
```

`readAsync()` and `readLineAsync()` signal end-of-stream by resolving with `null`. Always check with a strict `!== null` guard, because a truthiness check breaks on valid data:

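| Value | Truthiness check | `!== null` check |
| --- | --- | --- |
| `"0"` (a line containing only zero, with no trailing newline) | stops early | continues |
| `""` (an empty string) | stops early | continues |
| `"\n"` (a blank line) | continues (a non-empty string other than `"0"` is truthy) | continues |
| `null` (true EOF) | stops | stops |

```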
// CORRECT — stops only on null (EOF)
while (($line = await($stream->readLineAsync())) !== null) {
    processLine(rtrim($line));
}

// WRONG: stops on any falsy value, including valid data such as "0" or ""
while ($line = await($stream->readLineAsync())) {
    processLine($line);
}
```
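The difference between the two guards comes down to PHP's string-to-boolean rules, which can be checked without any stream at all. The snippet below is plain PHP; the sample values are illustrative.

```php
// For each sample value, record whether each style of loop guard would stop.
$samples = [
    '"0"'   => "0",
    '""'    => "",
    '"\n"'  => "\n",
    '"0\n"' => "0\n",
    'null'  => null,
];

$stopsLoop = [];
foreach ($samples as $label => $value) {
    $stopsLoop[$label] = [
        'truthiness' => !$value,          // true: `while ($line = ...)` would stop
        'strict'     => $value === null,  // true: `while (... !== null)` would stop
    ];
}
```

Only `""` and `"0"` are falsy strings in PHP, so those are the values a truthiness guard mistakes for EOF. The strict guard stops on `null` alone.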

`readAllAsync()` is the exception — it resolves with a plain string (never `null`) because it accumulates everything and returns the complete contents in one go:

```
$contents = await($stream->readAllAsync());
```

Both `readAllAsync()` and `readLineAsync()` accept a `$maxLength` parameter to prevent unbounded memory usage:

```
$contents = await($stream->readAllAsync(maxLength: 524288));  // 512 KiB limit
$line     = await($stream->readLineAsync(maxLength: 4096));   // 4 KiB per line
```

Calling `readAsync()` on a stream that has already reached EOF resolves immediately with `null` — no event loop tick needed.

### Reading exact byte counts


`readAsync($length)` resolves with **up to** `$length` bytes, not exactly `$length` bytes. The `$length` argument is passed to `fread()` as the maximum read size, but `fread()` returns whatever the OS has buffered at the moment the read watcher fires, which may be less than requested.

This is correct for general streaming, but wrong for binary protocol parsing where message boundaries are defined by fixed field sizes. For those cases, loop over `readAsync()` until you have accumulated the required count:

```
/**
 * Read exactly $length bytes from a stream.
 * Returns null if EOF is reached before $length bytes are available.
 *
 * @return string|null
 */
function readExact(PromiseReadableStream $stream, int $length): ?string
{
    $buffer    = '';
    $remaining = $length;

    while ($remaining > 0) {
        $chunk = await($stream->readAsync($remaining));

        if ($chunk === null) {
            return null; // EOF before enough bytes arrived
        }

        $buffer    .= $chunk;
        $remaining -= strlen($chunk);
    }

    return $buffer;
}
```

With that helper, binary protocol parsing is correct regardless of how the OS delivers data:

```
// Read a length-prefixed binary message:
// [ 4-byte uint32 length ][ N bytes payload ]

$header = readExact($stream, 4);
if ($header === null) {
    return; // clean EOF — no more messages
}

$payloadLength = unpack('N', $header)[1];
$payload       = readExact($stream, $payloadLength);

if ($payload === null) {
    throw new \RuntimeException("Truncated message: stream ended early");
}
```
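For completeness, the sending side of the same length-prefixed layout can be sketched in plain PHP. `encodeFrame` and `decodeFrames` are hypothetical helper names; the decoder works on an in-memory byte string rather than a stream, which makes the framing rule easy to verify in isolation.

```php
/** Prefix $payload with its length as a 4-byte big-endian unsigned integer. */
function encodeFrame(string $payload): string
{
    return pack('N', strlen($payload)) . $payload;
}

/**
 * Split a byte string into complete frames.
 * A trailing partial frame (incomplete header or payload) is ignored.
 */
function decodeFrames(string $bytes): array
{
    $frames = [];
    $offset = 0;
    $total  = strlen($bytes);

    while ($total - $offset >= 4) {
        $length = unpack('N', substr($bytes, $offset, 4))[1];

        if ($total - $offset - 4 < $length) {
            break; // payload not fully present yet
        }

        $frames[] = substr($bytes, $offset + 4, $length);
        $offset  += 4 + $length;
    }

    return $frames;
}
```

On the writing side, the output of `encodeFrame()` is what would be passed to `write()` or `writeAsync()`.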

Use `readLineAsync()` for text protocols, `readExact()` for binary protocols, and raw `readAsync()` only when your processing logic is genuinely chunk-size-agnostic — streaming file copies, proxying raw bytes, or feeding a streaming parser that handles partial input itself.

### Writing data


```
use Hibla\Stream\PromiseWritableStream;
use function Hibla\await;

$stream = new PromiseWritableStream(fopen('/tmp/out.txt', 'wb'));

// Resolves with the number of bytes buffered
// Backpressure is handled internally — no drain listener needed
$bytes = await($stream->writeAsync("Hello, world\n"));

// Write a line (appends "\n" automatically)
await($stream->writeLineAsync("Another line"));

// End the stream and wait for all data to flush
// Resolves only after 'finish' fires, once all buffered data has been flushed
await($stream->endAsync());
```

`writeAsync('')` resolves immediately with `0` — no write is attempted. `endAsync()` called on a stream that is already ending or already closed resolves immediately — it is safe to call defensively.

### Piping with `pipeAsync()`


`pipeAsync()` pipes a `PromiseReadableStream` to any writable stream and resolves with the total number of bytes transferred once the source ends. Backpressure between source and destination is handled automatically.

```
use Hibla\Stream\PromiseReadableStream;
use Hibla\Stream\WritableResourceStream;
use function Hibla\await;

$source = new PromiseReadableStream(fopen('/tmp/large.bin', 'rb'));
$dest   = new WritableResourceStream(fopen('/tmp/copy.bin', 'wb'));

$totalBytes = await($source->pipeAsync($dest));
echo "Transferred: $totalBytes bytes\n";
```

### Cancellation


All promise-based methods return a standard `PromiseInterface` — cancel any in-flight operation by calling `cancel()` on the returned promise. Cancelling detaches all internal event listeners, pauses the stream, and cleans up pending state. No further callbacks fire after cancellation.

```
use Hibla\EventLoop\Loop;
use function Hibla\await;

$readable    = new PromiseReadableStream(fopen('/tmp/large.log', 'rb'));
$readPromise = $readable->readLineAsync();

$timerId = Loop::addTimer(2.0, function () use ($readPromise) {
    $readPromise->cancel();
});

try {
    $line = await($readPromise);
    Loop::cancelTimer($timerId);
} catch (\Hibla\Promise\Exceptions\CancelledException $e) {
    echo "Read cancelled — no data within 2 seconds\n";
}
```

Cancelling `pipeAsync()` stops the transfer immediately, pauses the source, and detaches the destination listener without closing either stream:

```
$transferPromise = $source->pipeAsync($dest);

Loop::addTimer(5.0, fn() => $transferPromise->cancel());

try {
    $totalBytes = await($transferPromise);
} catch (\Hibla\Promise\Exceptions\CancelledException $e) {
    echo "Transfer cancelled\n";
    // $source and $dest are still open — you decide what to do with them
}
```

For structured cancellation across multiple operations, use `CancellationTokenSource` from `hiblaphp/cancellation`:

```
use Hibla\Cancellation\CancellationTokenSource;
use function Hibla\await;

$cts = new CancellationTokenSource(30.0); // 30 second hard limit

try {
    while (($line = await($stream->readLineAsync(), $cts->token)) !== null) {
        processLine(rtrim($line));
    }
} catch (\Hibla\Promise\Exceptions\CancelledException $e) {
    echo "Stream read timed out after 30 seconds\n";
}
```

---

Standard I/O
------------


```
use Hibla\Stream\PromiseReadableStream;
use Hibla\Stream\WritableResourceStream;
use Hibla\Stream\CompositeStream;
use Hibla\Stream\ReadableResourceStream;
use function Hibla\await;

// Read from STDIN line by line
$stdin = new PromiseReadableStream(STDIN);

while (($line = await($stdin->readLineAsync())) !== null) {
    processLine(rtrim($line));
}

// Write to STDOUT respecting backpressure
$stdout = new WritableResourceStream(STDOUT);
$stdout->write("Hello from async PHP\n");

// Write errors to STDERR
$stderr = new WritableResourceStream(STDERR);
$stderr->write("Something went wrong\n");

// Combined interactive console — STDIN readable, STDOUT writable
$stdio = new CompositeStream(
    new ReadableResourceStream(STDIN),
    new WritableResourceStream(STDOUT)
);

$stdio->on('data', fn(string $input) => $stdio->write("Echo: $input"));
$stdio->resume();
```

---

Platform Notes
--------------


### Windows non-blocking limitations


Non-blocking mode is set automatically on stream construction, but the stream types that support it differ between Unix and Windows.

On **Unix and macOS**, non-blocking mode is applied to sockets, pipes, STDIO handles, plain files, and in-memory streams (`php://memory`, `php://temp`).

On **Windows**, non-blocking mode is only applied to socket and pipe resources. Plain files, STDIO handles, and in-memory streams are left in blocking mode because PHP's `stream_set_blocking()` has no effect on non-socket handles on Windows. This means that on Windows, reading from a file or writing to STDOUT through a Hibla stream will block the event loop for the duration of the operation — exactly as a raw `fread()` or `fwrite()` call would.

If you are building an application that must run on Windows and needs truly non-blocking file or STDIO I/O, offload those operations to a worker process via `hiblaphp/parallel` rather than using stream watchers directly. Socket-based streams — TCP, UDP, Unix sockets — behave identically on all platforms.

---

Stream Lifecycle and Events
---------------------------

This section documents the exact sequence of events emitted by each stream type. It is reference material — understanding lifecycle order matters when you are implementing custom flow control, building protocol parsers, or debugging unexpected behaviour.

### Readable stream lifecycle

A `ReadableResourceStream` always starts paused. No watcher is registered, no data flows, and no events fire until `resume()` is called or a `data` listener is attached.

```
new ReadableResourceStream($resource)
       │
       ▼
  ┌─────────┐
  │  PAUSED │  ◄─── pause() called (or initial state)
  └────┬────┘
       │ resume()
       ▼
  ┌─────────┐
  │ FLOWING │  ◄─── read watcher registered with event loop
  └────┬────┘
       │ data arrives
       ├──────────────── emit('data', $chunk)    ← repeats each read
       │
       │ pause() called
       ├──────────────── emit('pause')
       │                 read watcher removed
       │
       │ EOF reached
       ├──────────────── emit('end')
       │                 emit('close')           ← always follows 'end'
       │                 resource closed
       │
       │ read error
       └──────────────── emit('error', $e)
                         emit('close')           ← always follows 'error'
                         resource closed

```

`end` and `close` are always emitted in that order on EOF. A read error skips `end`: the stream emits `error` and then `close`. After `close`, all listeners are removed, so any listener attached afterwards will never fire.

```
$stream = new ReadableResourceStream(fopen('/var/log/app.log', 'rb'));

// `echo` is a statement and cannot be an arrow-function body; `print` is an expression
$stream->on('data',  fn(string $chunk)  => print $chunk);
$stream->on('end',   fn()               => print "Done reading\n");
$stream->on('close', fn()               => print "Stream closed\n");
$stream->on('error', fn(\Throwable $e)  => print "Error: " . $e->getMessage() . "\n");

$stream->resume();
```

### Writable stream lifecycle

A `WritableResourceStream` is ready immediately — no `resume()` needed.

```
new WritableResourceStream($resource)
       │
       ▼
  ┌──────────┐
  │ WRITABLE │
  └────┬─────┘
       │ write($data) — data buffered, write watcher registered
       │
       │  buffer < softLimit  ──► write() returns true
       │  buffer >= softLimit ──► write() returns false
       │
       │ write watcher drains the buffer
       ├──────────────── emit('drain')
       │
       │ end() called
       ├──────────────── emit('finish')
       │                 emit('close')
       │
       │ write() on closed stream
       └──────────────── emit('error', $e)

```

```
$stream = new WritableResourceStream(fopen('/tmp/output.log', 'wb'));

// `echo` is a statement and cannot be an arrow-function body; `print` is an expression
$stream->on('drain',  fn()              => print "Drained, can write again\n");
$stream->on('finish', fn()              => print "All data written\n");
$stream->on('close',  fn()              => print "Stream closed\n");
$stream->on('error',  fn(\Throwable $e) => print "Write error: " . $e->getMessage() . "\n");

$stream->write("Hello\n");
$stream->end("Goodbye\n");
```
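The soft-limit rule in the diagram can be illustrated with a small in-memory model. This is a sketch in plain PHP, not Hibla's `WritableResourceStream`: the class name `SoftLimitBuffer`, its `onDrain()` method, and the manual `flush()` that stands in for the write watcher are all hypothetical. Emitting `drain` only after a `write()` has returned `false` is also an assumption of this model.

```php
<?php

declare(strict_types=1);

/**
 * Illustrative model only, NOT Hibla's WritableResourceStream.
 * It mimics the write()/drain contract with an in-memory buffer.
 */
final class SoftLimitBuffer
{
    private string $buffer = '';
    private bool $needsDrain = false;
    /** @var list<callable> */
    private array $drainListeners = [];

    public function __construct(private int $softLimit) {}

    public function onDrain(callable $listener): void
    {
        $this->drainListeners[] = $listener;
    }

    /** Buffers the data; returns false once the buffer reaches the soft limit. */
    public function write(string $data): bool
    {
        if ($data === '') {
            return true; // empty writes never touch the buffer
        }

        $this->buffer .= $data;

        if (strlen($this->buffer) >= $this->softLimit) {
            $this->needsDrain = true;
            return false;
        }

        return true;
    }

    /** Stands in for the event loop's write watcher emptying the buffer. */
    public function flush(): string
    {
        $flushed = $this->buffer;
        $this->buffer = '';

        if ($this->needsDrain) {
            $this->needsDrain = false;
            foreach ($this->drainListeners as $listener) {
                $listener();
            }
        }

        return $flushed;
    }
}

$writer = new SoftLimitBuffer(softLimit: 8);
$drained = 0;
$writer->onDrain(function () use (&$drained): void {
    $drained++;
});

$first  = $writer->write('abc');   // 3 bytes buffered, below the limit
$second = $writer->write('defgh'); // 8 bytes buffered, limit reached
$out    = $writer->flush();        // buffer emptied, drain listener called
```

The data is accepted in both calls; the `false` return is only a signal to stop producing until `drain` fires.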

### Pipe event flow

`pipe()` coordinates `data`, `drain`, `end`, and `close` events between source and destination automatically:

```
ReadableResourceStream              WritableResourceStream
       │                                    │
  emit('data', $chunk) ──────────────────► write($chunk)
       │
       │             [buffer not full]
       │         write() returns true ◄─────
       │  keep flowing
       │
  emit('data', $chunk) ──────────────────► write($chunk)
       │
       │             [buffer now full]
       │         write() returns false ◄────
  pause() ◄──────────────────────────────── STOP SOURCE
       │
       │  [event loop drains the buffer]
       │                           emit('drain')
       │  ◄── resume() ────────────────────┘
       │  [flowing again]
       │
  emit('end') ───────────────────────────► end()
       │                              emit('finish')
       │                              emit('close')

```
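Written out by hand, the wiring in the diagram comes down to three listeners. The sketch below is a self-contained approximation in plain PHP: `EmitsEvents`, `FakeSource`, `FakeDestination`, and `manualPipe()` are hypothetical stand-ins used so the example runs without the library, and the real `pipe()` may handle more cases (such as `close` and error propagation) than this shows.

```php
<?php

declare(strict_types=1);

/** Minimal event emitter shared by both stand-in streams. */
trait EmitsEvents
{
    /** @var array<string, list<callable>> */
    private array $listeners = [];

    public function on(string $event, callable $listener): void
    {
        $this->listeners[$event][] = $listener;
    }

    public function emit(string $event, mixed ...$args): void
    {
        foreach ($this->listeners[$event] ?? [] as $listener) {
            $listener(...$args);
        }
    }
}

/** Stand-in source: only records whether it is paused. */
final class FakeSource
{
    use EmitsEvents;

    public bool $paused = false;

    public function pause(): void { $this->paused = true; }
    public function resume(): void { $this->paused = false; }
}

/** Stand-in destination: signals backpressure at $softLimit buffered bytes. */
final class FakeDestination
{
    use EmitsEvents;

    public string $buffer = '';
    public bool $ended = false;

    public function __construct(private int $softLimit) {}

    public function write(string $data): bool
    {
        $this->buffer .= $data;
        return strlen($this->buffer) < $this->softLimit;
    }

    /** Plays the role of the event loop emptying the buffer. */
    public function drain(): void
    {
        $this->buffer = '';
        $this->emit('drain');
    }

    public function end(): void { $this->ended = true; }
}

/** The three listeners the pipe diagram describes. */
function manualPipe(FakeSource $source, FakeDestination $destination): void
{
    $source->on('data', function (string $chunk) use ($source, $destination): void {
        if ($destination->write($chunk) === false) {
            $source->pause(); // destination is full: stop the source
        }
    });
    $destination->on('drain', fn () => $source->resume());
    $source->on('end', fn () => $destination->end());
}

$source = new FakeSource();
$destination = new FakeDestination(softLimit: 4);
manualPipe($source, $destination);

$source->emit('data', 'ab');
$pausedAfterFirst = $source->paused;  // buffer below the limit
$source->emit('data', 'cd');
$pausedAfterSecond = $source->paused; // buffer at the limit
$destination->drain();
$pausedAfterDrain = $source->paused;  // drain resumed the source
$source->emit('end');
```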

### `ThroughStream` event flow

`ThroughStream` is both a writable (receives `write()` calls) and a readable (emits `data` events). Unlike resource-backed streams, it has no I/O watcher — the event loop is not involved.

```
Upstream writes          ThroughStream          Downstream reads
     │                        │                        │
     │  write($chunk)         │                        │
     ├───────────────────────►│                        │
     │                 [transform($chunk)]             │
     │                  emit('data', $result) ────────►│
     │                        │                        │
     │  end($final)           │                        │
     ├───────────────────────►│                        │
     │                  emit('data', $transformed)     │
     │                  emit('end')                    │
     │                  emit('finish')                 │
     │                  emit('close')                  │
     │                        │                        │
     │  transformer throws    │                        │
     │                  emit('error', $e)              │
     │                  emit('close')                  │

```
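The event order in the diagram can be reproduced with a short synchronous model. This is a sketch in plain PHP, not Hibla's `ThroughStream`: the class `MiniThrough` is hypothetical and exists only to make the ordering of `data`, `end`, `finish`, `close`, and `error` concrete.

```php
<?php

declare(strict_types=1);

/** Illustrative model only, NOT Hibla's ThroughStream. */
final class MiniThrough
{
    /** @var array<string, list<callable>> */
    private array $listeners = [];
    private bool $closed = false;
    /** @var callable(string): string */
    private $transformer;

    public function __construct(callable $transformer)
    {
        $this->transformer = $transformer;
    }

    public function on(string $event, callable $listener): void
    {
        $this->listeners[$event][] = $listener;
    }

    public function write(string $chunk): void
    {
        if ($this->closed) {
            return;
        }

        try {
            $result = ($this->transformer)($chunk);
        } catch (\Throwable $e) {
            $this->emit('error', $e); // error is always followed by close
            $this->close();
            return;
        }

        $this->emit('data', $result);
    }

    public function end(string $final = ''): void
    {
        if ($final !== '') {
            $this->write($final);
        }
        if ($this->closed) {
            return; // already closed, or the final chunk failed
        }

        $this->emit('end');
        $this->emit('finish');
        $this->close();
    }

    private function close(): void
    {
        $this->closed = true;
        $this->emit('close');
        $this->listeners = []; // listeners are dropped after close
    }

    private function emit(string $event, mixed ...$args): void
    {
        foreach ($this->listeners[$event] ?? [] as $listener) {
            $listener(...$args);
        }
    }
}

// Happy path: record the order in which events arrive.
$events = [];
$upper = new MiniThrough('strtoupper');
foreach (['data', 'end', 'finish', 'close'] as $name) {
    $upper->on($name, function (string ...$args) use (&$events, $name): void {
        $events[] = $args === [] ? $name : $name . ':' . $args[0];
    });
}
$upper->write('hello ');
$upper->end('world');

// Failure path: a throwing transformer yields error, then close.
$failed = [];
$broken = new MiniThrough(function (string $chunk): string {
    throw new \RuntimeException('transform failed');
});
$broken->on('error', function (\Throwable $e) use (&$failed): void { $failed[] = 'error'; });
$broken->on('close', function () use (&$failed): void { $failed[] = 'close'; });
$broken->write('x');
```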

### `CompositeStream` and `DuplexResourceStream` events

Both are wrappers around underlying streams. Their events are forwarded from the inner streams, not re-emitted independently.

The `close` event on a `CompositeStream` fires only when **both** inner streams have closed. On a `DuplexResourceStream`, `close` fires as soon as either side closes because both share the same underlying resource.

### Error event behaviour

Both readable and writable streams follow the same rule: an `error` event is always followed by `close`. The stream closes itself after emitting `error`. You do not need to call `close()` inside an error handler.

```
// CORRECT — just handle the error; close fires on its own
$stream->on('error', function (\Throwable $e) {
    echo "Error: " . $e->getMessage() . "\n";
});

$stream->on('close', function () {
    cleanupResources();
});
```

---

No-Op Behaviour
---------------

All stream types are designed to be called defensively without needing to check state first. Redundant calls are silently ignored — no exceptions are thrown, no duplicate events fire.

| Call | Condition | Behaviour |
| --- | --- | --- |
| `pause()` | Stream already paused or closed | No-op |
| `resume()` | Stream already flowing or closed | No-op |
| `close()` | Stream already closed | No-op |
| `end()` | Stream already ending or closed | No-op |
| `write('')` | Any writable stream | No-op; returns `true`, no buffer interaction |
| `writeAsync('')` | Any `PromiseWritableStream` | Resolves immediately with `0` |
| `endAsync()` | Stream already ending or closed | Resolves immediately |
| `readAsync()` | Stream already at EOF | Resolves immediately with `null` |
| `pipe()` | Source not readable | No-op; returns destination unchanged |
| `pipe()` | Destination not writable | Pauses source, returns destination unchanged |
| `removeReadWatcher()` | Watcher already removed | No-op; returns `false` |
| `removeWriteWatcher()` | Watcher already removed | No-op; returns `false` |

The one exception is `write()` on a closed stream: this emits an `error` event and returns `false` rather than silently succeeding, because writing to a closed stream is almost always a logic error that should surface rather than be swallowed.

---

Resource Cleanup and Destructors
--------------------------------

All stream classes implement `__destruct`. If the stream has not been explicitly closed by the time the object is garbage collected, the destructor calls `close()` automatically to free the underlying resource. Stream resources are never silently leaked.

However, the destructor calls `close()` directly; it does not call `end()` first. For writable streams this has an important consequence: **any data still buffered at destruction time is discarded and the `finish` event never fires.** If you rely on `finish` to confirm that all data has been flushed, always call `end()` or `endAsync()` explicitly before letting the stream go out of scope.

```
// Wrong — buffer may be discarded if $stream goes out of scope
$stream = new WritableResourceStream(fopen('/tmp/output.txt', 'wb'));
$stream->write("Important data\n");
// $stream goes out of scope — destructor calls close(), buffer is discarded

// Correct — drain the buffer before releasing the stream
$stream = new WritableResourceStream(fopen('/tmp/output.txt', 'wb'));
$stream->on('finish', fn() => print "All data flushed\n"); // `echo` is not valid in an arrow function
$stream->write("Important data\n");
$stream->end();
```

With the promise-based API this risk is eliminated as long as you await every write and end call to completion. By the time `endAsync()` resolves, all buffered data has been flushed to the underlying resource and the destructor has nothing left to discard:

```
$stream = new PromiseWritableStream(fopen('/tmp/output.txt', 'wb'));
await($stream->writeAsync("Important data\n"));
await($stream->endAsync()); // all data flushed before this resolves
```

The destructor is a safety net for resource handles, not a substitute for explicit lifecycle management.
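The difference between the two paths can be seen in a small model. This is a sketch in plain PHP, not Hibla's implementation: `ModelWriter` is a hypothetical class whose `end()` flushes into an `ArrayObject` sink and whose destructor calls `close()` without flushing, mirroring the behaviour described above.

```php
<?php

declare(strict_types=1);

/** Illustrative model only, NOT Hibla's WritableResourceStream. */
final class ModelWriter
{
    private string $buffer = '';
    private bool $closed = false;

    /** @param \ArrayObject<int, string> $sink Receives whatever is actually flushed */
    public function __construct(private \ArrayObject $sink) {}

    public function write(string $data): void
    {
        if (!$this->closed) {
            $this->buffer .= $data;
        }
    }

    /** Flushes the buffer to the sink, then closes. */
    public function end(): void
    {
        if ($this->closed) {
            return;
        }

        $this->sink->append($this->buffer);
        $this->buffer = '';
        $this->close();
    }

    /** Closes immediately and discards anything still buffered. */
    public function close(): void
    {
        $this->buffer = '';
        $this->closed = true;
    }

    public function __destruct()
    {
        $this->close(); // safety net only: no end(), so no flush
    }
}

// Without end(): the destructor runs on unset() and the buffer is dropped.
$lost = new \ArrayObject();
$writer = new ModelWriter($lost);
$writer->write("Important data\n");
unset($writer);

// With end(): the buffer reaches the sink before the stream closes.
$kept = new \ArrayObject();
$writer = new ModelWriter($kept);
$writer->write("Important data\n");
$writer->end();
```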

---

Events Reference
----------------

### Readable stream events

| Event | Arguments | Description |
| --- | --- | --- |
| `data` | `string $chunk` | Fires when a chunk of data is available |
| `end` | (none) | Fires when the stream reaches EOF; no more `data` events will follow |
| `close` | (none) | Fires when the underlying resource is closed |
| `error` | `\Throwable $e` | Fires on a read error. The stream closes after emitting `error` |
| `pause` | (none) | Fires when the stream is paused |
| `resume` | (none) | Fires when the stream resumes |

### Writable stream events

| Event | Arguments | Description |
| --- | --- | --- |
| `drain` | (none) | Fires when the write buffer drops below the soft limit; safe to write again |
| `finish` | (none) | Fires after `end()` is called and all buffered data has been flushed |
| `close` | (none) | Fires when the underlying resource is closed |
| `error` | `\Throwable $e` | Fires on a write error. The stream closes after emitting `error` |

---

API Reference
-------------

### `Stream` factory

| Method | Returns | Description |
| --- | --- | --- |
| `Stream::readable($resource, $chunkSize)` | `ReadableResourceStream` | Wrap a readable resource |
| `Stream::writable($resource, $softLimit)` | `WritableResourceStream` | Wrap a writable resource |
| `Stream::duplex($resource, $readChunkSize, $writeSoftLimit)` | `DuplexResourceStream` | Wrap a read/write resource |
| `Stream::composite($readable, $writable)` | `CompositeStream` | Combine two streams into one duplex |
| `Stream::through(?callable $transformer)` | `ThroughStream` | Create a transform stream |
| `Stream::readableFile($path, $chunkSize)` | `ReadableResourceStream` | Open a file for reading |
| `Stream::writableFile($path, $append, $softLimit)` | `WritableResourceStream` | Open a file for writing |
| `Stream::duplexFile($path, $readChunkSize, $writeSoftLimit)` | `DuplexResourceStream` | Open a file for read/write |
| `Stream::stdin($chunkSize)` | `ReadableResourceStream` | STDIN as a readable stream |
| `Stream::stdout($softLimit)` | `WritableResourceStream` | STDOUT as a writable stream |
| `Stream::stderr($softLimit)` | `WritableResourceStream` | STDERR as a writable stream |
| `Stream::stdio($readChunkSize, $writeSoftLimit)` | `CompositeStream` | STDIN + STDOUT as a single duplex |

### `ReadableStreamInterface`

| Method | Returns | Description |
| --- | --- | --- |
| `pipe($destination, $options)` | `WritableStreamInterface` | Pipe to a writable stream with automatic backpressure |
| `isReadable()` | `bool` | True if the stream is open and readable |
| `pause()` | `void` | Stop emitting `data` events. No-op if already paused or closed |
| `resume()` | `void` | Resume emitting `data` events. No-op if already flowing or closed |
| `close()` | `void` | Close the stream and free the resource. No-op if already closed |

### `ReadableResourceStream`

Extends `ReadableStreamInterface` with:

| Method | Returns | Description |
| --- | --- | --- |
| `isEof()` | `bool` | True if the stream has reached the end of the resource |
| `isPaused()` | `bool` | True if the stream is currently paused |
| `seek($offset, $whence)` | `bool` | Reposition the stream pointer. Clears internal buffer and resets EOF. Returns `false` on non-seekable resources. Throws `StreamException` if the stream is closed |
| `tell()` | `int\|false` | Return the current byte position. Returns `false` if undetermined. Throws `StreamException` if the stream is closed |

### `WritableStreamInterface`

| Method | Returns | Description |
| --- | --- | --- |
| `write($data)` | `bool` | Write data. Returns `false` if the buffer is full (backpressure). Emits `error` if called on a closed stream |
| `end($data)` | `void` | Signal end-of-stream, optionally writing a final chunk. No-op if already ending or closed |
| `isWritable()` | `bool` | True if the stream is open and writable |
| `close()` | `void` | Close the stream, discarding any buffered data. No-op if already closed |

### `WritableResourceStream`

Extends `WritableStreamInterface` with:

| Method | Returns | Description |
| --- | --- | --- |
| `isEnding()` | `bool` | True if `end()` has been called and the stream is draining its buffer before closing |

### `PromiseReadableStreamInterface`

| Method | Returns | Description |
| --- | --- | --- |
| `readAsync($length)` | `PromiseInterface` | Read a chunk. Resolves with `null` at EOF. Supports cancellation |
| `readLineAsync($maxLength)` | `PromiseInterface` | Read until `\n` or `$maxLength`. Resolves with `null` at EOF. Supports cancellation |
| `readAllAsync($maxLength)` | `PromiseInterface` | Read entire stream into a string. Supports cancellation |
| `pipeAsync($destination, $options)` | `PromiseInterface` | Pipe to a writable stream. Resolves with total bytes transferred. Supports cancellation |

### `PromiseWritableStreamInterface`

| Method | Returns | Description |
| --- | --- | --- |
| `writeAsync($data)` | `PromiseInterface` | Write data. Waits for `drain` if buffer is full. Resolves with `0` if `$data` is empty. Supports cancellation |
| `writeLineAsync($data)` | `PromiseInterface` | Write data with an appended `\n`. Supports cancellation |
| `endAsync($data)` | `PromiseInterface` | End the stream. Resolves when all buffered data is flushed. Resolves immediately if already ending or closed. Supports cancellation |

---

Development
-----------

```
git clone https://github.com/hiblaphp/stream.git
cd stream
composer install
./vendor/bin/pest
./vendor/bin/phpstan analyse
```

---

Credits
-------

- **API Design:** Inspired by the [ReactPHP Streams](https://github.com/reactphp/stream) interface. If you are familiar with ReactPHP's stream API, Hibla's will feel immediately familiar, with the addition of native promise-based methods and Fiber-aware I/O.
- **Event Emitter:** Built on [evenement/evenement](https://github.com/igorw/evenement).
- **Event Loop Integration:** Powered by [hiblaphp/event-loop](https://github.com/hiblaphp/event-loop).
- **Promise Integration:** Built on [hiblaphp/promise](https://github.com/hiblaphp/promise).

---

License
-------

MIT License. See [LICENSE](./LICENSE) for more information.
