phpdot/rabbitmq
===============

RabbitMQ client for PHP: publish, consume, retry, dead letter, topology.

Version v1.3.0 (released 1mo ago) · License: MIT · Requires PHP >=8.3

[Source](https://github.com/phpdot/rabbitmq) · [Packagist](https://packagist.org/packages/phpdot/rabbitmq)


Install
-------

```
composer require phpdot/rabbitmq
```

Quick Start
-----------

```
use PHPdot\RabbitMQ\RabbitMQConnection;
use PHPdot\RabbitMQ\Config\RabbitMQConfig;
use PHPdot\RabbitMQ\Message;
use PHPdot\RabbitMQ\Enum\TaskStatus;

$config = new RabbitMQConfig(
    host: 'localhost',
    exchanges: [
        'tasks' => ['type' => 'direct', 'durable' => true],
    ],
    queues: [
        'tasks.process' => [
            'bindings' => [['exchange' => 'tasks', 'routing_key' => 'task.new']],
            'durable' => true,
        ],
    ],
);

$conn = new RabbitMQConnection($config);

// Publish
$conn->message('{"task":"send_email"}')
    ->publish('tasks', 'task.new');

// Consume
$conn->consume('tasks.process')
    ->execute(function (Message $msg): TaskStatus {
        processTask(json_decode($msg->body(), true));
        return TaskStatus::SUCCESS;
    });
```

---

Architecture
------------

```mermaid
graph TD
    subgraph RabbitMQConnection
        direction TB
        CONN["Connect, reconnect, channel, topology"]
        subgraph PC["Publishers & Consumers"]
            direction LR
            PUB["Publisher<br/>Fluent builder: compress, trace, priority, headers"]
            CON["Consumer<br/>basic_consume loop with retry/dead letter handling"]
        end
        TOPO["Topology Manager<br/>Declares exchanges, queues, bindings, retry queues (cached after first use)"]
        CONN --> PUB
        CONN --> CON
        PUB --> TOPO
        CON --> TOPO
    end
```

---

Publishing
----------

```
// Simple
$conn->message('{"order_id": 123}')
    ->publish('orders', 'order.created');

// Full-featured
$conn->message(json_encode($data))
    ->retry(5)
    ->priority(8)
    ->compress()
    ->header(['traceparent' => $traceHeader])
    ->header(['x-source' => 'api-gateway'])
    ->publish('orders', 'order.created');
```

**Auto-set properties:**

| Property | Default |
|---|---|
| `message_id` | UUIDv7 |
| `timestamp` | `time()` |
| `app_id` | `gethostname()` |
| `content_type` | Auto-detected (JSON or text) |
| `delivery_mode` | 2 (persistent) |

---

Consuming
---------

```
$conn->consume('orders.process')
    ->prefetch(10)
    ->onRetry(function (Message $msg, int $count): void {
        echo "Retry #{$count}: {$msg->messageId()}\n";
    })
    ->onDead(function (Message $msg, string $reason): void {
        echo "Dead: {$msg->messageId()} — {$reason}\n";
    })
    ->execute(function (Message $msg): TaskStatus {
        $data = json_decode($msg->body(), true);

        if ($data === null) {
            return TaskStatus::DEAD;     // malformed, don't retry
        }

        try {
            processOrder($data);
            return TaskStatus::SUCCESS;  // done
        } catch (TemporaryException $e) {
            return TaskStatus::RETRY;    // try again
        } catch (PermanentException $e) {
            return TaskStatus::DEAD;     // give up
        }
    });
```

Three return values. No ambiguity:

- **SUCCESS** — ack, done
- **RETRY** — nack to retry queue, try again later
- **DEAD** — ack + forward to dead letter exchange

Unhandled exceptions thrown by the callback are caught and dead-lettered automatically, so a failing handler does not crash the consumer.
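
The mapping from return value to broker action can be pictured as a small dispatcher. The sketch below is not the library's `Consumer`: it declares a stand-in enum (`DemoTaskStatus`, a hypothetical name) and returns the action as a descriptive string so it runs without a broker.

```php
<?php
declare(strict_types=1);

// Stand-in for PHPdot\RabbitMQ\Enum\TaskStatus so the sketch runs on its own.
enum DemoTaskStatus
{
    case SUCCESS;
    case RETRY;
    case DEAD;
}

/**
 * Run a handler and describe what a consumer would do with the delivery.
 * The returned strings are illustrative labels, not library constants.
 *
 * @param callable(string): DemoTaskStatus $handler
 */
function settle(callable $handler, string $body): string
{
    try {
        $status = $handler($body);
    } catch (Throwable $e) {
        // An unhandled exception is treated the same way as DEAD.
        $status = DemoTaskStatus::DEAD;
    }

    return match ($status) {
        DemoTaskStatus::SUCCESS => 'ack',
        DemoTaskStatus::RETRY   => 'nack -> retry queue',
        DemoTaskStatus::DEAD    => 'ack + publish to dead letter exchange',
    };
}

echo settle(fn (string $b): DemoTaskStatus => DemoTaskStatus::SUCCESS, '{}'), "\n";
echo settle(fn (string $b): DemoTaskStatus => throw new RuntimeException('boom'), '{}'), "\n";
```

Using `match` without a default arm means a newly added status would fail loudly instead of being silently acked.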

---

Retry & Dead Letter
-------------------

### Retry Flow

```mermaid
graph TD
    A[orders.queue] -->|nack| B[orders.queue.retry.exchange]
    B --> C["orders.queue.retry<br/>(TTL queue, e.g. 500ms)"]
    C -->|TTL expires| A
```

Enable in config:

```
'queues' => [
    'orders.process' => [
        'bindings' => [['exchange' => 'orders', 'routing_key' => 'order.created']],
        'retry' => ['enable' => true, 'delay_ms' => 500],
        'dead' => 'dead-letters',
        'durable' => true,
    ],
],
```

Retry infrastructure (exchange, TTL queue, bindings) is created automatically on first use.
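
For readers wondering what that infrastructure looks like on the broker, here is a minimal sketch of the queue arguments a TTL-based retry loop typically needs. The argument names (`x-message-ttl`, `x-dead-letter-exchange`, `x-dead-letter-routing-key`) are standard RabbitMQ queue arguments, and the `.retry` / `.retry.exchange` suffixes follow the diagram. The exact wiring inside the library is an assumption, and `retryTopology` is a hypothetical helper.

```php
<?php
declare(strict_types=1);

/**
 * Build the queue arguments for a TTL-based retry loop.
 * Naming follows the retry flow diagram; the wiring itself is an assumption.
 *
 * @return array{main: array<string, mixed>, retry: array<string, mixed>}
 */
function retryTopology(string $queue, int $delayMs): array
{
    $retryExchange = $queue . '.retry.exchange';

    return [
        // Rejected messages (nack, requeue=false) leave the main queue via its DLX.
        'main' => [
            'x-dead-letter-exchange' => $retryExchange,
        ],
        // Messages wait here for $delayMs, then are dead-lettered back to the
        // main queue through the default exchange ('') using the queue name.
        'retry' => [
            'x-message-ttl'             => $delayMs,
            'x-dead-letter-exchange'    => '',
            'x-dead-letter-routing-key' => $queue,
        ],
    ];
}

print_r(retryTopology('orders.process', 500));
```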

### Dead Letter

When the maximum number of retries is exceeded or the callback returns `TaskStatus::DEAD`, the message is forwarded to the dead letter exchange with these headers:

- `x-failed-queue` — original queue name
- `x-failed-reason` — failure description
- `x-failed-timestamp` — Unix timestamp
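
As an illustration of the resulting header set, the sketch below merges the three headers into a message's existing headers. `withFailureHeaders` is a hypothetical helper written for this example; only the header names come from the list above.

```php
<?php
declare(strict_types=1);

/**
 * Merge the three dead-letter headers into a message's existing headers.
 * $now is injectable so the result is reproducible in tests.
 *
 * @param array<string, mixed> $headers
 * @return array<string, mixed>
 */
function withFailureHeaders(array $headers, string $queue, string $reason, ?int $now = null): array
{
    return array_merge($headers, [
        'x-failed-queue'     => $queue,
        'x-failed-reason'    => $reason,
        'x-failed-timestamp' => $now ?? time(),
    ]);
}

print_r(withFailureHeaders(['x-source' => 'api-gateway'], 'orders.process', 'Invalid payload'));
```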

### Replay

After fixing a bug, replay dead-lettered messages back to their original queue. Same callback pattern as consuming — return an enum, the library handles the rest.

```
use PHPdot\RabbitMQ\Enum\ReplayAction;

$result = $conn->replay('orders.dead')
    ->limit(100)
    ->execute(function (Message $msg): ReplayAction {
        echo "[{$msg->messageId()}] {$msg->failedReason()}\n";

        // Bad payload — clean up DB and discard permanently
        if ($msg->failedReason() === 'Invalid payload') {
            Order::where('message_id', $msg->messageId())->delete();
            return ReplayAction::REMOVE;
        }

        // Timeout errors — bug is fixed, send it back
        if (str_contains($msg->failedReason(), 'timeout')) {
            return ReplayAction::REPLAY;
        }

        // Unknown — leave in DLQ for investigation
        return ReplayAction::SKIP;
    });

echo "Replayed: {$result->replayed}, Removed: {$result->removed}, Skipped: {$result->skipped}\n";
```

Three actions per message:

- **REPLAY** — ack + republish to original exchange with original message ID. Dead-letter metadata stripped, retry counter reset.
- **REMOVE** — ack + discard permanently. Use the callback to clean up related data before it's gone.
- **SKIP** — nack with requeue. Message stays in DLQ for later.

Uses `basic_get` (pull mode) — processes available messages and stops. No infinite loop.
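
To make the bounded, pull-style loop concrete, here is a broker-free sketch that walks at most `$limit` messages, applies a decision callback, and tallies the outcome. It is not the library's `Replayer`: messages are plain arrays, `DemoReplayAction` is a stand-in enum, and only the `x-failed-*` headers are stripped (how the retry counter is stored is not stated here, so it is left out).

```php
<?php
declare(strict_types=1);

// Stand-in for PHPdot\RabbitMQ\Enum\ReplayAction.
enum DemoReplayAction
{
    case REPLAY;
    case REMOVE;
    case SKIP;
}

/**
 * Process at most $limit dead letters and tally the decisions.
 *
 * @param list<array{headers: array<string, mixed>}> $deadLetters
 * @param callable(array): DemoReplayAction $decide
 * @return array{replayed: int, removed: int, skipped: int, republished: list<array>}
 */
function replayBatch(array $deadLetters, int $limit, callable $decide): array
{
    $result = ['replayed' => 0, 'removed' => 0, 'skipped' => 0, 'republished' => []];

    foreach (array_slice($deadLetters, 0, $limit) as $message) {
        $action = $decide($message);

        if ($action === DemoReplayAction::REPLAY) {
            // Drop the x-failed-* metadata before sending the message back.
            $message['headers'] = array_filter(
                $message['headers'],
                fn ($name): bool => !str_starts_with((string) $name, 'x-failed-'),
                ARRAY_FILTER_USE_KEY
            );
            $result['republished'][] = $message;
        }

        $key = match ($action) {
            DemoReplayAction::REPLAY => 'replayed',
            DemoReplayAction::REMOVE => 'removed',
            DemoReplayAction::SKIP   => 'skipped',
        };
        $result[$key]++;
    }

    return $result;
}

$dlq = [
    ['headers' => ['x-failed-reason' => 'timeout', 'x-source' => 'api']],
    ['headers' => ['x-failed-reason' => 'Invalid payload']],
    ['headers' => ['x-failed-reason' => 'unknown']],
];

$result = replayBatch($dlq, 100, fn (array $m): DemoReplayAction => match ($m['headers']['x-failed-reason']) {
    'timeout'         => DemoReplayAction::REPLAY,
    'Invalid payload' => DemoReplayAction::REMOVE,
    default           => DemoReplayAction::SKIP,
});

echo "Replayed: {$result['replayed']}, Removed: {$result['removed']}, Skipped: {$result['skipped']}\n";
```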

---

Compression
-----------

```
// Publish compressed
$conn->message($largePayload)
    ->compress()
    ->publish('data', 'import.batch');

// Consume — transparent decompression
$conn->consume('data.process')
    ->execute(function (Message $msg): TaskStatus {
        $body = $msg->body();  // already decompressed
        return TaskStatus::SUCCESS;
    });
```
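
This section does not say which codec `compress()` uses or how a compressed body is flagged. The round trip below is therefore only a sketch of the idea, assuming gzip via `ext-zlib` and a `content_encoding` marker; `packBody` and `unpackBody` are hypothetical names.

```php
<?php
declare(strict_types=1);

// Assumption: gzip via ext-zlib, flagged through a content_encoding property.
function packBody(string $body): array
{
    return ['body' => gzencode($body), 'content_encoding' => 'gzip'];
}

function unpackBody(array $message): string
{
    if (($message['content_encoding'] ?? '') !== 'gzip') {
        return $message['body'];   // not compressed, pass through unchanged
    }

    $plain = gzdecode($message['body']);
    if ($plain === false) {
        throw new RuntimeException('Body is flagged as gzip but could not be decoded');
    }

    return $plain;
}

$payload = str_repeat('{"sku":"A-1","qty":1}', 1000);
$packed  = packBody($payload);
printf("%d bytes -> %d bytes\n", strlen($payload), strlen($packed['body']));
```

Checking the marker on the consuming side is what keeps decompression transparent: uncompressed messages take the pass-through branch.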

---

Trace Propagation
-----------------

Pass trace context as plain string headers. No coupling to any tracing library.

```
// Publish with trace
$conn->message($payload)
    ->header(['traceparent' => $tracelog->getTraceparent()->toHeader()])
    ->publish('orders', 'order.created');

// Consume with trace
$conn->consume('orders.process')
    ->execute(function (Message $msg): TaskStatus {
        $traceparent = $msg->header('traceparent');  // '' if missing
        // Reconnect trace in your framework layer
        return TaskStatus::SUCCESS;
    });
```
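
Since the header arrives as a plain string (or `''` when missing), the consuming side has to validate it before reconnecting a trace. A minimal sketch, assuming the value follows the W3C Trace Context `traceparent` layout (`version-traceid-parentid-flags`); `parseTraceparent` is a hypothetical helper, not part of the package.

```php
<?php
declare(strict_types=1);

/**
 * Parse a W3C Trace Context `traceparent` value.
 * Returns null for the empty string (header missing) or a malformed value.
 *
 * @return array{version: string, trace_id: string, parent_id: string, sampled: bool}|null
 */
function parseTraceparent(string $header): ?array
{
    $pattern = '/^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/D';
    if (preg_match($pattern, $header, $m) !== 1) {
        return null;
    }

    // All-zero trace or parent IDs are invalid per the specification.
    if ($m[2] === str_repeat('0', 32) || $m[3] === str_repeat('0', 16)) {
        return null;
    }

    return [
        'version'   => $m[1],
        'trace_id'  => $m[2],
        'parent_id' => $m[3],
        'sampled'   => (hexdec($m[4]) & 1) === 1,   // lowest flag bit
    ];
}

var_dump(parseTraceparent('00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01'));
var_dump(parseTraceparent(''));
```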

---

Configuration
-------------

```
$config = new RabbitMQConfig(
    host: 'rabbitmq.internal',
    port: 5672,
    username: 'app',
    password: 'secret',
    vhost: '/',
    timeoutMs: 30000,
    maxRetries: 3,
    retryDelayMs: 1000,
    exchanges: [
        'orders' => ['type' => 'direct', 'durable' => true],
        'notifications' => ['type' => 'fanout', 'durable' => true],
        'dead' => ['type' => 'direct', 'durable' => true],
    ],
    queues: [
        'orders.process' => [
            'bindings' => [
                ['exchange' => 'orders', 'routing_key' => 'order.created'],
                ['exchange' => 'orders', 'routing_key' => 'order.updated'],
            ],
            'retry' => ['enable' => true, 'delay_ms' => 500],
            'dead' => 'dead',
            'durable' => true,
        ],
    ],
);
```
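
Bindings and the `dead` key refer to exchanges by name, so a typo only surfaces when the topology is declared. A small pre-flight check can catch it earlier. `undeclaredExchanges` below is a hypothetical helper, not part of the package; it assumes nothing beyond the array shapes shown in the configuration.

```php
<?php
declare(strict_types=1);

/**
 * List exchange names referenced by queues (bindings or 'dead') but not declared.
 *
 * @param array<string, array<string, mixed>> $exchanges
 * @param array<string, array<string, mixed>> $queues
 * @return list<int|string>
 */
function undeclaredExchanges(array $exchanges, array $queues): array
{
    $missing = [];

    foreach ($queues as $queue) {
        $referenced = array_column($queue['bindings'] ?? [], 'exchange');
        if (isset($queue['dead'])) {
            $referenced[] = $queue['dead'];
        }

        foreach ($referenced as $name) {
            if (!isset($exchanges[$name])) {
                $missing[$name] = true;   // keyed by name to avoid duplicates
            }
        }
    }

    return array_keys($missing);
}

$exchanges = ['orders' => ['type' => 'direct'], 'dead' => ['type' => 'direct']];
$queues = [
    'orders.process' => [
        'bindings' => [['exchange' => 'orders', 'routing_key' => 'order.created']],
        'dead'     => 'dead-letters',   // typo: the declared exchange is 'dead'
    ],
];

print_r(undeclaredExchanges($exchanges, $queues));
```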

---

RabbitMQConnection Resilience
-----------------------------

Auto-reconnect with exponential backoff:

```
Attempt 1: wait 1s
Attempt 2: wait 2s
Attempt 3: wait 4s
→ ConnectionException if all fail

```
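
The schedule above doubles the wait on each attempt. A minimal sketch of that loop, assuming a 1-second base delay and three attempts as listed; `connectWithBackoff` is a hypothetical function, and `RuntimeException` stands in for the library's `ConnectionException`.

```php
<?php
declare(strict_types=1);

/**
 * Delay before a given attempt: base, 2x base, 4x base, ...
 * Attempt numbers start at 1.
 */
function backoffDelayMs(int $attempt, int $baseMs = 1000): int
{
    return $baseMs * (1 << ($attempt - 1));
}

/**
 * Call $connect until it succeeds or $maxAttempts is reached.
 * $sleep is injectable so the sketch can run without actually waiting.
 */
function connectWithBackoff(callable $connect, int $maxAttempts = 3, ?callable $sleep = null): mixed
{
    $sleep ??= static fn (int $ms) => usleep($ms * 1000);
    $last = null;

    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        $sleep(backoffDelayMs($attempt));
        try {
            return $connect();
        } catch (RuntimeException $e) {
            $last = $e;   // remember the failure and try again
        }
    }

    // Stand-in for ConnectionException once every attempt has failed.
    throw new RuntimeException("Connection failed after {$maxAttempts} attempts", 0, $last);
}
```

Passing the sleep function in keeps the retry logic testable: a test can record the requested delays instead of waiting seven seconds.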

---

Message API
-----------

```
$msg->body();              // message content (decompressed)
$msg->messageId();         // UUIDv7 message ID
$msg->queue();             // queue name
$msg->header('key');       // header value or ''
$msg->headers();           // all headers as array
$msg->maxRetries();        // x-retries-max value
$msg->priority();          // 0-10
$msg->originalExchange();  // x-original-exchange
$msg->originalRoutingKey(); // x-original-routing-key
$msg->failedReason();      // x-failed-reason (on dead letters)
```

---

CLI Commands
------------

When installed in a phpdot app (with `phpdot/console`), these commands auto-discover and become available under your CLI entry point:

```
rabbitmq:status              Show broker connectivity and topology drift
rabbitmq:queues              Show queue depths and consumer counts
rabbitmq:topology:declare    Declare exchanges, queues, and bindings from config
rabbitmq:peek                Inspect messages without consuming them
rabbitmq:replay              Replay messages from a dead-letter queue
rabbitmq:dlq:analyze         Group dead-letter messages by reason and origin

```

### `rabbitmq:status`

Connection check plus topology drift detection: confirms the broker is up, credentials work, and every exchange/queue defined in `config/rabbitmq.php` is actually declared.

```
$ php bin/console rabbitmq:status

RabbitMQ Status

  Host           localhost:5672
  Vhost          /
  User           app
  Connection     ✓ ok (8.2 ms)

Topology in config/rabbitmq.php:

  ✓ exchange  events                          (direct, durable)
  ✓ queue     orders                          (durable)
  ✗ queue     payments                        NOT declared on broker

1 resource(s) missing — run `rabbitmq:topology:declare` to fix.

```

### `rabbitmq:queues`

Queue depths and consumer counts — basic observability without opening the management UI. Supports `--filter=substring` and `--watch` (refresh every second).

```
$ php bin/console rabbitmq:queues

╔══════════════════╦══════════╦═══════════╗
║ Queue            ║ Messages ║ Consumers ║
╠══════════════════╬══════════╬═══════════╣
║ orders           ║      127 ║         3 ║
║ orders.dlq       ║       12 ║         0 ║
║ payments         ║       45 ║         2 ║
╚══════════════════╩══════════╩═══════════╝

3 queues, 184 messages, 5 consumers

```

### `rabbitmq:topology:declare`

Applies the exchanges/queues/bindings from `config/rabbitmq.php` to the broker. Idempotent — existing matching resources are skipped. Supports `--dry-run` (preview) and `--force` (drop+recreate mismatched queues, DESTRUCTIVE — confirms before dropping unless `--no-interaction`).

### `rabbitmq:peek <queue> [--limit=5]`

Inspects messages in a queue without consuming them. Each peeked message is returned to the queue via `basic_reject` with requeue.

### `rabbitmq:replay <queue> [--limit=10] [--dry-run]`

Wraps the `Replayer` class — requeues dead-letter messages back to their original exchange. `--dry-run` previews what would replay without acking.

### `rabbitmq:dlq:analyze <queue> [--limit=500]`

Samples a dead-letter queue and groups messages by death reason and original routing-key — one-screen incident triage.

```
Analysis of orders.dlq (12 messages sampled)

Reason for dead-lettering:
  expired                   7  ████████████░░░░░░░░  58%
  rejected                  4  ██████░░░░░░░░░░░░░░  33%
  maxlen                    1  ██░░░░░░░░░░░░░░░░░░   9%

By original routing-key:
  order.created             9  ███████████████░░░░░  75%
  order.updated             3  █████░░░░░░░░░░░░░░░  25%

```
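
The grouping behind a report like this is a count-and-sort over the sampled messages. The sketch below shows one way to produce such lines; `histogram` is a hypothetical helper, and because each percentage is rounded independently its figures may differ by a point from the sample output above.

```php
<?php
declare(strict_types=1);

/**
 * Count values and render one line per value with a fixed-width bar.
 *
 * @param list<string> $values e.g. the death reason of each sampled message
 * @return list<string>
 */
function histogram(array $values, int $width = 20): array
{
    $counts = array_count_values($values);
    arsort($counts);                 // most frequent first
    $total = count($values);
    $lines = [];

    foreach ($counts as $value => $count) {
        $share  = $count / $total;
        $filled = (int) round($share * $width);
        $lines[] = sprintf(
            '%-24s %3d  %s%s  %3d%%',
            $value,
            $count,
            str_repeat('█', $filled),
            str_repeat('░', $width - $filled),
            (int) round($share * 100)
        );
    }

    return $lines;
}

echo implode("\n", histogram(['expired', 'expired', 'rejected', 'expired'])), "\n";
```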

---

Package Structure
-----------------

```
src/
├── RabbitMQConnection.php   Main entry point
├── RabbitMQConnector.php    phpdot/pool ConnectorInterface adapter
├── Publisher.php            Fluent message builder
├── Consumer.php             Message consumer with retry/dead letter
├── Replayer.php             Dead letter queue replay
├── Message.php              Immutable inbound message DTO
├── Config/
│   └── RabbitMQConfig.php   Connection and topology configuration
├── Enum/
│   ├── TaskStatus.php       SUCCESS, RETRY, DEAD
│   └── ReplayAction.php     REPLAY, REMOVE, SKIP
├── Result/
│   └── ReplayResult.php     Replay outcome counts
├── Topology/
│   └── TopologyManager.php  Exchange/queue/binding declaration
├── Cli/Command/
│   ├── StatusCommand.php
│   ├── QueuesCommand.php
│   ├── TopologyDeclareCommand.php
│   ├── PeekCommand.php
│   ├── ReplayCommand.php
│   └── DlqAnalyzeCommand.php
└── Exception/
    ├── RabbitMQException.php   Base exception
    ├── ConnectionException.php
    ├── PublishException.php
    └── ConsumeException.php

```

---

Development
-----------

```
composer test        # PHPUnit (unit tests only)
composer test-all    # PHPUnit (including integration, needs RabbitMQ)
composer analyse     # PHPStan level 10
composer cs-fix      # PHP-CS-Fixer
composer check       # All three (unit + analyse + cs-check)
```

License
-------

MIT

