PHPackages                             initphp/queue - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Caching](/categories/caching)
4. /
5. initphp/queue

ActiveLibrary[Caching](/categories/caching)

initphp/queue
=============

The framework-less BabelQueue runtime for plain PHP: a polyglot queue worker loop with URN routing, retries/back-off, dead-letter routing and a database (PDO) transport, built on babelqueue/php-sdk.

2.1.0(1w ago)08MITPHPPHP ^8.2CI passing

Since Dec 10Pushed 1w agoCompare

[ Source](https://github.com/InitPHP/Queue)[ Packagist](https://packagist.org/packages/initphp/queue)[ Docs](https://github.com/InitPHP/Queue)[ GitHub Sponsors](https://github.com/muhammetsafak)[ RSS](/packages/initphp-queue/feed)WikiDiscussions main Synced yesterday

READMEChangelog (2)Dependencies (7)Versions (8)Used By (0)

InitPHP Queue
=============

[](#initphp-queue)

[![CI](https://github.com/InitPHP/Queue/actions/workflows/ci.yml/badge.svg)](https://github.com/InitPHP/Queue/actions/workflows/ci.yml)[![License: MIT](https://camo.githubusercontent.com/8bb50fd2278f18fc326bf71f6e88ca8f884f72f179d3e555e20ed30157190d0d/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f6c6963656e73652d4d49542d677265656e2e737667)](LICENSE)[![PHP Version](https://camo.githubusercontent.com/610fa46e1adcd232c72a957bff198a45a2cbbf3ead460717db36553d96114009/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f7068702d254532253839254135253230382e322d3737376262342e737667)](composer.json)

> The **framework-less [BabelQueue](https://babelqueue.com) runtime for plain PHP** — a polyglot queue worker for apps that have no Laravel queue or Symfony Messenger of their own.

InitPHP Queue gives a plain PHP / Slim / Mezzio application the piece [`babelqueue/php-sdk`](https://github.com/BabelQueue/php-sdk) deliberately leaves to the framework: the **consumer loop, retries with back-off and dead-letter routing**, plus a **database (PDO) transport** the core SDK does not ship. It reuses the SDK's canonical `{ job, trace_id, data, meta, attempts }` envelope, so the queue you produce and consume here is the *same* queue a Go, Python, Node or .NET service reads — messages are routed by a stable **URN**, never a PHP class name.

Where it sits
-------------

[](#where-it-sits)

Layer`babelqueue/php-sdk`**InitPHP Queue**Wire format / contractCanonical envelope, URN scheme, validation, dead-letter annotation*(reuses it)*Producer`EnvelopeCodec` + a publish `Transport``Producer` facade**Consumer loop****— (left to the framework)****`Worker`: reserve → route → ack / retry / dead-letter**TransportsRedis &amp; AMQP (publish only)Redis, AMQP **and PDO** (publish **+** consume)Requirements
------------

[](#requirements)

- PHP **8.2+**
- [`babelqueue/php-sdk`](https://packagist.org/packages/babelqueue/php-sdk) `^1.0` (installed automatically)
- A broker client for the transport you choose:
    - `ext-pdo` for the database transport
    - [`predis/predis`](https://packagist.org/packages/predis/predis) for Redis (Redis **6.2+**)
    - [`php-amqplib/php-amqplib`](https://packagist.org/packages/php-amqplib/php-amqplib) for RabbitMQ

Installation
------------

[](#installation)

```
composer require initphp/queue
# plus the client for your broker, e.g.:
composer require predis/predis
```

Quick start
-----------

[](#quick-start)

### 1. Write a handler

[](#1-write-a-handler)

A handler is mapped to a message **URN**, not to a PHP class name. Return to acknowledge; throw to fail (the worker retries, then dead-letters).

```
use BabelQueue\Contracts\InboundMessage;
use InitPHP\Queue\Contracts\Handler;

final class SendWelcomeEmail implements Handler
{
    public function handle(InboundMessage $message): void
    {
        $data = $message->getData();      // ['user_id' => 42, 'email' => '...']
        // ... do the work. Throwing marks the message as failed.
    }
}
```

### 2. Produce a message

[](#2-produce-a-message)

```
use BabelQueue\Codec\EnvelopeCodec;
use InitPHP\Queue\Producer\Producer;
use InitPHP\Queue\Transport\Redis\RedisTransport;

$transport = new RedisTransport(new Predis\Client('tcp://127.0.0.1:6379'));
$producer  = new Producer($transport, defaultQueue: 'emails');

// From a URN + pure-JSON data:
$producer->send('urn:babel:users:registered', ['user_id' => 42, 'email' => 'a@b.c']);
```

A Go or Python consumer subscribed to the same `emails` queue reads the identical envelope.

### 3. Run a worker

[](#3-run-a-worker)

Build the worker in a small bootstrap file that the CLI loads:

```
// worker.php
use InitPHP\Queue\Consumer\Dispatcher;
use InitPHP\Queue\Consumer\Worker;
use InitPHP\Queue\Consumer\WorkerOptions;
use InitPHP\Queue\Routing\HandlerMap;
use InitPHP\Queue\Transport\Redis\RedisTransport;

require __DIR__ . '/vendor/autoload.php';

$transport = new RedisTransport(new Predis\Client('tcp://127.0.0.1:6379'));

$handlers = (new HandlerMap())
    ->register('urn:babel:users:registered', SendWelcomeEmail::class);

$options = new WorkerOptions(maxAttempts: 3, backoff: [1, 5, 15]);

return new Worker($transport, new Dispatcher($handlers), $options);
```

```
php bin/queue work --bootstrap=worker.php --queue=emails
# or process exactly one message and exit:
php bin/queue work --bootstrap=worker.php --queue=emails --once
```

Prefer to drive it from your own code? Skip the CLI and call the worker directly:

```
$worker->run('emails');       // loop until SIGINT/SIGTERM or a configured limit
$worker->runOnce('emails');   // process at most one message
```

Transports
----------

[](#transports)

All three implement both the SDK's publish `Transport` and this package's `ConsumerTransport`, so one object both produces and consumes.

```
use InitPHP\Queue\Transport\Pdo\PdoTransport;
use InitPHP\Queue\Transport\Redis\RedisTransport;
use InitPHP\Queue\Transport\Amqp\AmqpTransport;

// Database (no extra broker to run):
$pdo = new PDO('mysql:host=127.0.0.1;dbname=app', 'user', 'pass');
$transport = new PdoTransport($pdo, table: 'jobs');
$transport->createSchema();   // dev/test convenience; see docs for production DDL

// Redis 6.2+ (reliable-queue: BLMOVE/LREM):
$transport = new RedisTransport(new Predis\Client('tcp://127.0.0.1:6379'));

// RabbitMQ:
$connection = new PhpAmqpLib\Connection\AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$transport  = new AmqpTransport($connection->channel());
```

Retries, back-off and dead-letters
----------------------------------

[](#retries-back-off-and-dead-letters)

A failed message is re-queued with an incremented `attempts` and a back-off delay until `WorkerOptions::$maxAttempts` is reached; then it is annotated with a `dead_letter` block and moved to the dead-letter destination (`:failed`list on Redis, `.failed` queue on RabbitMQ, the `*_failed` table on PDO).

```
new WorkerOptions(
    maxAttempts: 5,          // total tries before dead-lettering
    backoff: [1, 5, 30],     // seconds between attempts (last value repeats)
    maxJobs: 1000,           // stop after N messages (pair with a supervisor)
    memoryLimitMb: 128,      // stop when memory grows past this
);
```

Delivery is **at-least-once** — make handlers idempotent, or let the worker do it for you (see below).

Idempotent consumption
----------------------

[](#idempotent-consumption)

Because delivery is at-least-once, a transport may hand the same message to a worker more than once (after a crash before the ack, a lapsed reservation, or a broker hiccup). The dispatcher can dedupe these redeliveries so a message is processed **at most once** — opt in by giving it an `IdempotencyOptions`:

```
use InitPHP\Queue\Consumer\Dispatcher;
use InitPHP\Queue\Consumer\IdempotencyOptions;
use InitPHP\Queue\Consumer\InMemoryIdempotencyStore;

$dispatcher = new Dispatcher(
    $handlers,
    idempotency: new IdempotencyOptions(new InMemoryIdempotencyStore()),
);
```

With no options (the default) deduplication is off and the dispatcher behaves exactly as before — this is fully backward-compatible.

**How it dedupes.** Before running a handler the dispatcher derives a stable dedup key from the message and *claims* it; a key that is already recorded means the message was processed before, so the handler is skipped and the message is acknowledged and discarded. The key is **only recorded after the handler succeeds** — a handler that throws leaves the key free, so the at-least-once redelivery is retried as normal.

The key is derived, in order, from:

1. a custom `keyResolver` you supply (`fn (ReceivedMessage): ?string`), else
2. the producer-minted **`meta.id`** (the canonical message identity), else
3. the **`trace_id`**.

A message with none of these has no stable identity and is processed without deduplication.

```
new IdempotencyOptions(
    store: $store,
    keyPrefix: 'bq:idemp:',   // namespaces keys in a shared store
    ttl: 86_400,              // seconds a processed key is retained (null = forever)
    leaseSeconds: 300,        // in-flight claim lease (frees a crashed worker's claim)
    keyResolver: fn ($m) => $m->getData()['order_id'] ?? null, // optional
);
```

### Bring your own store

[](#bring-your-own-store)

`InMemoryIdempotencyStore` is correct for a **single long-running worker** but it is not shared between processes and does not survive a restart, so it cannot dedupe across the whole fleet. For that, back the `InitPHP\Queue\Contracts\IdempotencyStore` contract with a durable, atomic store — exactly how the InitPHP ecosystem treats Cache and Database backends as pluggable seams:

```
use InitPHP\Queue\Contracts\IdempotencyStore;

final class RedisIdempotencyStore implements IdempotencyStore
{
    public function __construct(private \Redis $redis) {}

    public function seen(string $key): bool
    {
        return (bool) $this->redis->exists($key);
    }

    public function claim(string $key, int $leaseSeconds = 0): bool
    {
        // Atomic test-and-set: only the first caller wins the claim.
        $options = ['NX'];
        if ($leaseSeconds > 0) {
            $options['EX'] = $leaseSeconds;
        }

        return (bool) $this->redis->set($key, '1', $options);
    }

    public function remember(string $key, ?int $ttl = null): void
    {
        $ttl !== null && $ttl > 0
            ? $this->redis->set($key, '1', ['EX' => $ttl])
            : $this->redis->set($key, '1');
    }

    public function forget(string $key): void
    {
        $this->redis->del($key);
    }
}
```

A PDO-backed store works the same way: `claim()` is an `INSERT` that fails on a unique `key` column (caught and reported as `false`), `remember()` upserts the row, and a `processed_at`/`expires_at` column drives the TTL.

Unknown-URN strategy
--------------------

[](#unknown-urn-strategy)

When a message arrives whose URN has no mapped handler, the dispatcher applies one of the four canonical BabelQueue strategies:

```
use BabelQueue\Routing\UnknownUrnStrategy;

new Dispatcher($handlers, UnknownUrnStrategy::DEAD_LETTER);
// FAIL (default) | DELETE | RELEASE | DEAD_LETTER
```

A malformed or unsupported-`schema_version` envelope is always **quarantined**(dead-lettered), never silently dropped.

Documentation
-------------

[](#documentation)

Full guides live in [`docs/`](docs/):

GuideWhat it covers[Getting started](docs/getting-started.md)Install, the three moving parts, your first producer + worker.[Envelope &amp; URNs](docs/envelope-and-urns.md)The wire format and how to name message URNs.[Handlers &amp; routing](docs/handlers-and-routing.md)Writing handlers, the `HandlerMap`, unknown-URN strategies.[The worker &amp; retries](docs/worker-and-retries.md)The loop, `WorkerOptions`, back-off, limits, graceful shutdown, the CLI.[Dead-letter handling](docs/dead-letter.md)When messages are quarantined and how to inspect/replay them.[PDO transport](docs/transports/pdo.md)Schema, production DDL, reservation semantics.[Redis transport](docs/transports/redis.md)Keys, the reliable-queue pattern, delayed retries.[RabbitMQ transport](docs/transports/amqp.md)Properties, headers, retry/delay caveats.[Interoperability](docs/interoperability.md)Consuming a Go/Python-produced message end to end.[Migrating from 1.x](docs/migrating-from-v1.md)What changed and how to port `1.x` jobs.Migrating from 1.x
------------------

[](#migrating-from-1x)

`2.0` is a breaking rewrite. See [UPGRADE-2.0.md](UPGRADE-2.0.md) and [docs/migrating-from-v1.md](docs/migrating-from-v1.md).

Testing
-------

[](#testing)

```
composer install
composer test            # unit suite (no broker required)
composer ci              # cs-check + phpstan + tests
```

Integration tests against real Redis/RabbitMQ/MySQL run in CI (and locally when the matching `QUEUE_TEST_*` environment variables are set).

Contributing
------------

[](#contributing)

Fork, branch, add tests for your change, and open a pull request. All code is released under the MIT License.

Credits
-------

[](#credits)

- [Muhammet ŞAFAK](https://www.muhammetsafak.com.tr) &lt;&gt;

License
-------

[](#license)

Copyright © 2023–2026 InitPHP — released under the [MIT License](LICENSE).

###  Health Score

45

—

FairBetter than 91% of packages

Maintenance98

Actively maintained with recent releases

Popularity5

Limited adoption so far

Community6

Small or concentrated contributor base

Maturity60

Established project with proven stability

 Bus Factor1

Top contributor holds 100% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~185 days

Recently: every ~231 days

Total

6

Last Release

12d ago

Major Versions

v1.0.x-dev → 2.0.02026-06-08

PHP version history (2 changes)1.0PHP &gt;=7.4

2.0.0PHP ^8.2

### Community

Maintainers

![](https://www.gravatar.com/avatar/4b6b34f3ac8938d8ee52ba3bd260680855dc5715c7b2929d9380de30d15a67dd?d=identicon)[muhammetsafak](/maintainers/muhammetsafak)

---

Top Contributors

[![muhammetsafak](https://avatars.githubusercontent.com/u/104234499?v=4)](https://github.com/muhammetsafak "muhammetsafak (5 commits)")

---

Tags

amqpbabelqueueconsumerdead-letterinitphpjob-queuemessage-queuepdophppolyglotqueuerabbitmqredisworkerpdoredisqueuerabbitmqconsumerworkerpolyglotdead-letterinitphpbabelqueue

###  Code Quality

TestsPHPUnit

Static AnalysisPHPStan

Code StylePHP CS Fixer

Type Coverage Yes

### Embed Badge

![Health badge](/badges/initphp-queue/health.svg)

```
[![Health](https://phpackages.com/badges/initphp-queue/health.svg)](https://phpackages.com/packages/initphp-queue)
```

###  Alternatives

[mjphaynes/php-resque

Redis backed library for creating background jobs and processing them later.

228201.4k2](/packages/mjphaynes-php-resque)[spinx/sidekiq-job-php

Push and schedule jobs to Sidekiq from PHP

37236.2k](/packages/spinx-sidekiq-job-php)[pdffiller/qless-php

PHP Bindings for qless

29113.7k2](/packages/pdffiller-qless-php)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
