PHPackages                             flyokai/amp-channel-dispatcher - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Utility &amp; Helpers](/categories/utility)
4. /
5. flyokai/amp-channel-dispatcher

ActiveLibrary[Utility &amp; Helpers](/categories/utility)

flyokai/amp-channel-dispatcher
==============================

Dispatcher for amphp channel

0.1.1(4w ago)07↓50%MITPHPPHP ^8.2

Since Apr 25Pushed yesterdayCompare

[ Source](https://github.com/flyokai/amp-channel-dispatcher)[ Packagist](https://packagist.org/packages/flyokai/amp-channel-dispatcher)[ RSS](/packages/flyokai-amp-channel-dispatcher/feed)WikiDiscussions main Synced 1w ago

READMEChangelogDependencies (3)Versions (8)Used By (0)

flyokai/amp-channel-dispatcher
==============================

[](#flyokaiamp-channel-dispatcher)

> User docs → [`README.md`](README.md) · Agent quick-ref → [`CLAUDE.md`](CLAUDE.md) · Agent deep dive → [`AGENTS.md`](AGENTS.md)

> Async bidirectional message dispatching over AMPHP channels — request/response correlation, middleware pipeline, remote iterators, and configurable error handling.

This is the messaging plumbing that lets two PHP processes (or threads) talk to each other over an AMPHP `Channel`. You get request/response semantics with automatic id correlation, a middleware stack for cross-cutting concerns, and **remote iterators** for streaming large result sets without buffering.

Features
--------

[](#features)

- **Dual-loop dispatcher** — read &amp; write loops share one channel
- **Request types** — `Request` (expects `Response`) and `MeekRequest` (fire-and-forget)
- **Middleware pipeline** — chain-of-responsibility via `stackMiddleware()`
- **Remote iterators** — stream results across the channel without full buffering
- **Pluggable error handling** — `ErrorResponse` (recoverable) vs `FatalErrorResponse` (terminates dispatcher)
- **Context injection** — handlers get a `Context` with reference back to the dispatcher

Installation
------------

[](#installation)

```
composer require flyokai/amp-channel-dispatcher
```

Quick start
-----------

[](#quick-start)

```
use Amp\ByteStream\StreamChannel;
use Flyokai\AmpChannelDispatcher\{Dispatcher, RequestHandler, Request, Response};
use Flyokai\AmpChannelDispatcher\DefaultDispatcherChannel;
use Flyokai\AmpChannelDispatcher\Error\DefaultErrorHandler;
use function Flyokai\AmpChannelDispatcher\stackMiddleware;

final class PingHandler implements RequestHandler
{
    public function handleRequest(Request $request): Response
    {
        return new Response\SuccessResponse(requestId: $request->id());
    }
}

$channel = /* an Amp\Sync\Channel */;
$dispatcher = new Dispatcher(
    new DefaultDispatcherChannel($channel),
    stackMiddleware(new PingHandler() /*, $middleware1, $middleware2*/),
    new DefaultErrorHandler(),
);
$dispatcher->run();

$response = $dispatcher->sendRequest(new Request\Ping())->await();
```

Concepts
--------

[](#concepts)

### Messages

[](#messages)

Every message implements `Message`:

- `id(): int` — auto-generated unique ID
- `getAttribute(string $name)` / `setAttribute(string $name, mixed $value)`
- `cloneWith(...$args): static`

`Request` and `Response` both extend `Message`. A `Response` carries `requestId(): ?int` for correlation.

### Built-in responses

[](#built-in-responses)

ClassMeaning`Response\SuccessResponse`Generic success`Response\ErrorResponse`Recoverable error (message + code)`Response\AcceptedResponse`Acknowledgement`Response\FatalErrorResponse`**Terminates the dispatcher** (extends ErrorResponse, `requestId = null`)`Response\IteratorContinue`Remote-iterator pagination payload### Dispatcher lifecycle

[](#dispatcher-lifecycle)

```
$dispatcher->run();             // starts read & write loops via EventLoop::defer()
$future = $dispatcher->sendRequest(new MyRequest(...));   // returns null for MeekRequest
$response = $future->await();
$dispatcher->onStop(fn() => /* cleanup */);
$dispatcher->stop();            // graceful shutdown
```

The dispatcher attaches a `Context` attribute to every incoming request with:

- `dispatcher()` — `?WeakReference` to the dispatcher
- `sendRequest()` — for nested calls
- iterator-storage handles (`addLocalIterator()`, …)

### Middleware

[](#middleware)

```
final class LoggingMiddleware implements Middleware
{
    public function handleRequest(Request $request, RequestHandler $next): Response
    {
        // … pre …
        $response = $next->handleRequest($request);
        // … post …
        return $response;
    }
}

$pipeline = stackMiddleware($handler, $logging, $auth);
// composes: $logging → $auth → $handler
```

### Remote iterators

[](#remote-iterators)

When a handler returns a `ConcurrentIterator`, the consumer side gets a `RemoteIterator` proxy. Each `continue()` is a blocking RPC round-trip:

```
// Server side:
$context->addLocalIterator($iterator);

// Client side:
$remote = new RemoteIterator(/* … */);
foreach ($remote as $value) { /* … */ }
```

API
---

[](#api)

ClassPurpose`Dispatcher`Central read/write hub`DefaultDispatcherChannel`Channel adapter`RequestHandler` (interface)`handleRequest(Request): Response``Middleware` (interface)`handleRequest(Request, RequestHandler $next): Response``stackMiddleware(...)`Helper to compose handler + middlewares`Error\ErrorHandler` (interface)`handleError`, `handleException``Error\DefaultErrorHandler`Recoverable vs fatal heuristic`RemoteIterator`, `IteratorStorage`Remote iteration`Context`Per-request context (dispatcher ref, sendRequest, iterator storage)Gotchas
-------

[](#gotchas)

- **WeakReference inside `Context`** — if the dispatcher is GC'd, `context->dispatcher()` returns null. Handlers must check.
- **RemoteIterator round-trips** — every `continue()` blocks until the remote responds. There is no batching; very large iterators are slow.
- **`FiberLocal` in `RemoteIterator`** — state is per-fiber. Calling `continue()` from a different fiber raises an error.
- **MeekRequest** — `sendRequest()` returns `null`. There is no delivery confirmation.
- **`stop()` cancels pending futures** — every unresolved request errors with `DispatcherException('Dispatcher terminated')`.
- **`FatalErrorResponse` halts the dispatcher** — a response with a `null` requestId terminates the read loop.
- **Messages are serialized** — AMPHP channels use `serialize()`/`unserialize()`. Custom objects must support it.
- **WeakClosure callbacks** — all dispatcher callbacks are weakly referenced to avoid circular references and let GC do its job.

See also
--------

[](#see-also)

- [`flyokai/data-service`](../data-service/README.md) — socket-based service built on this dispatcher
- [`flyokai/data-service-message`](../data-service-message/README.md) — concrete request/response DTOs

License
-------

[](#license)

MIT

###  Health Score

40

—

FairBetter than 86% of packages

Maintenance97

Actively maintained with recent releases

Popularity6

Limited adoption so far

Community8

Small or concentrated contributor base

Maturity41

Maturing project, gaining track record

 Bus Factor1

Top contributor holds 63.6% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~4 days

Total

4

Last Release

29d ago

### Community

Maintainers

![](https://avatars.githubusercontent.com/u/247743048?v=4)[flyokai](/maintainers/flyokai)[@flyokai](https://github.com/flyokai)

---

Top Contributors

[![wtsergo](https://avatars.githubusercontent.com/u/305326?v=4)](https://github.com/wtsergo "wtsergo (7 commits)")[![flyokai](https://avatars.githubusercontent.com/u/247743048?v=4)](https://github.com/flyokai "flyokai (4 commits)")

###  Code Quality

TestsPHPUnit

### Embed Badge

![Health badge](/badges/flyokai-amp-channel-dispatcher/health.svg)

```
[![Health](https://phpackages.com/badges/flyokai-amp-channel-dispatcher/health.svg)](https://phpackages.com/packages/flyokai-amp-channel-dispatcher)
```

###  Alternatives

[amphp/process

A fiber-aware process manager based on Amp and Revolt.

25657.8M64](/packages/amphp-process)[amphp/parallel-functions

Parallel processing made simple.

27910.5M26](/packages/amphp-parallel-functions)[setono/sylius-gift-card-plugin

Gift card plugin for Sylius

51179.0k1](/packages/setono-sylius-gift-card-plugin)[kijtra/textdiff

Simple text diff Class. 簡易的なテキスト比較ライブラリです。

491.6k](/packages/kijtra-textdiff)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
