PHPackages                             arekxv/php-data-streamer - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Utility &amp; Helpers](/categories/utility)
4. /
5. arekxv/php-data-streamer

ActiveLibrary[Utility &amp; Helpers](/categories/utility)

arekxv/php-data-streamer
========================

1.0.1(4y ago)117Apache-2.0PHPPHP &gt;=7.4

Since Jul 4Pushed 4y ago1 watchersCompare

[ Source](https://github.com/ArekX/php-data-streamer)[ Packagist](https://packagist.org/packages/arekxv/php-data-streamer)[ RSS](/packages/arekxv-php-data-streamer/feed)WikiDiscussions main Synced 4d ago

READMEChangelog (2)Dependencies (3)Versions (3)Used By (0)

Redis Data Streamer
===================

[](#redis-data-streamer)

[![Documentation Status](https://camo.githubusercontent.com/66d439fd7b25827dda5e7d069129f429cdcf690e9161ab77b3d2e3577f461a3a/68747470733a2f2f72656164746865646f63732e6f72672f70726f6a656374732f7068702d646174612d73747265616d65722f62616467652f3f76657273696f6e3d6c6174657374)](https://php-data-streamer.readthedocs.io/en/latest/?badge=latest)[![Build Status](https://camo.githubusercontent.com/748d896df096644616bdbd0abcb682125036590f7a23b3c6770a7ecd25788ca9/68747470733a2f2f7363727574696e697a65722d63692e636f6d2f672f4172656b582f7068702d646174612d73747265616d65722f6261646765732f6275696c642e706e673f623d6d61696e)](https://scrutinizer-ci.com/g/ArekX/php-data-streamer/build-status/main)[![Scrutinizer Code Quality](https://camo.githubusercontent.com/9b8c90002eef8b795faa73822f6880c95848f37a84b57673b2166eed1022da48/68747470733a2f2f7363727574696e697a65722d63692e636f6d2f672f4172656b582f7068702d646174612d73747265616d65722f6261646765732f7175616c6974792d73636f72652e706e673f623d6d61696e)](https://scrutinizer-ci.com/g/ArekX/php-data-streamer/?branch=main)[![Code Coverage](https://camo.githubusercontent.com/715345b899f3ed63cfd726051c0a9a25c3a1e500a703ed18d484b6d8d4195354/68747470733a2f2f7363727574696e697a65722d63692e636f6d2f672f4172656b582f7068702d646174612d73747265616d65722f6261646765732f636f7665726167652e706e673f623d6d61696e)](https://scrutinizer-ci.com/g/ArekX/php-data-streamer/?branch=main)[![Code Intelligence Status](https://camo.githubusercontent.com/4f2b3d5e4976e5cf9dc815dc53286943bd8de2a32ca6b09152388335fd75065a/68747470733a2f2f7363727574696e697a65722d63692e636f6d2f672f4172656b582f7068702d646174612d73747265616d65722f6261646765732f636f64652d696e74656c6c6967656e63652e7376673f623d6d61696e)](https://scrutinizer-ci.com/code-intelligence)

Data streaming library which handles multiple redis drivers and provides easy access to Redis Streams and Consumer Groups.

Both php-redis and predis drivers are supported.

Installation
------------

[](#installation)

Through composer: `composer require arekxv/php-data-streamer`

Usage
=====

[](#usage)

This library supports both reading the data stream and writing to a data stream.

Reading and handling messages
-----------------------------

[](#reading-and-handling-messages)

To set your code to listen and read messages the code should be setup as follows:

```
use ArekX\DataStreamer\Data\ArrayMessage;
use ArekX\DataStreamer\Data\CallableHandler;
use ArekX\DataStreamer\Data\CallableParser;
use ArekX\DataStreamer\Data\Settings;
use ArekX\DataStreamer\Drivers\RedisDriver;
use ArekX\DataStreamer\StreamReader;

// Specify a redis driver to use
$driver = new RedisDriver(); // or new \ArekX\DataStreamer\Drivers\PredisDriver() if Predis package is used
$driver->connect([
    'host' => '127.0.0.1'
]);

// Specify a message converter from array into instances
// This can be one callable via setDefaultBuilder or per type in setBuilder
$parser = new CallableParser();
$parser->setDefaultBuilder(fn(string $id, string $type, array $payload) => ArrayMessage::create($type, $payload, $id));

// Set handler for messages this can be a default one for all messages
// or a custom one per Message::getType()
$handler = new CallableHandler();
$handler->setDefaultHandler(function (ArrayMessage $message) {
    echo "{$message->getId()}: " . json_encode($message->getPayload()) . PHP_EOL;
    return true; // Returning true means that message was handled successfully.
});

// Settings object which holds the configuration for the stream.
$settings = new Settings([
    'stream' => 'data-stream',
    'consumerGroup' => 'my-consumer-group',
    'consumerName' => 'my-consumer-consumer',
]);

// Initialize data stream reader.
$reader = new StreamReader($driver, $parser, $handler, $settings);

// Run infinite loop to process messages.
echo "Listening..." . PHP_EOL;
$reader->runLoop();
```

Sending messages
----------------

[](#sending-messages)

To send messages you can send the data using code below:

```
use ArekX\DataStreamer\Data\ArrayMessage;
use ArekX\DataStreamer\Data\PayloadMessageConverter;
use ArekX\DataStreamer\Data\Settings;
use ArekX\DataStreamer\Drivers\RedisDriver;
use ArekX\DataStreamer\StreamWriter;

// Specify a redis driver to use
$driver = new RedisDriver(); // or new \ArekX\DataStreamer\Drivers\PredisDriver() if Predis package is used
$driver->connect([
    'host' => '127.0.0.1'
]);

// Settings object which holds the configuration for the stream.
$settings = new Settings([
    'stream' => 'data-stream'
]);

// Define a converter which will convert a message into an array
// suitable for sending across the data stream.
$converter = new PayloadMessageConverter();

// Initialize a stream writer
$writer = new StreamWriter($driver, $settings, $converter);

// Send message to the data stream
$writer->write(ArrayMessage::create('test-type', [
    'key' => 'value',
    'key2' => 'value2'
]));
```

Documentation
=============

[](#documentation)

Documentation is available at:

You can also build documentation manually or read it directly from `docs` folder.

Building
--------

[](#building)

1. Install Python 3 and PIP
2. Run `pip install mkdocs`
3. Run `mkdocs serve` to serve or `mkdocs build` to build.

Tests
=====

[](#tests)

To run tests run `composer test`.

License
=======

[](#license)

Copyright Aleksandar Panic

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

 or [in this repository](LICENSE.md)

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

###  Health Score

24

—

LowBetter than 32% of packages

Maintenance20

Infrequent updates — may be unmaintained

Popularity7

Limited adoption so far

Community7

Small or concentrated contributor base

Maturity52

Maturing project, gaining track record

 Bus Factor1

Top contributor holds 100% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~24 days

Total

2

Last Release

1752d ago

### Community

Maintainers

![](https://www.gravatar.com/avatar/3c16806a6fc4805b54005c72ea7aa0b6bc33f889a4a657a3071953f6b4e4a23c?d=identicon)[ArekXV](/maintainers/ArekXV)

---

Top Contributors

[![ArekX](https://avatars.githubusercontent.com/u/4344776?v=4)](https://github.com/ArekX "ArekX (13 commits)")

###  Code Quality

TestsCodeception

### Embed Badge

![Health badge](/badges/arekxv-php-data-streamer/health.svg)

```
[![Health](https://phpackages.com/badges/arekxv-php-data-streamer/health.svg)](https://phpackages.com/packages/arekxv-php-data-streamer)
```

###  Alternatives

[joshribakoff/php-csv-utils

php-csv-utils

1143.8k1](/packages/joshribakoff-php-csv-utils)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
