PHPackages                             alsc/streambus - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Caching](/categories/caching)
4. /
5. alsc/streambus

ActiveLibrary[Caching](/categories/caching)

alsc/streambus
==============

Redis Streams message bus

0.2.1(4mo ago)15.6k↓48.1%MITPHPPHP &gt;=8.2CI passing

Since Jul 19Pushed 1mo ago1 watchersCompare

[ Source](https://github.com/alschastny/streambus)[ Packagist](https://packagist.org/packages/alsc/streambus)[ RSS](/packages/alsc-streambus/feed)WikiDiscussions master Synced today

READMEChangelog (3)Dependencies (7)Versions (5)Used By (0)

Redis Streams Bus
=================

[](#redis-streams-bus)

This library enables you to use [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/) as a message bus or queue.

At the technical level, Redis Streams Bus is a collection of Redis streams, each designated for a specific subject or message type, along with abstractions for working with them.

Features
--------

[](#features)

- Handle multiple message types and subjects within a single abstraction
- Independent consumption via consumer groups with any required number of consumers
- Ordered consumption within each subject
- Recover consumption after failures
- Delivery attempt counter
- Place unprocessed messages into a DLQ (Dead Letter Queue)
- Delay processing (NACK) for an arbitrary period in case of failure
- Set a maximum time for message processing
- Queue mode operation with deletion of processed items
- Limit the time and number of stored messages
- Tools for monitoring consumption and bus state

Requirements
------------

[](#requirements)

- **PHP** &gt;= 8.2
- **Redis** or **Valkey** &gt;= 7.2

Installation
------------

[](#installation)

```
composer require alsc/streambus
```

Usage
-----

[](#usage)

The library provides a [factory](src/StreamBusBuilder.php) for configuring and creating the necessary classes. The main components and settings are described below. You can find more usage examples in the [examples](examples) folder and in the [tests](tests) of the repository.

### Settings

[](#settings)

Class `StreamBus\StreamBusSettings`

This class defines the settings for the entire message bus and is used during initialization of `StreamBusBuilder` and `StreamBus`.

KeyDescriptionDefaultminTTLSecMinimum message TTL in stream`86400`maxSizeMaximum messages stored per subject`1000000`exactLimitsApply exact limits [detail](https://redis.io/docs/latest/commands/xadd/#capped-streams)`false`deleteOnAckDelete message from stream on ACK`false`deletePolicyHow entries are removed when trimmed or acked with `deleteOnAck`. `KeepRef` / `DelRef` / `Acked` (Redis 8.2+)`DeleteMode::KeepRef`maxDeliveryMaximum delivery attempts per message. `0` means no limit`0`ackExplicitRequire explicit ACK from consumers`true`ackWaitMsMaximum time to process a message before it is redelivered`30 * 60 * 1000`nackDelayMsMinimum delay before a NACKed message is redelivered`0`idmpModeIdempotent publishing mode. `None` / `Auto` / `Explicit` (Redis 8.6+)`IdmpMode::None`idmpDurationSecHow long idempotency keys are retained. `0` uses the server default`0`idmpMaxSizeMaximum number of idempotency keys retained per stream. `0` uses the server default`0`maxExpiredSubjectsMaximum subjects scanned for expired messages per call. `0` means no limit`0`**Example**

```
$settings = new StreamBusSettings(
    minTTLSec: 86_400,
    maxSize: 10_000_000,
    exactLimits: false,
    deleteOnAck: false,
    maxDelivery: 10,
    ackWaitMs: 30 * 60 * 1000,
    //...
);

$builder = StreamBusBuilder::create('bus_name')
    ->withSettings($settings)
    //...
```

### Produce

[](#produce)

Class `StreamBus\Producer\StreamBusProducerInterface`

You can add messages to the bus either one by one or in batches.

**Example**

```
$serializers = [
    'users.new' => new StreamBusJsonSerializer(),
    'products.new' => new StreamBusJsonSerializer(),
];

$builder = StreamBusBuilder::create('bus_name')
    ->withClient($redisClient)
    ->withSettings(new StreamBusSettings())
    ->withSerializers($serializers);

$producer = $builder->createProducer('producer');
$producer->add('users.new', ['id' => 1, 'name' => 'David']);
$producer->add('users.new', ['id' => 2, 'name' => 'Andrew']);
$producer->add('products.new', ['id' => 1, 'product' => 'guitar']);
$producer->addMany('products.new', [
    ['id' => 1, 'product' => 'guitar']
    ['id' => 2, 'product' => 'flute']
]);
```

### Consume

[](#consume)

Class: `StreamBus\Consumer\StreamBusConsumerInterface`

There are several types of consumers for message consumption:

- `StreamBusConsumer` - no order guaranties, at least once delivery
- `StreamBusOrderedConsumer` - ordered per subject, at least once delivery
- `StreamBusOrderedStrictConsumer` - same as previous with additional consistency checks

You can specify the subjects you are interested in when creating the consumers. Consumers support blocking reads.

**Example**

```
$serializers = [
    'users.new' => new StreamBusJsonSerializer(),
    'products.new' => new StreamBusJsonSerializer(),
    'orders.new' => new StreamBusJsonSerializer(),
];

$builder = StreamBusBuilder::create('bus_name')
    ->withClient($redisClient)
    ->withSettings(new StreamBusSettings())
    ->withSerializers($serializers);

$consumer = $builder->createConsumer('all', 'consumer');
// or
$consumer = $builder->createOrderedConsumer('users', ['users.new']);
// or
$consumer = $builder->createOrderedStrictConsumer('users_and_orders', ['users.new', 'orders.new']);

// Read max 5 messages per subject, block read for 10 seconds if no messages available to read
while ($messages = $consumer->read(5, 10_000)) {
    foreach ($messages as $subject => $subjectMessages) {
        foreach ($subjectMessages as $messageId => $message) {
            printf('got message from subject: %s, with id: %s' . PHP_EOL, $subject, $messageId);
            print_r($message);
            // ack
            $consumer->ack($subject, $messageId);
            // or nack
            $consumer->nack($subject, $messageId);
            // or nack with 10 seconds redelivery delay
            $consumer->nack($subject, $messageId, 10_000);
        }
    }
}
```

### Consume with processor

[](#consume-with-processor)

Class `StreamBus\Processor\StreamBusProcessor`

You can also process messages using the processor.

**Example**

```
class User
{
    public function __construct(public int $id, public string $name) {}

    public function toArray(): array
    {
        return ['id' => $this->id, 'name' => $this->name];
    }

    public static function fromArray(array $data): self
    {
        return new self($data['id'], $data['name']);
    }
}

$builder = StreamBusBuilder::create('bus_name')
    ->withClient($redisClient)
    ->withSettings(new StreamBusSettings())
    ->withSerializers([
        'users.new' => new StreamBusJsonSerializer(
            static fn(User $user) => $user->toArray(),
            static fn(array $data) => User::fromArray($data),
        ),
    ]);

class UsersHandler
{
    public function __invoke(string $type, string $id, User $user): true
    {
        printf('Welcome, %s!' . PHP_EOL, $user->name);
        return true;
    }
}

$processor = $builder->createProcessor(
    'processor',
    'consumer',
    ['users.new' => new UsersHandler()]
)->process();
```

### Dead Letter Queue

[](#dead-letter-queue)

The bus supports working with a [DLQ](https://en.wikipedia.org/wiki/Dead_letter_queue), placing messages there after `maxDelivery` delivery attempts. An example of usage can be found in the [examples](examples/dlq.php) folder.

### Observe

[](#observe)

Class: `StreamBus\StreamBus\StreamBusInfoInterface`

Using this interface, you can obtain:

- A list of existing subjects
- A list of consumer groups for a specific subject
- The number of messages taken for processing
- The consumer group lag
- Raw information about the state of the underlying Redis streams ([details](https://redis.io/docs/latest/commands/xinfo-stream/))

**Example**

```
$info = $builder->createBusInfo();

foreach ($info->getSubjects() as $subject) {
    printf('Subject: %s' . PHP_EOL, $subject);
    printf('  Stream length: %d' . PHP_EOL, $info->getStreamLength($subject));
    foreach ($info->getGroups($subject) as $group) {
        printf('  Group: %s' . PHP_EOL, $group);
        printf('    Pending: %d' . PHP_EOL, $info->getGroupPending($group, $subject));
        printf('    Time lag: %d' . PHP_EOL, $info->getGroupPending($group, $subject));
    }
}
```

Benchmark
---------

[](#benchmark)

The project includes [benchmarks](tests/Benchmark) that you can run. It is recommended to adjust the test file configurations according to your hardware.

My 5-year-old home laptop is capable of processing around 100,000 messages per second in batch mode. On servers, I have achieved results of around 1 million messages per second in batch mode.

```
composer test:benchmark
```

Contributing
------------

[](#contributing)

This project is open source and welcomes contributions from the community. If you have ideas, improvements, or bug fixes, feel free to open an issue or submit a pull request!

###  Health Score

43

—

FairBetter than 89% of packages

Maintenance85

Actively maintained with recent releases

Popularity26

Limited adoption so far

Community7

Small or concentrated contributor base

Maturity42

Maturing project, gaining track record

 Bus Factor1

Top contributor holds 100% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~110 days

Total

3

Last Release

129d ago

### Community

Maintainers

![](https://avatars.githubusercontent.com/u/91664363?v=4)[alschastny](/maintainers/alschastny)[@alschastny](https://github.com/alschastny)

---

Top Contributors

[![alschastny](https://avatars.githubusercontent.com/u/91664363?v=4)](https://github.com/alschastny "alschastny (7 commits)")

---

Tags

streammessageredisqueueevent busmessage bus

###  Code Quality

TestsPHPUnit

Code StylePHP CS Fixer

### Embed Badge

![Health badge](/badges/alsc-streambus/health.svg)

```
[![Health](https://phpackages.com/badges/alsc-streambus/health.svg)](https://phpackages.com/packages/alsc-streambus)
```

###  Alternatives

[symfony/cache

Provides extended PSR-6, PSR-16 (and tags) implementations

4.2k373.5M3.3k](/packages/symfony-cache)[pdffiller/qless-php

PHP Bindings for qless

29113.7k2](/packages/pdffiller-qless-php)[javibravo/simpleue

Php package to manage queue tasks in a simple way

131348.6k1](/packages/javibravo-simpleue)[spinx/sidekiq-job-php

Push and schedule jobs to Sidekiq from PHP

37236.2k](/packages/spinx-sidekiq-job-php)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
