event-stream/laravel
====================

Subscribable, resumable server-sent event streams over Redis for Laravel, with an optional synced state-graph layer.

v1.0.0 · MIT · PHP ^8.2

[Source](https://github.com/patrickjames242/event-stream-laravel) · [Packagist](https://packagist.org/packages/event-stream/laravel)

Subscribable, resumable **server-sent event (SSE) streams over Redis** for Laravel.

Define a stream once on the server, publish typed messages to it from anywhere in your application, and let any number of browser clients subscribe to it over a single SSE connection. The server tracks a per-stream resume cursor, so a client that drops and reconnects picks up exactly where it left off, with no duplicated or missed messages. An optional **state-stream** layer keeps a JSON object graph automatically synced from the server to every client.

This is the **backend** package. The companion frontend package is [`event-stream-react`](https://www.npmjs.com/package/event-stream-react). The two communicate over a small, versioned wire protocol and should be kept on matching major versions.

- **Multi-stream multiplexing** — one browser `EventSource` can subscribe to many streams at once; each is tracked and resumed independently.
- **Resumable** — every message carries a cursor; reconnects resume from the last delivered message using the standard `Last-Event-ID` mechanism.
- **Authorization per stream** — each stream decides who may subscribe.
- **Interceptors** — transform or suppress messages on their way to the client.
- **Preludes** — hand a freshly connected client a snapshot before live messages begin.
- **State streams** — mirror a server-side JSON graph to clients via granular `set` / `delete` / `reset` mutations instead of re-fetching.

---

Requirements
------------

- PHP 8.2+
- Laravel 10, 11, or 12
- A Redis connection configured in `config/database.php` (the streams are backed by [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/))
- A server setup that allows long-lived PHP responses (see [Deployment notes](#deployment-notes))

Installation
------------

```
composer require event-stream/laravel
```

The service provider is auto-discovered. Publish the config file:

```
php artisan vendor:publish --tag=event-stream-config
```

This writes `config/event-stream.php`.

---

Quick start
-----------

> This quick-start section is shared between the backend and frontend packages so both halves of the picture are in one place.

### 1. Backend — define a stream

A stream is a class. It declares a **pattern** for its key, decides **who can subscribe**, and exposes typed methods that **publish** messages.

```
// app/Streams/JobProgressStream.php
namespace App\Streams;

use EventStream\EventStream;
use Illuminate\Http\Request;

class JobProgressStream extends EventStream
{
    // {jobId} is a placeholder; a key like "job.progress:abc-123"
    // makes $this->args->jobId === "abc-123".
    const streamPattern = 'job.progress:{jobId}';

    public function authorize(Request $request): bool
    {
        // Return true to allow the subscription, false for a 403.
        return $request->user() !== null;
    }

    // A typed publish method. `send()` returns the new Redis message id.
    public function sendProgress(int $percent): string
    {
        return $this->send('progress', ['percent' => $percent]);
    }
}
```

### 2. Backend — register the stream

Add the class to `config/event-stream.php`:

```
'streams' => [
    App\Streams\JobProgressStream::class,
],
```

### 3. Backend — expose the SSE route

Mount the bundled controller wherever you like, behind whatever middleware your app needs (the package intentionally does **not** register a route for you):

```
// routes/web.php
use EventStream\Http\StreamSubscriptionController;

Route::get('/streams', [StreamSubscriptionController::class, 'stream'])
    ->middleware(['web', 'auth'])
    ->name('streams.subscribe');
```

### 4. Backend — publish messages

From a job, controller, event listener — anywhere:

```
use App\Streams\JobProgressStream;

JobProgressStream::make('abc-123')->sendProgress(50);
```

### 5. Frontend — wrap your app once

```
import { EventStreamProvider } from 'event-stream-react';

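function Root() {
  return (
    // `url` must match the route you mounted the controller on.
    <EventStreamProvider url="/streams">
      {/* your application's component tree goes here */}
    </EventStreamProvider>
  );
}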
```

### 6. Frontend — subscribe

```
import { useState } from 'react';
import { useSubscribeToStreams } from 'event-stream-react';

function JobProgress({ jobId }: { jobId: string }) {
  const [percent, setPercent] = useState(0);

  useSubscribeToStreams(
    `job.progress:${jobId}`,
    ({ message }) => {
      if (message.type === 'progress') {
        setPercent((message.data as { percent: number }).percent);
      }
    }
  );

  return <p>{percent}% complete</p>;
}
```

That is the whole round trip: publish on the server, receive in the browser, with resumable delivery handled for you.

### State streams (synced graph)

For a server-owned object graph you want mirrored to clients without writing per-field event handling, use a **state stream**.

```
// Backend
namespace App\Streams;

use EventStream\StateStream;
use Illuminate\Http\Request;

class DocumentStateStream extends StateStream
{
    const streamPattern = 'document:{documentId}';

    public function authorize(Request $request): bool
    {
        return $request->user() !== null;
    }
}

// Mutate the graph anywhere — each call publishes a granular mutation:
$doc = DocumentStateStream::make('abc-123');
$doc->setState('title', 'Quarterly report');
$doc->setState('author.name', 'Ada');
$doc->arrayAppend('comments', ['body' => 'Looks good']);
```

```
// Frontend — `state` stays current automatically.
import { observer } from 'mobx-react-lite';
import { useStreamStateObject } from 'event-stream-react';

const Document = observer(({ id }: { id: string }) => {
  const doc = useStreamStateObject(`document:${id}`);
  return <h1>{String(doc.state.title ?? '')}</h1>;
});
```

---

Backend reference
=================

The rest of this document covers the backend in depth. For the frontend API, see the [`event-stream-react`](https://www.npmjs.com/package/event-stream-react) README.

How it works
------------

1. A browser opens one `EventSource` to your SSE route, naming one or more streams via `?streams[]=` query parameters.
2. The controller resolves each key to a registered `EventStream` subclass, calls `authorize()` on each, and opens a single batched, blocking read across all of them against Redis.
3. As messages arrive they are run through each stream's interceptors and written to the client as SSE frames. Every live frame carries a composite `id:` (the `Last-Event-ID`) encoding a resume cursor per stream.
4. If the connection drops, the browser automatically reconnects and replays the last `Last-Event-ID`. The controller decodes it and resumes each stream from its own cursor.

Publishing is just an `XADD` to a Redis stream; subscribing is an `XREAD`. There is no broker process to run — Redis is the entire transport.
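Step 1 can be sketched from the client side without the React package. This is a minimal illustration, assuming the controller is mounted at `/streams` as in the quick start; `buildSubscribeUrl` is a hypothetical helper, not part of either package.

```typescript
// Hypothetical helper: builds the SSE URL for a set of stream keys.
// Each key becomes one `streams[]` query parameter, percent-encoded.
function buildSubscribeUrl(route: string, streamKeys: string[]): string {
  const name = encodeURIComponent('streams[]');
  const query = streamKeys
    .map((key) => `${name}=${encodeURIComponent(key)}`)
    .join('&');
  return `${route}?${query}`;
}

const subscribeUrl = buildSubscribeUrl('/streams', [
  'job.progress:abc-123',
  'system.notifications',
]);

// In a browser, one connection then carries both streams:
//   const source = new EventSource(subscribeUrl, { withCredentials: true });
//   source.onmessage = (event) => console.log(JSON.parse(event.data));
```

In practice the `event-stream-react` hooks build and manage this connection for you; the sketch only shows what goes over the wire.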

Defining streams
----------------

### The stream pattern

Every subclass declares a `streamPattern` constant. Only `{` and `}` are special; everything else is a literal that must match exactly.

```
const streamPattern = 'job.progress:{jobId}';          // one placeholder
const streamPattern = 'report:{reportId}:{format}';    // two placeholders
const streamPattern = 'system.notifications';          // no placeholders
```

When a client subscribes to `job.progress:abc-123`, the registry finds the class whose pattern matches and instantiates it with that key. Extracted placeholder values are available as `$this->args`:

```
$this->args->jobId; // "abc-123"
```

Construct instances with `make()`, passing placeholder values left to right:

```
JobProgressStream::make('abc-123');           // job.progress:abc-123
ReportStream::make('rpt-99', 'pdf');          // report:rpt-99:pdf
SystemNotificationsStream::make();            // system.notifications
```
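To make the matching rule concrete, here is a small TypeScript sketch of how a pattern could be matched against a key and how `make()`-style construction could fill it in. It is illustrative only: the package's real matcher is not shown in this README, and the rule that a placeholder matches one or more characters other than `:` is an assumption. `matchKey` and `makeKey` are hypothetical names.

```typescript
// Illustrative only. Assumption: a placeholder matches one or more
// characters other than ':'; everything else is a literal.
type StreamArgs = Record<string, string>;

const PLACEHOLDER = /\{[^{}]+\}/g;

function escapeRegExp(literal: string): string {
  return literal.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

function matchKey(pattern: string, key: string): StreamArgs | null {
  const names: string[] = [];
  // Split into literal text and {placeholder} tokens, keeping the tokens.
  const source = pattern
    .split(/(\{[^{}]+\})/)
    .map((part) => {
      if (part.startsWith('{') && part.endsWith('}')) {
        names.push(part.slice(1, -1));
        return '([^:]+)';
      }
      return escapeRegExp(part);
    })
    .join('');
  const match = new RegExp(`^${source}$`).exec(key);
  if (match === null) return null;
  const args: StreamArgs = {};
  names.forEach((name, index) => {
    args[name] = match[index + 1];
  });
  return args;
}

// Fills placeholders left to right, mirroring how make() is described.
function makeKey(pattern: string, ...values: string[]): string {
  const expected = (pattern.match(PLACEHOLDER) ?? []).length;
  if (values.length !== expected) {
    throw new Error(`expected ${expected} value(s), got ${values.length}`);
  }
  let index = 0;
  return pattern.replace(PLACEHOLDER, () => values[index++]);
}
```

Because the `.` in `job.progress` is escaped, it matches only a literal dot, in line with the rule that only `{` and `}` are special.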

### Authorization

`authorize()` is **required** and runs once per subscription, with `$this->args` already populated. Return `false` to reject with a 403.

```
public function authorize(Request $request): bool
{
    return Job::query()
        ->where('id', $this->args->jobId)
        ->where('user_id', $request->user()?->id)
        ->exists();
}
```

### Publishing

Wrap `send()` in typed methods so call sites read clearly. The first argument is the message **type** (an arbitrary string your frontend switches on); the second is the JSON-serializable **data** payload. `send()` returns the Redis message id.

```
public function sendCompleted(array $summary): string
{
    return $this->send('completed', ['summary' => $summary]);
}
```

The published wire message is `{ type, data }`; the client receives it as `message.type` and `message.data` (see [Wire protocol](#wire-protocol)).

### Choosing where to start: `initialCursor()`

By default a new subscription only receives messages published **after** it connects (`'$'`). Override `initialCursor()` to replay history on first connect:

```
public function initialCursor(): string
{
    return '0-0'; // replay from the beginning of the Redis stream
}
```

A reconnect with a `Last-Event-ID` always takes precedence over this — so `initialCursor()` only governs the very first connection.
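The precedence rule fits in a few lines. This TypeScript sketch uses hypothetical names (`resolveStartCursor`, `resumeCursors`) and is not the package's code; it only restates the rule that a resume cursor beats `initialCursor()`.

```typescript
// Sketch of the precedence rule; names are illustrative.
function resolveStartCursor(
  streamKey: string,
  resumeCursors: Map<string, string>, // decoded from Last-Event-ID, if any
  initialCursor: string = '$', // what initialCursor() would return
): string {
  // A cursor recovered from Last-Event-ID always wins; initialCursor
  // only applies when this stream has no resume position yet.
  return resumeCursors.get(streamKey) ?? initialCursor;
}
```

Note that the lookup is per stream: on a multiplexed connection, a stream added after the last reconnect has no entry in the resume map and still starts from its own initial cursor.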

### Preludes: a snapshot on connect

Return a payload from `preludeMessage()` to hand every freshly connected client a one-off message **before** live messages begin. A prelude is server-synthesized (not read from Redis) and never advances the resume cursor.

```
public function preludeMessage(): ?array
{
    return ['snapshot' => $this->currentSnapshot()];
}
```

On the client this arrives with `meta.isPrelude === true`.
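A client that keeps its own record of resume positions would treat preludes specially. The sketch below is a hypothetical illustration: the frame shape follows the wire protocol section, while `trackCursor` is an invented helper. A browser `EventSource` already tracks `Last-Event-ID` on its own, so this is only needed for custom clients.

```typescript
// Frame shape as documented under "Wire protocol".
interface StreamFrame {
  meta: { streamKey: string; messageId: string | null; isPrelude: boolean };
  message: { type: string; data: unknown };
}

// Hypothetical bookkeeping: remember the latest Redis id seen per stream.
function trackCursor(cursors: Map<string, string>, frame: StreamFrame): void {
  // Preludes are synthesized by the server, carry no Redis id, and must
  // not move the resume position.
  if (frame.meta.isPrelude || frame.meta.messageId === null) {
    return;
  }
  cursors.set(frame.meta.streamKey, frame.meta.messageId);
}
```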

### Interceptors

Interceptors transform or suppress messages on their way to the client. Register them in `registerInterceptors()`:

```
use EventStream\Messages\DeliveredStreamMessage;

protected function registerInterceptors(): void
{
    // Runs on every message: enrich the payload.
    $this->registerInterceptor(function (DeliveredStreamMessage $message) {
        return $message->withData([
            ...$message->content->data,
            'serverTime' => now()->toIso8601String(),
        ]);
    });

    // Runs only on a specific type: hydrate, or suppress by returning null.
    $this->registerInterceptor('progress', function (DeliveredStreamMessage $message) {
        return $this->shouldHide($message) ? null : $message;
    });
}
```

Return a (possibly modified) `DeliveredStreamMessage` to forward it, or `null` to drop it. Use `withData()` / `withType()` to produce modified copies. Interceptors run in registration order, each receiving the previous one's output. A suppressed message still advances the resume cursor, so it is never re-read.
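The chaining semantics can be modeled in a few lines of TypeScript. This is a language-neutral sketch, not the package's implementation: `Delivered`, `Interceptor`, and `runInterceptors` are hypothetical names, and matching a typed interceptor against the message's current type (rather than its original type) is an assumption.

```typescript
interface Delivered {
  id: string; // Redis message id
  type: string;
  data: Record<string, unknown>;
}

interface Interceptor {
  type?: string; // when set, only runs for messages of this type
  run: (message: Delivered) => Delivered | null;
}

function runInterceptors(
  message: Delivered,
  interceptors: Interceptor[],
): { forwarded: Delivered | null; cursor: string } {
  let current: Delivered | null = message;
  for (const interceptor of interceptors) {
    if (current === null) break; // suppressed: later interceptors never run
    if (interceptor.type !== undefined && interceptor.type !== current.type) {
      continue;
    }
    current = interceptor.run(current);
  }
  // The cursor moves to this message's id whether or not it was forwarded.
  return { forwarded: current, cursor: message.id };
}
```

Returning the cursor separately from the forwarded message is what keeps a suppressed message from being read again after a reconnect.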

### Custom Redis key: `redisStreamName()`

The Redis stream defaults to the subscription key. Override it if the underlying key needs to differ:

```
public function redisStreamName(): string
{
    return 'prod:' . $this->streamKey;
}
```

State streams
-------------

`StateStream` extends `EventStream` to keep a JSON object graph synced to clients. The graph is stored flattened into a single Redis hash (one field per leaf, keyed by dot-notation path); every mutation also publishes a granular event describing exactly what changed, so the client patches its local copy instead of re-fetching.

### Mutating the graph

```
$s = DocumentStateStream::make('abc-123');

$s->resetState(['title' => 'Draft', 'tags' => ['a', 'b']]); // replace whole graph
$s->setState('title', 'Final');                             // set one path
$s->setState(['title' => 'Final', 'author.name' => 'Ada']); // set many at once
$s->arrayAppend('comments', ['body' => 'Nice']);            // append to an array
$s->getState('tags');                                       // read back: ['a', 'b']
$s->getState();                                             // whole graph
$s->deleteState('title');                                   // remove a path
$s->deleteState();                                          // wipe the graph
```
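On the receiving side, each mutation is applied as a patch to a local copy of the graph. The TypeScript sketch below shows the idea under stated assumptions: the three type names are the reserved types listed under the wire protocol, but the payload fields (`state`, `path`, `value`) and the `applyMutation` helper are hypothetical, and array handling is simplified. `event-stream-react` does this for you.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };
type Graph = { [key: string]: Json };

// Payload fields are assumed for illustration only.
type Mutation =
  | { type: 'state_stream_mutation_reset'; state: Graph }
  | { type: 'state_stream_mutation_set'; path: string[]; value: Json }
  | { type: 'state_stream_mutation_delete'; path: string[] };

function applyMutation(graph: Graph, mutation: Mutation): Graph {
  if (mutation.type === 'state_stream_mutation_reset') {
    return mutation.state; // replace the whole graph
  }
  const isSet = mutation.type === 'state_stream_mutation_set';
  if (mutation.path.length === 0) {
    if (isSet) throw new Error('this sketch needs a non-empty path for set');
    return {}; // delete with no path wipes the graph
  }
  // Work on a copy so the previous graph is left untouched.
  const next = JSON.parse(JSON.stringify(graph)) as Graph;
  let node: Record<string, Json> = next;
  for (const segment of mutation.path.slice(0, -1)) {
    const child = node[segment];
    if (child === null || typeof child !== 'object') {
      if (!isSet) return next; // nothing stored there, nothing to delete
      node[segment] = {}; // create missing parents on set
    }
    node = node[segment] as unknown as Record<string, Json>;
  }
  const leaf = mutation.path[mutation.path.length - 1];
  if (mutation.type === 'state_stream_mutation_set') {
    node[leaf] = mutation.value;
  } else {
    delete node[leaf];
  }
  return next;
}
```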

### Paths

Paths are dot-strings (`'author.name'`) or segment arrays (`['author', 'name']`). Use the array form when a segment legitimately contains a dot; internally dots inside a segment are escaped so they round-trip. Setting a path first clears anything stored beneath it, so replacing a subtree never leaves stale leaves.
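The flat storage model can be illustrated with a short TypeScript sketch. The backslash escaping used here is an assumption chosen for the example; the README does not specify the package's actual escape scheme, and `encodePath`, `decodePath`, and `flatten` are hypothetical names.

```typescript
// Assumption: a dot (or backslash) inside a segment is escaped with a backslash.
function encodePath(segments: string[]): string {
  return segments
    .map((segment) => segment.replace(/\\/g, '\\\\').replace(/\./g, '\\.'))
    .join('.');
}

function decodePath(path: string): string[] {
  const segments: string[] = [];
  let current = '';
  for (let i = 0; i < path.length; i++) {
    const ch = path[i];
    if (ch === '\\' && i + 1 < path.length) {
      current += path[i + 1]; // take the escaped character literally
      i++;
    } else if (ch === '.') {
      segments.push(current); // unescaped dot ends the segment
      current = '';
    } else {
      current += ch;
    }
  }
  segments.push(current);
  return segments;
}

// Flattens a graph into one entry per leaf, keyed by encoded path.
function flatten(
  value: unknown,
  prefix: string[] = [],
  out: Record<string, unknown> = {},
): Record<string, unknown> {
  if (value !== null && typeof value === 'object') {
    for (const [key, child] of Object.entries(value)) {
      flatten(child, [...prefix, key], out);
    }
  } else {
    out[encodePath(prefix)] = value;
  }
  return out;
}
```

In this model an empty array or object produces no leaf entries at all, which is why an empty container's type has to be decided separately.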

### Container types: `jsonPathTypes()`

Because the graph is stored flat, an *empty* node has no inherent array-vs-object type. By default a node is rendered as an array only when its keys form a clean `0..n` list. Override `jsonPathTypes()` to pin specific paths, using `*` as a single-segment wildcard:

```
protected function jsonPathTypes(): array
{
    return [
        'comments'   => 'array',  // always an array, even when empty
        'author'     => 'object',
        'sections.*' => 'array',
    ];
}
```

The first matching pattern (in declaration order) wins. This map is shipped to the client in the prelude so it materializes empty containers with the same type.
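The resolution order can be sketched as follows. This is an illustration rather than the package's code: `resolveContainerType` is a hypothetical name, the default rule is read as "keys are exactly `0`, `1`, ... in order", and treating an unpinned empty node as an object is an assumption.

```typescript
type ContainerType = 'array' | 'object';

// `*` matches exactly one path segment.
function patternMatches(pattern: string, path: string): boolean {
  const patternSegments = pattern.split('.');
  const pathSegments = path.split('.');
  return (
    patternSegments.length === pathSegments.length &&
    patternSegments.every((segment, i) => segment === '*' || segment === pathSegments[i])
  );
}

function resolveContainerType(
  path: string,
  childKeys: string[],
  pinned: Array<[string, ContainerType]>, // in declaration order
): ContainerType {
  for (const [pattern, type] of pinned) {
    if (patternMatches(pattern, path)) return type; // first match wins
  }
  // Default: an array only when the keys form a clean 0..n list.
  const isList = childKeys.length > 0 && childKeys.every((key, i) => key === String(i));
  return isList ? 'array' : 'object';
}
```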

### Default state and TTL

```
// Shown by getState() (and to clients) when nothing is stored yet.
protected function defaultState(): array
{
    return ['status' => 'pending'];
}

// Sliding-window lifetime of the state hash; refreshed on every mutation.
// Return null to keep state forever. Default: 3600 seconds.
protected function stateTtlSeconds(): ?int
{
    return 3600;
}
```

The state hash key defaults to `redisStreamName() . ':state'`; override `stateHashKey()` to change it.

Consuming a stream from the server
----------------------------------

Sometimes the server itself needs to read a stream (e.g. a queued job that waits for a particular message). Call `connect()`:

```
$connection = JobProgressStream::make('abc-123')->connect();

while (true) {
    $message = $connection->next(blockMs: 1000);
    if ($message === null) {
        continue; // timed out this interval — heartbeat point
    }
    // handle $message (a DeliveredStreamMessage)
}
```

Or block until a condition is met, with a total time budget:

```
$message = $connection->waitUntil(
    fn ($message) => $message->content->type === 'completed',
    deadlineMs: 30_000,
);
// $message is null if the deadline passed without a match.
```
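The relationship between the per-read block time and the overall deadline can be modeled independently of PHP. The TypeScript sketch below is a hypothetical model, not the package's `waitUntil()`: it takes the read function and clock as parameters, and capping each read at the remaining budget is an assumption about how such a loop would usually be written.

```typescript
function waitUntil<T>(
  next: (blockMs: number) => T | null, // one blocking read; null on timeout
  predicate: (message: T) => boolean,
  deadlineMs: number,
  now: () => number = Date.now,
  pollMs: number = 1000,
): T | null {
  const deadline = now() + deadlineMs;
  while (true) {
    const remaining = deadline - now();
    if (remaining <= 0) return null; // budget spent without a match
    // Never block longer than the time left in the budget.
    const message = next(Math.min(pollMs, remaining));
    if (message !== null && predicate(message)) return message;
  }
}
```

Messages that do not satisfy the predicate are consumed and skipped, so the loop keeps reading until either a match arrives or the deadline passes.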

Configuration
-------------

`config/event-stream.php`:

```
return [
    // EventStream subclasses the endpoint may resolve. Patterns must be unique.
    'streams' => [
        App\Streams\JobProgressStream::class,
    ],

    // ms the server blocks per read before sending a heartbeat comment.
    'heartbeat_interval_ms' => 10_000,

    // Max seconds a connection stays open before asking the browser to
    // reconnect (it reconnects automatically). Default: 55 minutes.
    'max_connection_seconds' => 55 * 60,
];
```

You may also pass an explicit list of stream classes to `new EventStreamRegistry([...])` directly — handy in tests.

Wire protocol
-------------

The backend and `event-stream-react` share this contract. **Keep the two packages on matching major versions.**

Each SSE `message` frame's `data:` is JSON of the shape:

```
{
  "meta": {
    "streamKey": "job.progress:abc-123", // which stream this belongs to
    "messageId": "1713000000000-0",      // Redis id, or null for a prelude
    "isPrelude": false                   // true for the on-connect snapshot
  },
  "message": {
    "type": "progress",                  // your message type
    "data": { "percent": 50 }            // your payload
  }
}
```

- The `Last-Event-ID` (and the `lastEventId` query fallback) is a composite cursor: `streamKey1=redisId1;streamKey2=redisId2`.
- Preludes carry `messageId: null` and never advance the cursor.
- State-stream mutations use reserved types: `state_stream_mutation_reset`, `state_stream_mutation_set`, `state_stream_mutation_delete`.
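The composite cursor format above is simple enough to encode and decode by hand. The following TypeScript sketch assumes that stream keys and Redis ids contain no `;` and that no further escaping is applied; the README does not say whether the package escapes these characters. `encodeCursor` and `decodeCursor` are hypothetical names.

```typescript
// Assumption: neither stream keys nor Redis ids contain ';', and no
// additional escaping is applied to the composite value.
function encodeCursor(cursors: Map<string, string>): string {
  return Array.from(cursors.entries())
    .map(([streamKey, redisId]) => `${streamKey}=${redisId}`)
    .join(';');
}

function decodeCursor(lastEventId: string): Map<string, string> {
  const cursors = new Map<string, string>();
  for (const part of lastEventId.split(';')) {
    // Split on the last '=' so the Redis id is always the final component.
    const separator = part.lastIndexOf('=');
    if (separator <= 0) continue; // skip empty or malformed parts
    cursors.set(part.slice(0, separator), part.slice(separator + 1));
  }
  return cursors;
}
```

Keeping one entry per stream is what lets a single multiplexed connection resume each stream from its own position.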

Deployment notes
----------------

An SSE response is a long-lived HTTP request. Make sure your stack allows it:

- **Workers** — each open connection holds one PHP worker (php-fpm/Octane) for its lifetime. Size your worker pool for the number of concurrent subscribers.
- **Buffering** — the controller sends `X-Accel-Buffering: no` and disables output buffering. Ensure no proxy (nginx, load balancer) re-enables response buffering on this route.
- **Timeouts** — raise proxy read timeouts above `max_connection_seconds`, or rely on the built-in periodic reconnect.
- **Redis** — the reader uses blocking `XREAD`; ensure your Redis client/proxy read timeout exceeds `heartbeat_interval_ms`.

Publishing new versions
-----------------------

See [PUBLISHING.md](PUBLISHING.md).

License
-------

[MIT](LICENSE)

Maintainer: Patrick Hanna ([@patrickjames242](https://github.com/patrickjames242))
