toalett/react-stream-adapter
============================

Use any data source as a stream with ReactPHP

Version 1.0 · MIT license · requires PHP ^7.4

[Source](https://github.com/toalett-io/react-stream-adapter) · [Packagist](https://packagist.org/packages/toalett/react-stream-adapter)

🚽 Toalett
=========

Welcome to Toalett, a humble initiative. Toalett is the Norwegian word for toilet 💩.

What is `toalett/react-stream-adapter`?
---------------------------------------

It is a library that allows any data source to be used as a stream with ReactPHP. It is very small - there is [one interface](src/Source.php), [one class](src/StreamAdapter.php) and [one trait](src/EndlessTrait.php). Its only dependency is `react/stream`.

The [`StreamAdapter`](src/StreamAdapter.php) takes an implementation of the [`Source`](src/Source.php) interface and makes it approachable as a [stream](https://reactphp.org/stream/) in applications using an [event loop](https://reactphp.org/event-loop/).

Installation
------------

It is available on [Packagist](https://packagist.org/packages/toalett/):

```
composer require toalett/react-stream-adapter
```

Motivation
----------

I was working on a project that required an application to respond to AMQP messages in a non-blocking way. The application made use of an event loop. Initially I used a periodic timer with a callback, but as the application grew this became a cluttered mess. It slowly started to feel more natural to treat the message queue as a stream. This makes sense if you think about it:

> In computer science, a stream is a sequence of data elements made available over time. A stream can be thought of as items on a conveyor belt being processed one at a time rather than in large batches.
>
> — [Stream (computing) on Wikipedia](https://en.wikipedia.org/wiki/Stream_(computing))

This definition suits a message queue.

In the project I mentioned earlier, I use this library to poll an AMQP queue every 10 seconds. This keeps my load low and allows me to do other things in the meantime. This abstraction turned out really useful, so I thought that others might enjoy it too.

How do I use this?
------------------

There are only three components to worry about, and one of them is optional! The main components are the [`Source`](src/Source.php) interface and the [`StreamAdapter`](src/StreamAdapter.php) class. The optional component is the [`EndlessTrait`](src/EndlessTrait.php), which can be used in an endless source.

The steps you need to take to use this library are as follows:

1. Define a class that is able to generate or provide some data. It must implement the [`Source`](src/Source.php) interface.
2. The `select()` method is called periodically. This is where you return your next piece of data. Make sure the `select()` method returns anything that is not `null` when data is available (anything goes) or `null` when there is none. You may add a typehint to your implementation of `select()` such as `select(): ?string` or `select(): ?MyData` for improved clarity.
3. The interface also specifies the `close(): void` and `eof(): bool` methods. In an endless (infinite) stream, `close()` may be left empty and `eof()` should return false (EOF is never reached). The [`EndlessTrait`](src/EndlessTrait.php) provides these implementations.
4. Use the [`StreamAdapter`](src/StreamAdapter.php) to attach your [`Source`](src/Source.php) to the loop.
5. Interact with the adapter as if it were any other readable stream from `react/stream`.
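
Steps 1 to 3 could look roughly like the sketch below for a finite source. It is only an illustration: `ArraySource` is a hypothetical class, and the `Source` interface is redeclared locally as a stand-in (based on the three methods described above) so that the snippet runs without Composer. In a real project you would import the package's own interface instead.

```php
<?php
declare(strict_types=1);

// Local stand-in for the package's Source interface (an assumption based on
// the method list above). Replace it with an import of the real interface.
interface Source
{
    public function select();
    public function close(): void;
    public function eof(): bool;
}

// Hypothetical finite source: hands out the items of an array one by one.
final class ArraySource implements Source
{
    /** @var string[] */
    private array $items;

    public function __construct(array $items)
    {
        $this->items = array_values($items);
    }

    // Returns the next item, or null when there is nothing to read.
    public function select(): ?string
    {
        return array_shift($this->items);
    }

    // Drops whatever is left; afterwards the source reports EOF.
    public function close(): void
    {
        $this->items = [];
    }

    // EOF is reached once every item has been handed out.
    public function eof(): bool
    {
        return $this->items === [];
    }
}

$source = new ArraySource(['a', 'b']);
```

An endless source would instead leave `close()` empty and return `false` from `eof()`, which is what the `EndlessTrait` is described as providing.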

*Note:* This library uses polling under the hood. The default polling interval is 0.5 seconds, though if checking for data is an intensive operation, you might want to increase the interval a bit to prevent slowdowns. This is a tradeoff between responsiveness and load. Custom intervals can be set by passing them as a third argument to the [`StreamAdapter`](src/StreamAdapter.php) constructor.

*Note:* The [`StreamAdapter`](src/StreamAdapter.php) reads data eagerly from the source - on each poll it won't stop reading until there is nothing left to read. This prevents congestion when long polling intervals are used, but it might block execution for a while when there is a lot of data to be read or if your `select()` routine takes some time.
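
To make the eager-reading note concrete, here is a minimal sketch of what a single polling tick amounts to. This is not the library's code: `drainOnce()` is a hypothetical helper, and the `$select` closure stands in for a source's `select()` method.

```php
<?php
declare(strict_types=1);

// Hypothetical helper (not part of the package): performs one polling tick.
// $select stands in for Source::select(); $onData stands in for a 'data' listener.
function drainOnce(callable $select, callable $onData): int
{
    $count = 0;
    // Keep reading until select() signals "no data" by returning null.
    while (($item = $select()) !== null) {
        $onData($item);
        $count++;
    }
    return $count;
}

$queue = ['first', 'second', 'third'];
$select = function () use (&$queue) {
    return array_shift($queue); // null once the queue is empty
};

$received = [];
$drained = drainOnce($select, function (string $item) use (&$received): void {
    $received[] = $item;
});
// A single tick delivered all three items; a second tick would find nothing.
```

The loop only returns once `$select` yields `null`, which is why a slow `select()` or a large backlog can hold up the event loop for the duration of the tick.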

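```
use React\EventLoop\Factory;
// Also import StreamAdapter from this package, plus your own MySource and MyData classes.

$loop = Factory::create();

$source = new MySource(); // implements Source
$stream = new StreamAdapter($source, $loop); // polls every 0.5 seconds by default
$stream->on('data', function (MyData $data) {
    /* do something with data */
});

$loop->run();
```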

Check out the [examples](examples) folder for some simple implementations.

Questions
---------

**Q**: *Where is the code that deals with AMQP messages*?
**A**: It will be released in a separate package, but it needs some work before it can be published.

**Q**: *Where are the tests*?
**A**: There is only one class, and it is mostly based on the `ReadableResourceStream` from `react/stream`. Tests might be added later, but as of now, I don't really see the value. Feel free to create an issue if this bothers you!

Maintainers: Joop Schilder, toalett-io

