PHPackages                             mahavirnahata/stream-bus - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Utility &amp; Helpers](/categories/utility)
4. /
5. mahavirnahata/stream-bus

ActiveLibrary[Utility &amp; Helpers](/categories/utility)

mahavirnahata/stream-bus
========================

Redis-backed cross-language stream bus for Laravel.

0.1.0(4mo ago)10MITPHPPHP ^8.2CI passing

Since Feb 8Pushed 1mo agoCompare

[ Source](https://github.com/mahavirnahata/stream-bus)[ Packagist](https://packagist.org/packages/mahavirnahata/stream-bus)[ RSS](/packages/mahavirnahata-stream-bus/feed)WikiDiscussions main Synced today

READMEChangelog (2)Dependencies (6)Versions (3)Used By (0)

Stream Bus for Laravel
======================

[](#stream-bus-for-laravel)

A Redis-backed cross-language stream bus for Laravel. Publish from any language; consume in Laravel (or vice versa) with at-least-once or effectively-once delivery.

Requirements
------------

[](#requirements)

- PHP 8.2+
- Laravel 11 or 12
- Redis 5.0+ (Redis 6.2+ for PEL recovery via XAUTOCLAIM)
- phpredis extension **or** predis/predis 2.x

Installation
------------

[](#installation)

```
composer require mahavirnahata/stream-bus
```

Publish the config file:

```
php artisan vendor:publish --tag=stream-bus-config
```

Quick start
-----------

[](#quick-start)

**Publish a message:**

```
use MahavirNahata\StreamBus\Facades\StreamBus;

StreamBus::publish('events:outbound', [
    'type' => 'image.process',
    'payload' => ['id' => 123],
]);
```

**Create a handler:**

```
namespace App\Handlers;

use MahavirNahata\StreamBus\Contracts\StreamBusHandler;

class ImageResultHandler implements StreamBusHandler
{
    public function handle(array $message): void
    {
        // $message['payload'] contains your data
        dispatch(new ProcessImageResult($message['payload'] ?? []));
    }
}
```

**Start consuming:**

```
php artisan stream-bus:consume events:inbound App\Handlers\ImageResultHandler --group=laravel
```

---

Drivers
-------

[](#drivers)

Streams (`streams`)Lists (`lists`)Redis primitiveXADD / XREADGROUPRPUSH / BLPOPOrderingFIFO, guaranteedFIFO, guaranteedConsumer groupsYesNoAt-least-onceYes (PEL + ACK)Yes (pop is destructive)Multi-consumerYes, independentYes, competingPEL recoveryYes (XAUTOCLAIM)N/ACross-languageAny client that speaks Redis StreamsAny client that speaks Redis Lists**Use `streams`** (default) when you need consumer groups, per-message ACK, and PEL recovery after crashes.

**Use `lists`** for simpler competing-consumer queues where Redis Streams features are not required.

---

Configuration
-------------

[](#configuration)

```
// config/stream-bus.php
return [
    'driver'           => env('STREAM_BUS_DRIVER', 'streams'),
    'connection'       => env('STREAM_BUS_REDIS', 'default'),
    'prefix'           => env('STREAM_BUS_PREFIX', 'stream-bus:'),
    'cluster'          => env('STREAM_BUS_CLUSTER', false),
    'delivery'         => env('STREAM_BUS_DELIVERY', 'at-least-once'),
    'dedupe_ttl'       => env('STREAM_BUS_DEDUPE_TTL', 86400),
    'maxlen'           => env('STREAM_BUS_MAXLEN', null),
    'reclaim'          => env('STREAM_BUS_RECLAIM', false),
    'min_idle_time'    => env('STREAM_BUS_MIN_IDLE_TIME', 60000),
    'reclaim_count'    => env('STREAM_BUS_RECLAIM_COUNT', 10),
    'max_attempts'     => env('STREAM_BUS_MAX_ATTEMPTS', 0),
    'dead_letter_topic'=> env('STREAM_BUS_DEAD_LETTER_TOPIC', null),
    'consumers'        => [],
];
```

### Config reference

[](#config-reference)

KeyDefaultDescription`driver``streams``streams` or `lists``connection``default`Laravel Redis connection name`prefix``stream-bus:`Key prefix for all topics`cluster``false`Wrap topic in `{}` hash tags for Redis Cluster slot colocation`delivery``at-least-once``at-least-once` or `effectively-once``dedupe_ttl``86400`Dedup / attempt key TTL in seconds`maxlen``null`Max stream entries; `null` = unlimited. **Set this in production.**`reclaim``false`Enable automatic PEL recovery (Redis 6.2+)`min_idle_time``60000`ms a PEL message must be idle before reclaim`reclaim_count``10`Max messages to reclaim per loop`max_attempts``0`Max handler attempts before dead-lettering (`0` = unlimited)`dead_letter_topic``null`DLQ topic; defaults to `{original}:dead-letter``consumers``[]`Topic → handler map (see below)### Consumers config

[](#consumers-config)

```
'consumers' => [
    // Simple form
    'events:inbound' => App\Handlers\ImageResultHandler::class,

    // Per-topic overrides
    'events:orders' => [
        'handler'           => App\Handlers\OrderHandler::class,
        'driver'            => 'streams',
        'group'             => 'order-workers',
        'count'             => 5,
        'delivery'          => 'effectively-once',
        'dead_letter_topic' => 'events:orders:dead-letter',
    ],
],
```

---

Publishing messages
-------------------

[](#publishing-messages)

```
// Facade
use MahavirNahata\StreamBus\Facades\StreamBus;
StreamBus::publish('events:outbound', ['foo' => 'bar']);

// Helper function
stream_bus()->publish('events:outbound', ['foo' => 'bar']);

// Injected class
use MahavirNahata\StreamBus\StreamBus;
public function __construct(private StreamBus $bus) {}
$this->bus->publish('events:outbound', ['foo' => 'bar']);
```

Per-call option overrides:

```
StreamBus::publish('events:outbound', $payload, [
    'driver'     => 'lists',
    'connection' => 'redis-secondary',
    'prefix'     => 'app1:bus:',
]);
```

---

Consumer command
----------------

[](#consumer-command)

```
php artisan stream-bus:consume [topic] [handler] [options]
```

### Arguments

[](#arguments)

ArgumentDescription`topic`Topic to consume (optional if `consumers` is configured)`handler`Handler class name (required when `topic` is given)### Options

[](#options)

OptionDefaultDescription`--driver`config`streams` or `lists``--connection`configRedis connection name`--prefix`configKey prefix`--group``default`Consumer group (streams)`--consumer`hostnameConsumer name (streams)`--count``1`Messages per read (streams)`--block``2000`Block time: ms for streams, seconds for lists`--delivery`config`at-least-once` or `effectively-once``--dedupe-ttl`configDedup key TTL in seconds`--once`—Read once and exit`--sleep``200`Sleep ms between polls when idle`--no-ack`—Skip ACK (streams)`--stop-on-error`—Exit if the handler throws`--max-attempts`configMax attempts before dead-lettering (`0` = unlimited)`--dead-letter-topic`configOverride DLQ topic name`--memory``128`Exit when process memory exceeds this MB limit`--reclaim`configEnable PEL reclaim each loop (Redis 6.2+)`--min-idle-time``60000`ms before a PEL message is eligible for reclaim---

Delivery semantics
------------------

[](#delivery-semantics)

### at-least-once (default)

[](#at-least-once-default)

Messages are processed at least once. Duplicate delivery is possible after a crash. Use idempotent handlers.

### effectively-once

[](#effectively-once)

A Redis `SET NX EX` key is written on first processing. Duplicates within `dedupe_ttl` seconds are skipped.

```
php artisan stream-bus:consume events:inbound App\Handlers\Foo \
    --delivery=effectively-once \
    --dedupe-ttl=3600
```

---

Dead-letter queue
-----------------

[](#dead-letter-queue)

When `max_attempts` is set and a handler fails that many times, the message is published to the dead-letter topic and ACKed (streams) or discarded (lists).

```
STREAM_BUS_MAX_ATTEMPTS=3
STREAM_BUS_DEAD_LETTER_TOPIC=events:dead
```

Or per command:

```
php artisan stream-bus:consume events:inbound App\Handlers\Foo \
    --max-attempts=3 \
    --dead-letter-topic=events:dead
```

Consume dead-letter messages like any other topic:

```
php artisan stream-bus:consume events:dead App\Handlers\DeadLetterInspector
```

---

PEL recovery (streams only)
---------------------------

[](#pel-recovery-streams-only)

When a consumer crashes while processing, its messages sit unacknowledged in Redis's Pending Entry List (PEL) forever without intervention. Enable automatic reclaim to re-queue them:

```
STREAM_BUS_RECLAIM=true
STREAM_BUS_MIN_IDLE_TIME=60000   # 60 seconds idle before reclaiming
STREAM_BUS_RECLAIM_COUNT=10      # max reclaimed per loop
```

Or via CLI:

```
php artisan stream-bus:consume --reclaim --min-idle-time=60000
```

**Requires Redis 6.2+** (uses XAUTOCLAIM internally).

---

Redis Cluster
-------------

[](#redis-cluster)

All related keys (stream, dedupe, attempts) must land on the same hash slot. Enable hash-tag wrapping:

```
STREAM_BUS_CLUSTER=true
```

With `cluster=true`, the topic `events:outbound` produces keys like `stream-bus:{events:outbound}`, `stream-bus:{events:outbound}:dedupe:{id}`, etc. — all guaranteed to the same slot.

---

Stream length management
------------------------

[](#stream-length-management)

Redis Streams grow indefinitely without trimming. Set a limit in production:

```
STREAM_BUS_MAXLEN=100000
```

Trimming uses the `~` approximate modifier (O(1)). Requires phpredis; on predis the package falls back to a separate XTRIM call (logged as a warning if that also fails).

---

Metrics
-------

[](#metrics)

Retrieve live stream / queue health metrics:

```
$metrics = stream_bus()->metrics('events:outbound', 'default');

// streams result:
// [
//   'driver'  => 'streams',
//   'topic'   => 'events:outbound',
//   'key'     => 'stream-bus:events:outbound',
//   'group'   => 'default',
//   'length'  => 1500,   // total entries in stream
//   'pending' => 3,      // delivered but not yet ACKed
// ]

// lists result:
// [
//   'driver' => 'lists',
//   'topic'  => 'events:outbound',
//   'key'    => 'stream-bus:events:outbound',
//   'length' => 42,
// ]
```

A value of `-1` indicates the Redis command is unavailable for the current client version.

---

Production deployment
---------------------

[](#production-deployment)

### Supervisor

[](#supervisor)

```
[program:stream-bus-inbound]
command=php /var/www/artisan stream-bus:consume events:inbound App\Handlers\ImageResultHandler
    --group=laravel
    --memory=128
    --reclaim
    --max-attempts=3
autostart=true
autorestart=true
stopwaitsecs=10
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/stream-bus-inbound.log
```

### Graceful shutdown

[](#graceful-shutdown)

The consume command installs POSIX signal handlers (SIGTERM, SIGINT, SIGHUP) when the `pcntl` extension is available. On signal receipt, the current batch completes and the process exits cleanly. The default `--block=2000` means the worst-case shutdown delay is ~2 seconds.

Reduce `--block` further if you need faster shutdown response (e.g. for Kubernetes with a short grace period).

### Memory management

[](#memory-management)

The `--memory=128` flag exits the process when RSS exceeds the limit, allowing Supervisor to restart it. This prevents slow memory leaks from accumulating indefinitely.

### Shared Redis

[](#shared-redis)

If you share Redis with other workloads, isolate the bus with a unique prefix or a dedicated Redis connection:

```
STREAM_BUS_PREFIX=myapp:bus:
STREAM_BUS_REDIS=stream-bus-connection
```

---

Cross-language interoperability
-------------------------------

[](#cross-language-interoperability)

The wire format for every message is:

```
{
  "id": "",
  "ts": 1716000000,
  "payload": { "your": "data" }
}
```

See `examples/` for producers and consumers in Node.js, Python, and Go.

### Go (streams)

[](#go-streams)

```
examples/go-producer/main.go   — publish via XADD
examples/go-consumer/main.go   — consume via XREADGROUP / XACK

```

Run each independently:

```
cd examples/go-producer && go mod init producer && go get github.com/redis/go-redis/v9 github.com/google/uuid && go run main.go
cd examples/go-consumer && go mod init consumer && go get github.com/redis/go-redis/v9 && go run main.go
```

### Node.js (streams)

[](#nodejs-streams)

```
examples/node-producer.js   — publish via xAdd
examples/node-consumer.js   — consume via xReadGroup / xAck

```

```
npm install redis
node examples/node-producer.js
node examples/node-consumer.js
```

### Python (streams / lists)

[](#python-streams--lists)

```
examples/python-producer.py   — publish via xadd
examples/python-consumer.py   — consume via xreadgroup / xack

```

```
pip install redis
python examples/python-producer.py
python examples/python-consumer.py
```

---

Troubleshooting
---------------

[](#troubleshooting)

**Consumer exits immediately with "Handler class not found"**
Ensure the handler class exists, is autoloaded, and implements `StreamBusHandler`.

**No messages received**
Verify topic name and prefix match between publisher and consumer. For streams, confirm the group name matches.

**Duplicate messages**
Expected with `at-least-once`. Switch to `effectively-once` with idempotent handlers, or use `max_attempts=1` with a dead-letter queue.

**Stream grows unbounded**
Set `STREAM_BUS_MAXLEN` in your `.env`.

**Shutdown takes too long**
Reduce `--block` (e.g. `--block=500`). The maximum shutdown delay equals the block time.

**PEL grows after consumer crashes**
Enable `--reclaim` or set `STREAM_BUS_RECLAIM=true`. Requires Redis 6.2+.

---

FAQ
---

[](#faq)

**Does it scan all Redis keys?**
No — it reads only the configured topic key with your prefix.

**Can I run multiple consumers for the same topic?**
Yes. With the streams driver, consumers in the same group share the load. With lists, multiple consumers compete naturally.

**Exactly-once delivery?**
Not guaranteed. Use `effectively-once` with idempotent handlers for the best-effort equivalent.

**Can I use this without Laravel?**
The core `StreamBus` class only depends on `Illuminate\Contracts\Redis\Factory`. You can wire it up manually in any container.

---

License
-------

[](#license)

MIT

###  Health Score

34

—

LowBetter than 75% of packages

Maintenance83

Actively maintained with recent releases

Popularity2

Limited adoption so far

Community6

Small or concentrated contributor base

Maturity38

Early-stage or recently created project

 Bus Factor1

Top contributor holds 100% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~1 days

Total

2

Last Release

144d ago

### Community

Maintainers

![](https://avatars.githubusercontent.com/u/6626053?v=4)[mahavirnahata](/maintainers/mahavirnahata)[@mahavirnahata](https://github.com/mahavirnahata)

---

Top Contributors

[![mahavirnahata](https://avatars.githubusercontent.com/u/6626053?v=4)](https://github.com/mahavirnahata "mahavirnahata (12 commits)")

###  Code Quality

TestsPHPUnit

### Embed Badge

![Health badge](/badges/mahavirnahata-stream-bus/health.svg)

```
[![Health](https://phpackages.com/badges/mahavirnahata-stream-bus/health.svg)](https://phpackages.com/packages/mahavirnahata-stream-bus)
```

###  Alternatives

[illuminate/queue

The Illuminate Queue package.

21332.6M1.6k](/packages/illuminate-queue)[laravel/horizon

Dashboard and code-driven configuration for Laravel queues.

4.2k95.4M307](/packages/laravel-horizon)[laravel/ai

The official AI SDK for Laravel.

1.0k3.2M201](/packages/laravel-ai)[laravel/sail

Docker files for running a basic Laravel application.

1.9k205.7M1.3k](/packages/laravel-sail)[laravel/pulse

Laravel Pulse is a real-time application performance monitoring tool and dashboard for your Laravel application.

1.7k15.1M132](/packages/laravel-pulse)[tallstackui/tallstackui

TallStackUI is a powerful suite of Blade components that elevate your workflow of Livewire applications.

725173.2k14](/packages/tallstackui-tallstackui)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
