PHPackages                             bschmitt/laravel-amqp - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. bschmitt/laravel-amqp

ActiveLibrary[Queues &amp; Workers](/categories/queues)

bschmitt/laravel-amqp
=====================

AMQP wrapper for Laravel and Lumen to publish and consume messages

v3.4.1(1mo ago)2822.5M↓25.1%85[24 issues](https://github.com/bschmitt/laravel-amqp/issues)[2 PRs](https://github.com/bschmitt/laravel-amqp/pulls)7MITPHPPHP ^7.3|^8.0CI passing

Since Jun 29Pushed 1mo ago11 watchersCompare

[ Source](https://github.com/bschmitt/laravel-amqp)[ Packagist](https://packagist.org/packages/bschmitt/laravel-amqp)[ RSS](/packages/bschmitt-laravel-amqp/feed)WikiDiscussions master Synced 2d ago

READMEChangelog (10)Dependencies (35)Versions (31)Used By (7)

Laravel AMQP Package
====================

[](#laravel-amqp-package)

A detailed AMQP wrapper for Laravel and Lumen to publish and consume messages, especially from RabbitMQ. This package provides full support for RabbitMQ features including RPC patterns, management operations, message properties, and more.

[![Build Status](https://camo.githubusercontent.com/81c39a820304e4b65c631a4e95c84e16f854195c18d3d4f9ae684732c515a52e/68747470733a2f2f7472617669732d63692e6f72672f627363686d6974742f6c61726176656c2d616d71702e7376673f6272616e63683d6d6173746572)](https://travis-ci.org/bschmitt/laravel-amqp)[![CI](https://github.com/zfhassaan/laravel-amqp/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/zfhassaan/laravel-amqp/actions/workflows/ci.yml)[![Latest Stable Version](https://camo.githubusercontent.com/89a5fa9a6aa3228fbccd3de2d6251828dc16a2a7ef504c3ca9f3fc03507748fc/68747470733a2f2f706f7365722e707567782e6f72672f627363686d6974742f6c61726176656c2d616d71702f762f737461626c652e737667)](https://packagist.org/packages/bschmitt/laravel-amqp)[![PHP](https://camo.githubusercontent.com/c21a8416a8d4c7a8e224e9ba273a436e41c4fa9bf5707be46af193640952a550/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f5048502d372e33253230746f253230382e352d3737374242343f6c6f676f3d706870266c6f676f436f6c6f723d7768697465)](#requirements)[![Laravel](https://camo.githubusercontent.com/7aa5c884f96c567e86483853287994c660f03f0431e6be4096b561c811de697f/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f4c61726176656c2d38253230746f25323031332d4646324432303f6c6f676f3d6c61726176656c266c6f676f436f6c6f723d7768697465)](#requirements)[![License](https://camo.githubusercontent.com/25bc1a3091d0ba10777b5d152d8133de1d5557c73df4781aba72512789917955/68747470733a2f2f706f7365722e707567782e6f72672f627363686d6974742f6c61726176656c2d616d71702f6c6963656e73652e737667)](https://packagist.org/packages/bschmitt/laravel-amqp)[![Total Downloads](https://camo.githubusercontent.com/4d6f6e67fd6e5b4061b979e1b041f0f2fab3d202937c4e0593fcec0df9663abf/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f64742f627363686d6974742f6c61726176656c2d616d71702e7376673f7374796c653d666c61742d737175617265)](https://packagist.org/packages/bschmitt/laravel-amqp)

Features
--------

[](#features)

### Core Features

[](#core-features)

- Advanced queue configuration
- Easy message publishing to queues
- Flexible queue consumption with useful options
- Support for all RabbitMQ exchange types (topic, direct, fanout, headers)
- Full AMQP message properties support

### Version 3.1.0+ New Features

[](#version-310-new-features)

- **RPC Pattern Support** - Built-in request-response patterns with `rpc()` and `reply()` methods
- **Queue Management** - Programmatic control (purge, delete, unbind)
- **Management HTTP API** - Full integration with RabbitMQ Management API
- **Policy Management** - Create, update, and delete policies programmatically
- **Feature Flags** - Query RabbitMQ feature flags
- **Enhanced Message Properties** - Full support for priority, correlation\_id, headers, etc.
- **Listen Method** - Auto-create queues and bind to multiple routing keys
- **Connection Configuration Helper** - Easy access to connection configs

### Advanced Features

[](#advanced-features)

- Publisher Confirms - Guaranteed message delivery
- Consumer Prefetch (QoS) - Rate limiting and flow control
- Queue Types - Classic, Quorum, and Stream queues
- Dead Letter Exchanges - Message routing for failed messages
- **Advanced retry &amp; dead-letter abstractions** - Declarative `RetryPolicy` + `DeadLetterTopology` + `RetryHandler` with fixed/exponential backoff and auto-routing to DLQ when retries exhaust (see [Retry &amp; DLQ Abstractions](#retry--dead-letter-abstractions))
- **Delayed messaging &amp; publish backoff** - `publishLater()` / `publishTypedLater()` with TTL+DLX or `rabbitmq-delayed-message-exchange` plugin strategies, plus `PublishBackoff` for publisher-side transient retries (see [Delayed Messaging](#delayed-messaging--publish-backoff))
- **Typed message contracts** - `MessageContractInterface`, `TypedMessage` base class, `publishTyped()` / `consumeTyped()` with pluggable `MessageSerializerInterface` (JSON by default) (see [Typed Messaging](#typed-message-contracts--dto-serialization))
- **JSON schema validation** - Zero-dependency `SchemaValidator` (Draft 7 subset) validates payloads against contract `schema()` definitions (see [JSON Schema Validation](#json-schema-validation))
- Message Priority - Priority-based message processing
- TTL Support - Message and queue expiration
- Lazy Queues - Disk-based message storage
- Alternate Exchange - Unroutable message handling
- **Native Laravel Queue integration** - Use `amqp` as a `config/queue.php` driver with `queue:work`
- **Artisan commands** - `amqp:work` (with `--retry`/`--contract`/`--validate-schema`), `amqp:consume`, `amqp:listen`, `amqp:publish` (with `--delay-ms`), and `amqp:purge`
- **Exchange &amp; topology builders** - `ExchangeTopology` declarative exchange + queue bindings with `declareExchangeTopology()` (see [Production Infrastructure](#production-infrastructure))
- **Quorum &amp; priority queue profiles** - `QueueProfile::quorum()`, `priority()`, and `quorumWithPriority()` presets for `queue_properties` (see [Production Infrastructure](#production-infrastructure))
- **Resilient connections &amp; pooling** - `ResilientConnectionManager` auto-reconnect with heartbeat staleness checks; `ConnectionPool` for persistent worker channels (see [Production Infrastructure](#production-infrastructure))
- **Distributed tracing** - W3C `traceparent` propagation via `TracePropagatorInterface` (OTel bridge via `CallbackTracePropagator`); enable with `propagate_trace` on publish/consume (see [Production Infrastructure](#production-infrastructure))
- **Correlation ID propagation** - `CorrelationContext` with `propagate_correlation` on publish/consume (see [Production Infrastructure](#production-infrastructure))
- **Consumer lifecycle hooks** - `ConsumerLifecycle` graceful shutdown, signal handlers, and `consumeWithLifecycle()` (see [Production Infrastructure](#production-infrastructure))
- **SAGA workflow helpers** - `Saga` step/compensation orchestrator with `SagaResult` reporting (see [SAGA, Events, Middleware &amp; Testing](#saga-events-middleware--testing))
- **Laravel events &amp; consume middleware** - `MessagePublishing`/`MessagePublished`/`MessageReceived`/`MessageHandled`/`MessageFailed` events plus `ConsumePipeline` / `ConsumeMiddlewareInterface` and `consumeWithMiddleware()` (see [SAGA, Events, Middleware &amp; Testing](#saga-events-middleware--testing))
- **Fake AMQP test driver** - `Amqp::fake()` / `FakeAmqp` with `assertPublished()`, `assertPublishedCount()`, `assertNothingPublished()` (see [SAGA, Events, Middleware &amp; Testing](#saga-events-middleware--testing))
- **Publisher confirms &amp; async publishing** - persistent-channel `AsyncPublisher` with batched confirms via `Amqp::asyncPublisher()` (see [SAGA, Events, Middleware &amp; Testing](#saga-events-middleware--testing))
- **RPC abstraction helpers** - `RpcClient` / `RpcServer` with `RpcCallResult`, JSON mode, and `Amqp::rpcClient()` / `rpcServer()` (see [Scale &amp; Interop](#scale--interop))
- **Cross-service / polyglot messaging** - `InteropEnvelope` standard headers (`x-message-type`, `x-schema-version`, `x-source-service`) via `publishInterop()` / `consumeInterop()` (see [Scale &amp; Interop](#scale--interop))
- **Observability &amp; queue metrics** - `MetricsCollector`, `QueueMetrics`, `Amqp::metrics()`, `queueMetrics()` / `getQueueStats()` (see [Scale &amp; Interop](#scale--interop))
- **High-performance workers** - `WorkerOptions`, `HighPerformanceWorker`, `consumeOptimized()`, and `amqp:work --optimized` (see [Scale &amp; Interop](#scale--interop))
- **gRPC-lite typed RPC** - `Rpc::call(UserService::class, GetUserRequest::make([...]))` with typed request/response DTOs, service registries, and `Rpc::serve()` on the server (see [gRPC-lite RPC](#grpc-lite-rpc))
- **Service discovery** - `Rpc::service('payments')->call(...)` resolves a short name to a registered `RpcService` class; opt-in via `static alias()` on the service (see [Messaging Platform](#laravel-messaging-platform))
- **Saga facade** - `Saga::make()->step(...)->compensate(...)` fluent syntax with reverse-order compensation on failure (see [Messaging Platform](#laravel-messaging-platform))
- **Message contracts dispatch** - `OrderCreated::dispatch(['orderId' => 'o-1'])` auto-serializes and publishes typed messages via the Laravel container (see [Messaging Platform](#laravel-messaging-platform))
- **Dead-letter management** - `Amqp::deadLetters()->for('orders.dlq')->count()/peek()/summarize()/messages()/replayTo()/purge()` + `php artisan amqp:dlq` (see [Messaging Platform](#laravel-messaging-platform))
- **Declarative retry attribute** - `#[Retry(attempts: 5, strategy: RetryStrategy::EXPONENTIAL)]` on handlers, hydrated via `RetryPolicy::fromAttribute()` (see [Messaging Platform](#laravel-messaging-platform))
- **Monitoring dashboard** - `Amqp::dashboard($queues)->snapshot()` with lag, DLQ summaries, and RPC histograms; `php artisan amqp:monitor` / `amqp:dlq` (see [Messaging Platform](#laravel-messaging-platform))
- **RPC latency tracking** - `RpcLatencyRecorder`, per-call `durationMs` on `RpcCallResult`, `RpcCallStarted` / `RpcCallCompleted` / `RpcCallFailed` events (see [gRPC-lite RPC](#grpc-lite-rpc))
- **Causation ID propagation** - `CorrelationContext` now propagates both `correlation_id` and `x-causation-id` so consumers can chain "this happened because of that" (see [Messaging Platform](#laravel-messaging-platform))
- **MessageStore** - `MessageStoreInterface` + `InMemoryMessageStore`; opt-in audit log of every publish/consume via `Amqp::setMessageStore()` (see [Messaging Platform](#laravel-messaging-platform))
- **Async Laravel events** - mark events with `ShouldPublishToAmqpInterface` and enable `amqp.broadcast_laravel_events` to auto-publish `event(new OrderCreated())` to RabbitMQ (see [Messaging Platform](#laravel-messaging-platform))
- **Laravel Pulse integration** - `AmqpPulseRecorder` auto-records publish/handle/fail/RPC/DLQ events to Pulse when `laravel/pulse` is installed; opt out with `amqp.pulse_integration => false` (see [Messaging Platform](#laravel-messaging-platform))
- **OpenTelemetry bridge** - `OpenTelemetryTracePropagator` injects the active OTel span context into AMQP headers (with W3C fallback) when `open-telemetry/api` is installed (see [Production Infrastructure](#production-infrastructure))
- **Correlation ID visualisation** - `CorrelationChain::tree()` / `render()` reconstruct causation graphs from the `MessageStore`; `php artisan amqp:trace ` prints an ASCII tree or JSON (see [Messaging Platform](#laravel-messaging-platform))
- **Kubernetes liveness / readiness probes** - `HealthState` + `HealthCheck` + `Http\Controllers\HealthController`; opt-in HTTP routes (`GET {prefix}/live|ready`) plus `php artisan amqp:health` for exec probes (see [Kubernetes &amp; Cloud Native](#kubernetes--cloud-native))
- **Consumer autoscaling recommendations** - `AutoscalingAdvisor` (depth + lag heuristics, KEDA trigger spec) and `php artisan amqp:scale` CLI (see [Kubernetes &amp; Cloud Native](#kubernetes--cloud-native))
- **Laravel Cloud compatibility** - `LaravelCloud` detector + `AMQP_URL` / `CLOUDAMQP_URL` / `RABBITMQ_URL` DSN auto-hydration on register (see [Kubernetes &amp; Cloud Native](#kubernetes--cloud-native))
- **Multi-region deployment support** - `MultiRegionConnection` resolver with locality preference, cool-down blacklist, and `withFailover()` retry loop across region-scoped connection keys (see [Kubernetes &amp; Cloud Native](#kubernetes--cloud-native))

Planned Features
----------------

[](#planned-features)

Status legend: **\[x\]** shipped · **\[~\]** partial / building blocks shipped (full feature still planned) · **\[ \]** not started.

> Many "partial" items already ship as a programmatic API or CLI; the outstanding work is usually a UI, native integration, or codegen layer. See the [Features](#features) section above for the full list of already-shipped capabilities.

### Observability

[](#observability)

- Laravel Pulse integration for AMQP metrics — `AmqpPulseRecorder` auto-subscribes to publish/handle/fail/RPC/DLQ events when `laravel/pulse` is installed; disable via `amqp.pulse_integration => false`
- Native OpenTelemetry exporter support — `OpenTelemetryTracePropagator` bridges to `open-telemetry/api` when installed (active span context auto-injected); falls back to W3C generation otherwise. `CallbackTracePropagator` remains for custom APMs
- Correlation ID visualization — `CorrelationChain::tree()` / `render()` reconstruct causation graphs from the `MessageStore`; `php artisan amqp:trace ` prints an ASCII tree or JSON
- Queue throughput monitoring — `MetricsCollector`, `QueueMetrics`, `Amqp::metrics()` / `queueMetrics()`, `MonitoringDashboard`
- Consumer lag monitoring — `QueueMetrics::lag()`, `lagSeconds()`, `isLagging()`; `--lag-threshold` / `--lag-seconds` / `--lag-age` on `amqp:monitor`
- Dead-letter queue monitoring — `DeadLetterManager::peek()` / `summarize()`, `dead_letters` block in dashboard, `php artisan amqp:dlq`
- RPC latency tracking — `RpcLatencyRecorder`, `RpcCallResult::durationMs()`, `RpcCallCompleted` / `RpcCallFailed` events, `--rpc` on `amqp:monitor`
- Distributed trace propagation — `TraceContext`, `W3cTracePropagator`, `propagate_trace` on publish/consume

---

### Developer Experience

[](#developer-experience)

- AMQP Explorer (Telescope-like message inspector) — `php artisan amqp:explore` (with `--id`, filters, JSON mode)
- Message replay tooling — `php artisan amqp:replay` + `MessageStoreInterface` source + target/exchange overrides
- Failed message browser — `php artisan amqp:explore` + `amqp:dlq` (`messages`, `peek`, `summarize`) for CLI inspection
- Live queue inspector — `php artisan amqp:inspect` watch loop over `queueMetrics()` / Management API
- Message payload diff viewer — `php artisan amqp:diff {left} {right}` + structural JSON/text diffing
- Schema validation debugger — `php artisan amqp:schema:debug` (interactive/file/store payload sources)
- Interactive RPC testing console — `php artisan amqp:rpc:console` with JSON/raw payload modes

---

### Kubernetes &amp; Cloud Native

[](#kubernetes--cloud-native)

- Kubernetes-ready consumer lifecycle management — `ConsumerLifecycle::withHealth()` stamps `HealthState` on every start/stop/message/error
- Graceful shutdown support — `ConsumerLifecycle` signal handlers (`SIGTERM` / `SIGINT` via `pcntl`) + cooperative `requestStop()`
- Readiness probe endpoint — `GET {prefix}/ready` HTTP route + `php artisan amqp:health --probe=ready` for exec probes
- Liveness probe endpoint — `GET {prefix}/live` HTTP route + `php artisan amqp:health --probe=live`
- Auto-recovery after broker failures — `ResilientConnectionManager` (reconnect + heartbeat staleness) and `ConnectionPool`
- Consumer autoscaling recommendations — `AutoscalingAdvisor` + `php artisan amqp:scale` (depth/lag heuristics, KEDA-ready trigger output)
- Laravel Cloud compatibility — `LaravelCloud` detector + `AMQP_URL` / `CLOUDAMQP_URL` / `RABBITMQ_URL` auto-hydration
- Multi-region deployment support — `MultiRegionConnection` resolver with locality preference, cool-down blacklist, and `withFailover()` retry loop

---

### Enterprise Messaging

[](#enterprise-messaging)

- \[~\] Dead-letter queue management UI — `DeadLetterManager` API + `amqp:dlq` / `amqp:monitor` CLI; web UI still planned
- Scheduled message delivery (absolute time) — only **relative** delays today via `publishLater()` / `dispatchLater()`
- Delayed message support — `DelayedPublisher`, `Amqp::publishLater()` / `publishTypedLater()`, `TypedMessage::dispatchLater()`, `amqp:publish --delay-ms`
- Message priority queues — `QueueProfile::priority()` / `quorumWithPriority()`, `x-max-priority`, publish `priority`
- Bulk publish operations — `Amqp::batchBasicPublish()` / `batchPublish()` + `BatchManager`
- \[~\] Consumer rate limiting — QoS / prefetch only (`basic_qos`, `--prefetch-count`, `WorkerOptions::throughput()` / `lowLatency()`); no app-level msgs/sec throttling
- Circuit breaker support
- \[~\] Retry policy dashboard — `RetryPolicy`, `#[Retry]`, `RetryHandler`, `consumeWithRetry()`; metrics via `amqp:monitor` (no dedicated retry UI)

---

### Polyglot Microservices

[](#polyglot-microservices)

- Contract generation for TypeScript
- Contract generation for Go
- Contract generation for Python
- \[~\] JSON Schema export — contracts can declare `schema()` and validate in-process; no `export` CLI yet
- AsyncAPI specification generation
- Service registry integration — `ServiceRegistry`, `Rpc::services()->register()` / `autodiscover()`, `Rpc::service('alias')->call(...)`
- \[~\] Cross-language RPC contracts — `InteropEnvelope` standard headers (`x-message-type`, `x-schema-version`, `x-source-service`); no codegen for foreign languages

---

### Operations

[](#operations)

- \[~\] Horizon-style AMQP dashboard — `MonitoringDashboard` snapshot + `php artisan amqp:monitor [--json]`; full web UI still planned
- \[~\] Queue health monitoring — Management API stats + `amqp:monitor` (no dedicated health view)
- \[~\] Consumer health monitoring — consumer counts / rates via Management API; no consumer health UI
- Queue topology visualizer (`ExchangeTopology` / `DeadLetterTopology` are **code** builders, not a visualizer)
- Exchange / routing-key explorer
- \[~\] Production diagnostics commands — `amqp:monitor`, `amqp:publish`, `amqp:consume`, `amqp:listen`, `amqp:purge`, `amqp:work`; no dedicated `amqp:diagnose`
- \[~\] Broker connectivity diagnostics — `ResilientConnectionManager` handles reconnection; no standalone diagnostic command

---

### Security

[](#security)

- Message encryption support
- Message signing &amp; verification
- Sensitive payload masking
- \[~\] Audit trail integration — `MessageStoreInterface` + `InMemoryMessageStore` (opt-in append log of every publish/consume)
- Per-consumer access controls
- Security scanning for message contracts

---

### AI &amp; Modern Features

[](#ai--modern-features)

- AI-powered message anomaly detection
- AI-assisted queue optimization recommendations
- Natural language queue diagnostics
- Intelligent retry recommendations

Requirements
------------

[](#requirements)

- **PHP** 7.3 through 8.5 (`composer.json`: `^7.3|^8.0`)
- **Laravel** 8.x through 13.x (or Lumen 8.x+)
- RabbitMQ 3.x (tested with `rabbitmq:3-management` Docker image)

LaravelMinimum PHPNotes8.x7.3Last Laravel version for PHP 7.3 / 7.49.x8.0.2Use PHP 8.0.2+ (not 8.0.0/8.0.1)10.x8.111.x / 12.x8.213.x8.3Config supports both `use` + `properties` (current) and legacy `default` + `connections` layouts.

Installation
------------

[](#installation)

### Composer

[](#composer)

```
composer require bschmitt/laravel-amqp
```

For Laravel 5.5+:

```
"bschmitt/laravel-amqp": "^3.1"
```

For Laravel &lt; 5.5:

```
"bschmitt/laravel-amqp": "^2.0"
```

Quick Start
-----------

[](#quick-start)

### Publishing Messages

[](#publishing-messages)

```
use Bschmitt\Amqp\Facades\Amqp;

// Basic publish
Amqp::publish('routing-key', 'message');

// Publish with queue creation
Amqp::publish('routing-key', 'message', ['queue' => 'queue-name']);

// Publish with message properties
Amqp::publish('routing-key', 'message', [
    'priority' => 10,
    'correlation_id' => 'unique-id',
    'reply_to' => 'reply-queue',
    'application_headers' => [
        'X-Custom-Header' => 'value'
    ]
]);
```

### Consuming Messages

[](#consuming-messages)

```
use Bschmitt\Amqp\Facades\Amqp;

// Consume and acknowledge (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
    echo $message->body;
    $resolver->acknowledge($message);
    $resolver->stopWhenProcessed();
});

// Consume forever
$amqp = app('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
    processMessage($message->body);
    $resolver->acknowledge($message);
}, ['persistent' => true]);

// Alternative: Using resolve() helper
$amqp = resolve('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
    processMessage($message->body);
    $resolver->acknowledge($message);
});
```

### RPC Pattern

[](#rpc-pattern)

```
// Client side - Make RPC call (using dynamic call)
$amqp = app('Amqp');
$response = $amqp->rpc('rpc-queue', 'request-data', [], 30);

// Server side - Process and reply (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('rpc-queue', function ($message, $resolver) {
    $result = processRequest($message->body);
    $resolver->reply($message, $result);
    $resolver->acknowledge($message);
});
```

### Listen to Multiple Routing Keys

[](#listen-to-multiple-routing-keys)

```
$amqp = app('Amqp');
$amqp->listen(['key1', 'key2', 'key3'], function ($message, $resolver) {
    processMessage($message->body);
    $resolver->acknowledge($message);
});
```

Artisan Commands
----------------

[](#artisan-commands)

The package registers five console commands. Handler classes must implement `Bschmitt\Amqp\Contracts\MessageHandlerInterface` or expose an `__invoke($message, $resolver)` method. The `$resolver` is the active consumer and provides `acknowledge()`, `reject()`, `reply()`, and `stopWhenProcessed()`.

### `amqp:work` — long-running worker

[](#amqpwork--long-running-worker)

```
php artisan amqp:work my-queue --handler="App\\Messaging\\ProcessOrderHandler"
```

OptionDescription`--handler=`**Required.** FQCN of your message handler`--connection=`Connection name from `config/amqp.php``--exchange=` / `--exchange-type=`Override exchange settings`--routing-key=*`Routing key(s) to bind (repeatable)`--prefetch-count=`Enable QoS with this prefetch count`--max-messages=0`Stop after N messages (0 = unlimited)`--max-time=0`Stop after N seconds`--memory=128`Exit if memory exceeds MB`--stop-when-empty`Exit when the queue is drained instead of waiting`--requeue-on-error`Requeue messages when the handler throws### `amqp:consume` — process a fixed number of messages

[](#amqpconsume--process-a-fixed-number-of-messages)

```
php artisan amqp:consume my-queue --handler="App\\Messaging\\ProcessOrderHandler" --max-messages=10
php artisan amqp:consume my-queue --handler="App\\Messaging\\ProcessOrderHandler" --all
```

Defaults to one message per invocation. Use `--all` to drain the queue.

### `amqp:listen` — listen on routing keys

[](#amqplisten--listen-on-routing-keys)

```
php artisan amqp:listen order.created order.updated --handler="App\\Messaging\\OrderHandler"
```

Creates an auto-deleted queue (unless `--queue=` or `--no-auto-delete` is set) and binds it to every supplied routing key.

### `amqp:publish` — publish from the CLI

[](#amqppublish--publish-from-the-cli)

```
php artisan amqp:publish order.created --body='{"id":42}' --exchange=orders --priority=5
php artisan amqp:publish order.created --file=./payload.json --headers='{"X-Source":"cli"}'
php artisan amqp:publish order.created --body='{"id":42}' --delay-ms=5000 --exchange=orders
```

Use `--delay-ms` to schedule delivery (TTL+DLX by default, or `--delay-strategy=plugin` when the delayed-message exchange plugin is installed).

### `amqp:purge` — empty a queue

[](#amqppurge--empty-a-queue)

```
php artisan amqp:purge my-queue --force
```

### Retry options on `amqp:work`

[](#retry-options-on-amqpwork)

OptionDescription`--retry=N`Wraps the handler in a `RetryHandler` and configures up to `N` retries (0 disables retries)`--retry-delay=ms`Base delay between retries in milliseconds (default `1000`)`--retry-backoff=fixed|exponential`Backoff strategy (default `fixed`)`--retry-multiplier=2.0`Growth factor for exponential backoff`--retry-max-delay=ms`Cap for the computed retry delay (`0` = uncapped)`--retry-jitter=ms`Random jitter added to each retry delay`--dlq=name`Override the dead-letter queue name (default `{queue}.dlq`)`--declare-topology`Pre-declare the work + DLQ + retry queues before consuming`--contract=`FQCN of a `MessageContractInterface` to deserialize bodies into (passed as 3rd handler arg)`--validate-schema`Validate inbound JSON against the contract's `schema()` before invoking the handlerSee [Retry &amp; Dead-Letter Abstractions](#retry--dead-letter-abstractions) for the full picture.

### Example handler

[](#example-handler)

```
namespace App\Messaging;

use Bschmitt\Amqp\Contracts\ConsumerInterface;
use Bschmitt\Amqp\Contracts\MessageHandlerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class ProcessOrderHandler implements MessageHandlerInterface
{
    public function handle(AMQPMessage $message, ConsumerInterface $resolver, $typed = null): void
    {
        $order = $typed !== null ? $typed->toPayload() : json_decode($message->body, true);
        // ... process $order ...
        $resolver->acknowledge($message);
    }
}
```

Laravel Queue Driver
--------------------

[](#laravel-queue-driver)

Use this package as a native Laravel queue backend so jobs can be dispatched with `dispatch()`, `Queue::push()`, and processed with `php artisan queue:work`.

### 1. Publish AMQP config

[](#1-publish-amqp-config)

```
php artisan vendor:publish --provider="Bschmitt\Amqp\Providers\AmqpServiceProvider"
```

### 2. Add queue connection

[](#2-add-queue-connection)

Merge the example from `config/queue-amqp.php` into `config/queue.php`:

```
'connections' => [
    // ...
    'amqp' => [
        'driver' => 'amqp',
        'connection' => env('AMQP_ENV', 'production'), // key in config/amqp.php properties
        'queue' => env('AMQP_QUEUE', 'default'),
        'retry_after' => 90,
    ],
],
```

### 3. Set default queue connection (optional)

[](#3-set-default-queue-connection-optional)

```
QUEUE_CONNECTION=amqp
```

### 4. Run the worker

[](#4-run-the-worker)

```
php artisan queue:work amqp --queue=default
```

Jobs are published to your configured exchange with the queue name as the routing key. Delayed jobs use a TTL dead-letter queue per delay interval.

### Delayed &amp; released jobs

[](#delayed--released-jobs)

```
ProcessOrder::dispatch($order)->delay(now()->addMinutes(5));
```

`AmqpQueue::later()` publishes to a per-TTL delay queue (`{queue}.delay.{ttl_ms}`) with `x-dead-letter-exchange` / `x-message-ttl` so RabbitMQ delivers the job back to the main queue when the delay expires. `$job->release($seconds)` uses the same mechanism.

### Verify the driver

[](#verify-the-driver)

```
vendor/bin/phpunit --testdox \
  --filter 'AmqpQueue|AmqpJob|AmqpConnector|AmqpServiceProviderQueue|QueueConfigResolver|LaravelQueue'
```

Full setup, architecture and troubleshooting: [docs/content/queue-driver.md](docs/content/queue-driver.md) or the interactive docs site (`docs/index.html`).

Configuration
-------------

[](#configuration)

### Laravel

[](#laravel)

Publish the configuration file:

```
php artisan vendor:publish --provider="Bschmitt\Amqp\Providers\AmqpServiceProvider"
```

Or manually copy `vendor/bschmitt/laravel-amqp/config/amqp.php` to `config/amqp.php`.

### Lumen

[](#lumen)

Create a `config` folder in your Lumen root and copy the configuration file:

```
mkdir config
cp vendor/bschmitt/laravel-amqp/config/amqp.php config/amqp.php
```

Register the service provider in `bootstrap/app.php`:

```
$app->configure('amqp');
$app->register(Bschmitt\Amqp\Providers\LumenServiceProvider::class);

// For Lumen 5.2+, enable facades
$app->withFacades(true, [
    'Bschmitt\Amqp\Facades\Amqp' => 'Amqp',
]);
```

### Configuration Example

[](#configuration-example)

```
return [
    'use' => env('AMQP_ENV', 'production'),

    'properties' => [
        'production' => [
            'host'                => env('AMQP_HOST', 'localhost'),
            'port'                => env('AMQP_PORT', 5672),
            'username'            => env('AMQP_USER', 'guest'),
            'password'            => env('AMQP_PASSWORD', 'guest'),
            'vhost'               => env('AMQP_VHOST', '/'),
            'exchange'            => env('AMQP_EXCHANGE', 'amq.topic'),
            'exchange_type'       => env('AMQP_EXCHANGE_TYPE', 'topic'),
            'consumer_tag'        => 'consumer',
            'ssl_options'         => [],
            'connect_options'     => [],
            'queue_properties'    => ['x-ha-policy' => ['S', 'all']],
            'exchange_properties' => [],
            'timeout'             => 0,

            // Management API (optional)
            'management_api_url' => env('AMQP_MANAGEMENT_URL', 'http://localhost:15672'),
            'management_api_user' => env('AMQP_MANAGEMENT_USER', 'guest'),
            'management_api_password' => env('AMQP_MANAGEMENT_PASSWORD', 'guest'),
        ],
    ],
];
```

Documentation
-------------

[](#documentation)

### Comprehensive Guides

[](#comprehensive-guides)

- **[User Manual](docs/USER_MANUAL.md)** - Complete usage guide
- **[Release Notes](RELEASE_NOTES.md)** - Version 3.4.0 changelog (latest: 3.4.0 minor release)
- **[FAQ](docs/laravel-amqp.wiki/FAQ.md)** - Common questions and answers

### Wiki Documentation

[](#wiki-documentation)

- **[Getting Started](docs/laravel-amqp.wiki/Getting-Started.md)** - Installation and first steps
- **[Configuration](docs/laravel-amqp.wiki/Configuration.md)** - Configuration guide
- **[Publishing Messages](docs/laravel-amqp.wiki/Publishing-Messages.md)** - Publishing guide
- **[Consuming Messages](docs/laravel-amqp.wiki/Consuming-Messages.md)** - Consumption guide
- **[RPC Pattern](docs/laravel-amqp.wiki/RPC-Pattern.md)** - Request-response patterns
- **[Queue Management](docs/laravel-amqp.wiki/Queue-Management.md)** - Queue operations
- **[Management API](docs/laravel-amqp.wiki/Management-API.md)** - HTTP API integration
- **[Message Properties](docs/laravel-amqp.wiki/Message-Properties.md)** - Message properties
- **[Advanced Features](docs/laravel-amqp.wiki/Advanced-Features.md)** - Advanced usage
- **[Architecture](docs/laravel-amqp.wiki/Architecture.md)** - Package architecture
- **[Testing](docs/laravel-amqp.wiki/Testing.md)** - Testing guide

### Module Documentation

[](#module-documentation)

See [docs/modules/](docs/modules/) for detailed module documentation:

- RPC Module
- Management Operations
- Management API
- Message Properties
- Consumer Prefetch
- And more...

Examples
--------

[](#examples)

### Fanout Exchange

[](#fanout-exchange)

```
// Publishing
Amqp::publish('', 'message', [
    'exchange_type' => 'fanout',
    'exchange' => 'amq.fanout',
]);

// Consuming (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('', function ($message, $resolver) {
    echo $message->body;
    $resolver->acknowledge($message);
}, [
    'routing' => '',
    'exchange' => 'amq.fanout',
    'exchange_type' => 'fanout',
    'queue_force_declare' => true,
    'queue_exclusive' => true,
    'persistent' => true
]);
```

### Queue Management

[](#queue-management)

```
// Get Amqp instance
$amqp = app('Amqp');

// Purge queue
$amqp->queuePurge('my-queue', ['queue' => 'my-queue']);

// Delete queue
$amqp->queueDelete('my-queue', ['queue' => 'my-queue']);

// Get queue statistics
$stats = $amqp->getQueueStats('my-queue', '/');
```

### Management API

[](#management-api)

```
// Get Amqp instance
$amqp = app('Amqp');

// Get queue statistics
$stats = $amqp->getQueueStats('my-queue', '/');

// List connections
$connections = $amqp->getConnections();

// Create policy
$amqp->createPolicy('my-policy', [
    'pattern' => '^my-queue$',
    'definition' => ['max-length' => 1000]
], '/');
```

Retry &amp; Dead-Letter Abstractions
------------------------------------

[](#retry--dead-letter-abstractions)

Three small primitives let you build production-grade retry pipelines without hand-rolling DLX wiring:

- **`Bschmitt\Amqp\Support\RetryPolicy`** — declarative `max attempts` + backoff strategy (fixed, exponential, immediate, none) with optional cap and jitter.
- **`Bschmitt\Amqp\Support\DeadLetterTopology`** — describes the work queue, the DLQ, and the per-delay retry queues. Produces ready-to-use property arrays for `publish()` / `consume()`.
- **`Bschmitt\Amqp\Support\RetryHandler`** — decorator that wraps your handler. On exception it republishes the message to a TTL'd retry queue (which dead-letters back to the work queue when the TTL expires) and acknowledges the original delivery. When the retry budget is spent it rejects without requeue so RabbitMQ routes the message to the DLQ via the `x-dead-letter-exchange` configured on the work queue.

### Declare the topology once

[](#declare-the-topology-once)

```
use Bschmitt\Amqp\Support\DeadLetterTopology;
use Bschmitt\Amqp\Support\RetryPolicy;

$amqp = app('Amqp');

// RetryPolicy::exponential($maxAttempts, $baseDelayMs, $multiplier, $maxDelayMs)
$policy   = RetryPolicy::exponential(5, 1000, 2.0, 60000);
$topology = DeadLetterTopology::for('orders.process', $policy)
    ->on('app.events', 'topic')
    ->withRoutingKey('orders.process');

// Idempotently creates: orders.process, orders.process.dlq,
// and orders.process.retry.{1000,2000,4000,8000,16000} (capped at 60000).
$amqp->declareRetryTopology($topology);
```

### Consume with auto-retry / DLQ routing

[](#consume-with-auto-retry--dlq-routing)

```
$amqp->consumeWithRetry($topology, function ($message, $resolver) {
    processOrder(json_decode($message->body, true));
    $resolver->acknowledge($message);
});
```

When the handler throws:

1. `RetryHandler` reads (and bumps) the `x-retry-attempt` application header.
2. If the next attempt still fits the policy, the message is republished to `orders.process.retry.{delayMs}` with the computed TTL. RabbitMQ's DLX on that queue routes the message back to `orders.process` once the TTL expires.
3. When the retry budget is exhausted, the handler rejects the message without requeue and RabbitMQ forwards it to `orders.process.dlq` via the work queue's `x-dead-letter-exchange`.
4. The `x-first-failed-at` and `x-last-error` headers carry diagnostics forward across retries so DLQ inspection is meaningful.

### Pick a policy

[](#pick-a-policy)

```
use Bschmitt\Amqp\Support\RetryPolicy;

RetryPolicy::fixed(3, 1000);                       // 3 retries, 1s apart
RetryPolicy::exponential(5, 500, 2.0, 30000);      // 500ms doubling, capped at 30s
RetryPolicy::immediate(2);                         // 2 retries with zero delay
RetryPolicy::none();                               // failures go straight to the DLQ
```

### Wrap an existing handler manually

[](#wrap-an-existing-handler-manually)

```
use Bschmitt\Amqp\Support\RetryHandler;

$wrapped = $amqp->retryHandler($yourHandler, $topology, function ($level, $message, $context) {
    Log::log($level, $message, $context);
});

$amqp->consume('orders.process', $wrapped, $topology->toWorkProperties());
```

### Driving the worker from the CLI

[](#driving-the-worker-from-the-cli)

```
php artisan amqp:work orders.process \
    --handler="App\\Messaging\\ProcessOrderHandler" \
    --retry=5 \
    --retry-backoff=exponential \
    --retry-delay=1000 \
    --retry-multiplier=2.0 \
    --retry-max-delay=60000 \
    --dlq=orders.process.failed \
    --declare-topology
```

See `docs/content/advanced.md` and the unit tests under `test/Unit/Retry*` / `test/Unit/DeadLetterTopologyTest.php` for more examples.

Delayed Messaging &amp; Publish Backoff
---------------------------------------

[](#delayed-messaging--publish-backoff)

Schedule messages for later delivery or absorb transient broker errors on publish.

### `publishLater()` — schedule delivery

[](#publishlater--schedule-delivery)

```
$amqp = app('Amqp');

// TTL + dead-letter exchange (works on stock RabbitMQ)
$amqp->publishLater('orders.reminder', json_encode(['orderId' => 42]), 60000, [
    'exchange' => 'shop.events',
]);

// rabbitmq-delayed-message-exchange plugin (exchange must be x-delayed-message)
$amqp->publishLater('orders.reminder', $body, 60000, [
    'exchange' => 'shop.delayed',
    'delay_strategy' => 'plugin',
]);
```

`DelayedPublisher` creates a per-delay queue (`{routing}.delayed.{ms}`) with `x-message-ttl` and DLX routing back to the target exchange when using the default TTL strategy.

### `PublishBackoff` — retry failed publishes

[](#publishbackoff--retry-failed-publishes)

```
use Bschmitt\Amqp\Support\RetryPolicy;

$amqp->withPublishBackoff(RetryPolicy::exponential(3, 100, 2.0))->run(function () use ($amqp) {
    return $amqp->publish('orders.created', $payload);
});
```

This is separate from consumer-side `RetryHandler` — it retries the **publish** call itself when the broker throws.

Typed Message Contracts &amp; DTO Serialization
-----------------------------------------------

[](#typed-message-contracts--dto-serialization)

Define message shapes as plain PHP classes and let the package handle JSON encoding/decoding.

```
use Bschmitt\Amqp\Support\TypedMessage;

class OrderCreated extends TypedMessage
{
    public $orderId;
    public $total;
    public $currency;

    public function __construct($orderId = null, $total = null, $currency = null)
    {
        $this->orderId = $orderId;
        $this->total = $total;
        $this->currency = $currency;
    }

    public static function routingKey()
    {
        return 'orders.created';
    }

    public static function exchange()
    {
        return 'shop.events';
    }
}
```

```
$amqp = app('Amqp');

// Publish — picks up routing key + exchange from the contract
$amqp->publishTyped(new OrderCreated('order-1', 19.99, 'USD'));

// Consume — callback receives ($typed, $message, $resolver)
$amqp->consumeTyped('orders.queue', OrderCreated::class, function ($order, $message, $resolver) {
    processOrder($order->orderId);
    $resolver->acknowledge($message);
});

// Delayed typed publish
$amqp->publishTypedLater(new OrderCreated('order-2', 9.99, 'USD'), 30000);
```

Swap the serializer via `$amqp->setSerializer($mySerializer)` when you need MessagePack, Avro, etc.

JSON Schema Validation
----------------------

[](#json-schema-validation)

Contracts may expose a static `schema()` method returning a JSON Schema-style array. The package validates payloads on publish and consume using the bundled `SchemaValidator` (no external dependencies).

```
class OrderCreated extends TypedMessage
{
    // ...properties...

    public static function schema()
    {
        return [
            'type' => 'object',
            'required' => ['orderId', 'total', 'currency'],
            'additionalProperties' => false,
            'properties' => [
                'orderId'  => ['type' => 'string', 'minLength' => 1],
                'total'    => ['type' => 'number', 'minimum' => 0],
                'currency' => ['type' => 'string', 'enum' => ['USD', 'EUR', 'GBP']],
            ],
        ];
    }
}
```

Invalid payloads raise `Bschmitt\Amqp\Exception\SchemaValidationException` with a list of pointer-style error messages. On the CLI, combine `--contract` with `--validate-schema` on `amqp:work`.

Supported keywords include `type`, `required`, `properties`, `additionalProperties`, `enum`, `const`, `minimum`/`maximum`, `minLength`/`maxLength`, `pattern`, `format` (email, uri, uuid, date, date-time), `items`, `oneOf`/`anyOf`/`allOf`/`not`, and more — see `docs/content/advanced.md`.

Production Infrastructure
-------------------------

[](#production-infrastructure)

### Exchange topology builder

[](#exchange-topology-builder)

Declare an exchange and multiple bound queues in one fluent builder:

```
use Bschmitt\Amqp\Facades\Amqp;
use Bschmitt\Amqp\Support\ExchangeTopology;
use Bschmitt\Amqp\Support\QueueProfile;

$topology = ExchangeTopology::exchange('events', 'topic')
    ->bindQueue('orders.created', 'order.created')
    ->bindQueue('orders.shipped', 'order.shipped', QueueProfile::quorum());

Amqp::declareExchangeTopology($topology);

// Publish using properties for a specific queue in the topology
Amqp::publish('order.created', $payload, $topology->propertiesForQueue('orders.created'));
```

Shortcut: `Amqp::exchangeTopology('events', 'topic')->bindQueue(...)`.

### Quorum &amp; priority queues

[](#quorum--priority-queues)

```
use Bschmitt\Amqp\Support\QueueProfile;

Amqp::publish('jobs', $payload, QueueProfile::quorumWithPriority(10)->mergeInto([
    'queue' => 'jobs',
    'routing' => 'jobs',
]));
```

### Resilient connections &amp; connection pool

[](#resilient-connections--connection-pool)

```
use Bschmitt\Amqp\Facades\Amqp;
use Bschmitt\Amqp\Managers\ConnectionPool;

// Per-request resilient manager (reconnect + heartbeat staleness)
$resilient = Amqp::resilientConnection(['host' => 'rabbitmq'], [
    'max_reconnect_attempts' => 5,
    'heartbeat' => 30,
]);
$channel = $resilient->getChannel();

// Long-lived worker pool (persistent keys survive disconnectAll(false))
$pool = Amqp::connectionPool();
$manager = $pool->connection('worker', ['use' => 'production', 'resilient' => true], true);
```

### Correlation ID &amp; distributed tracing

[](#correlation-id--distributed-tracing)

```
use Bschmitt\Amqp\Support\CorrelationContext;

CorrelationContext::set('request-abc-123');

Amqp::publish('orders.created', $payload, [
    'propagate_correlation' => true,
    'propagate_trace' => true,
]);

Amqp::consumeWithLifecycle('orders.created', function ($message, $resolver) {
    // CorrelationContext::get() is populated when propagate_* flags are used
}, null, [
    'propagate_correlation' => true,
    'propagate_trace' => true,
]);
```

Bridge OpenTelemetry (or any APM) without a hard dependency:

```
use Bschmitt\Amqp\Support\CallbackTracePropagator;

Amqp::setTracePropagator(new CallbackTracePropagator(
    function (array $carrier, $context) {
        // inject active span into $carrier
        return $carrier;
    },
    function (array $carrier) {
        // extract TraceContext from $carrier or return null
        return null;
    }
));
```

### Consumer lifecycle

[](#consumer-lifecycle)

```
use Bschmitt\Amqp\Support\ConsumerLifecycle;

$lifecycle = (new ConsumerLifecycle())
    ->registerSignalHandlers()
    ->onStopping(function ($lifecycle) {
        // flush buffers, close DB connections, etc.
    });

Amqp::consumeWithLifecycle('jobs', $handler, $lifecycle);
```

See `docs/content/production-features.md` for the full reference.

SAGA, Events, Middleware &amp; Testing
--------------------------------------

[](#saga-events-middleware--testing)

### SAGA workflow

[](#saga-workflow)

```
use Bschmitt\Amqp\Facades\Amqp;

$saga = Amqp::saga('checkout')
    ->step('reserveStock', $reserveStock, $releaseStock)
    ->step('chargeCard',  $chargeCard,  $refundCard)
    ->step('shipOrder',   $shipOrder);

$result = $saga->execute(['orderId' => 42]);
if ($result->failed()) {
    Log::error('Saga failed', [
        'step' => $result->getFailedStep(),
        'compensated' => $result->getCompensatedSteps(),
        'error' => $result->getException()->getMessage(),
    ]);
}
```

Compensations only run for steps that completed before the failure, in reverse order.

### Laravel events

[](#laravel-events)

The package dispatches the following events through `\Illuminate\Support\Facades\Event` (and a local listener registry as a fallback for non-Laravel contexts):

EventWhen`Bschmitt\Amqp\Events\MessagePublishing`Right before a publish is sent`Bschmitt\Amqp\Events\MessagePublished`After a successful publish`Bschmitt\Amqp\Events\MessageReceived`When a message is received by the consume pipeline`Bschmitt\Amqp\Events\MessageHandled`After the handler completes`Bschmitt\Amqp\Events\MessageFailed`When the handler throwsListen in Laravel as usual:

```
Event::listen(\Bschmitt\Amqp\Events\MessageFailed::class, function ($event) {
    Log::warning('AMQP handler failed', ['error' => $event->exception->getMessage()]);
});
```

### Consume middleware

[](#consume-middleware)

Wrap the consume handler with a pipeline:

```
use Bschmitt\Amqp\Facades\Amqp;

Amqp::consumeWithMiddleware('orders', function ($message, $resolver) {
    // handle...
}, [
    function ($message, $next) {
        $start = microtime(true);
        $next($message);
        Log::info('handled', ['duration_ms' => (microtime(true) - $start) * 1000]);
    },
    // ...or a ConsumeMiddlewareInterface instance
]);
```

Each middleware receives `(AMQPMessage $message, callable $next)` and can short-circuit by not calling `$next`.

### Fake AMQP driver

[](#fake-amqp-driver)

In tests, replace the bound singleton with a recording fake:

```
use Bschmitt\Amqp\Core\Amqp;

public function test_publishes_order_created()
{
    $fake = Amqp::fake();

    (new CreateOrder)->handle();

    $fake->assertPublished('orders.created');
    $fake->assertPublishedCount(1, 'orders.created');
    $fake->assertNotPublished('orders.shipped');
}
```

The fake records both `publish()` and `publishLater()` calls; never touches the broker.

### Async publishing with publisher confirms

[](#async-publishing-with-publisher-confirms)

```
$async = Amqp::asyncPublisher(['exchange' => 'events'])
    ->onAck(function ($tag)  { /* metric: published */ })
    ->onNack(function ($tag) { /* metric: failed */ });

foreach ($messages as $m) {
    $async->publish('events.created', json_encode($m));
}

if (!$async->flush(30)) {
    Log::warning('Some publisher confirms timed out');
}

$async->close();
```

`AsyncPublisher` keeps a single channel open with `confirm_select` and only waits for confirmations on `flush()`, so high-throughput publishers don't block on the per-message round-trip.

Scale &amp; Interop
-------------------

[](#scale--interop)

### RPC abstraction

[](#rpc-abstraction)

```
// Client
$result = Amqp::rpcClient(['exchange' => 'rpc'])->asJson()->timeout(10)
    ->call('users.lookup', ['id' => 42]);

if ($result->succeeded()) {
    $user = $result->body();
}

// Server
Amqp::rpcServer()->asJson()->serve('rpc.users', function ($request, $consumer) {
    return ['id' => $request['id'], 'name' => 'Ada'];
});
```

`RpcCallResult` exposes `succeeded()`, `timedOut()`, and `body()`.

### Cross-service / polyglot messaging

[](#cross-service--polyglot-messaging)

```
Amqp::publishInterop(
    'orders.created',
    ['orderId' => 99, 'total' => 12.50],
    'orders.created',
    'billing-service',
    ['exchange' => 'events'],
    '2.0'
);

Amqp::consumeInterop('events.orders', function ($interop, $raw, $resolver) {
    $payload = \Bschmitt\Amqp\Support\InteropEnvelope::decodePayload($interop);
    // $interop->messageType, $interop->sourceService, $interop->schemaVersion
});
```

Standard headers (`x-message-type`, `x-schema-version`, `x-source-service`) let Node, Go, or Java consumers route messages without PHP DTOs.

### Observability &amp; queue metrics

[](#observability--queue-metrics)

```
// In-process counters (per worker / request)
$stats = Amqp::metrics()->snapshot();

// Broker-side queue depth + rates (Management API)
$metrics = Amqp::queueMetrics('orders', '/');
Log::info('queue depth', $metrics->toArray());
```

Publish/consume paths increment `MetricsCollector` automatically when using `publish()`, `consumeWithMiddleware()`, or `HighPerformanceWorker`.

### High-performance workers

[](#high-performance-workers)

```
Amqp::consumeOptimized('jobs', $handler, ['exchange' => 'work']);

// Or explicitly:
Amqp::highPerformanceWorker(
    \Bschmitt\Amqp\Support\WorkerOptions::throughput(100)
)->run('jobs', $handler);
```

CLI: `php artisan amqp:work jobs --handler=App\\Handlers\\JobHandler --optimized`

See `docs/content/scale-and-interop.md` for the full reference.

gRPC-lite RPC
-------------

[](#grpc-lite-rpc)

A typed, service-oriented RPC layer that feels like gRPC but rides on RabbitMQ. Define a service once, then call it from any process with typed DTOs.

### Define the service contract

[](#define-the-service-contract)

```
use Bschmitt\Amqp\Rpc\RpcService;
use Bschmitt\Amqp\Rpc\RpcRequest;
use Bschmitt\Amqp\Rpc\RpcResponse;

class UserService extends RpcService
{
    public static function queue(): string
    {
        return 'rpc.user-service';
    }

    public static function methods(): array
    {
        return [
            GetUserRequest::class    => 'getUser',
            CreateUserRequest::class => 'createUser',
        ];
    }
}

class GetUserRequest extends RpcRequest
{
    public $id;

    public function __construct($id = null) { $this->id = $id; }

    public static function responseClass()
    {
        return GetUserResponse::class;
    }
}

class GetUserResponse extends RpcResponse
{
    public $id;
    public $name;

    public function __construct($id = null, $name = null)
    {
        $this->id = $id;
        $this->name = $name;
    }
}
```

### Call from any client

[](#call-from-any-client)

```
use Rpc; // facade alias auto-registered

$response = Rpc::call(
    UserService::class,
    GetUserRequest::make(['id' => 5])
);

echo $response->name; // GetUserResponse instance, hydrated for you
```

`Rpc::call()` automatically:

- Resolves the queue from the service contract.
- JSON-encodes the request DTO.
- Issues a synchronous AMQP RPC round-trip via the existing primitive.
- Hydrates the reply into the request's `responseClass()` (or returns the raw decoded array).

Throws `RpcTimeoutException` if no reply arrives, or `RpcException` if the server returned an error envelope.

### Serve on the server side

[](#serve-on-the-server-side)

```
use Rpc;

class UserServiceHandler
{
    public function getUser(GetUserRequest $request): GetUserResponse
    {
        $user = User::findOrFail($request->id);
        return GetUserResponse::make([
            'id'   => $user->id,
            'name' => $user->name,
        ]);
    }

    public function createUser(CreateUserRequest $request): GetUserResponse
    {
        $user = User::create(['name' => $request->name]);
        return GetUserResponse::make(['id' => $user->id, 'name' => $user->name]);
    }
}

Rpc::register(UserService::class, UserServiceHandler::class)
   ->serve(UserService::class);
```

The handler may be an instance or a container-resolvable FQCN (Laravel only). Handler exceptions are wrapped into an `_rpc_error` envelope so the client raises a typed `RpcException` with the original message and class name.

### Configurable

[](#configurable)

```
// Global default timeout
Rpc::defaultTimeout(10);

// Per-call timeout + extra publish properties
Rpc::call(UserService::class, GetUserRequest::make(['id' => 1]), 5, [
    'exchange' => 'rpc.svc',
]);
```

### RPC latency &amp; events

[](#rpc-latency--events)

Every `Rpc::call()` records timing in `Amqp::rpcMetrics()` and dispatches Laravel events you can wire to Pulse, logs, or APM:

```
use Bschmitt\Amqp\Events\RpcCallCompleted;
use Bschmitt\Amqp\Events\RpcCallFailed;

Event::listen(RpcCallCompleted::class, fn ($e) => Log::info('rpc.ok', [
    'service' => $e->service,
    'request' => $e->request,
    'ms'      => $e->durationMs,
]));

$stats = Amqp::rpcMetrics()->snapshot();
// ['UserService::GetUserRequest' => ['count' => 42, 'p95_ms' => 12.5, 'error_rate' => 0.02, ...]]
```

Lower-level `RpcClient::call()` also returns `RpcCallResult::durationMs()`.

See `docs/content/grpc-lite-rpc.md` for the full reference.

Laravel Messaging Platform
--------------------------

[](#laravel-messaging-platform)

A set of higher-level building blocks that turn the package from "an AMQP client" into a full microservice toolkit: service discovery, sagas, message contracts, dead-letter management, declarative retry, monitoring, automatic context propagation, an audit log, and an event bridge.

### Service Discovery (`Rpc::service(...)`)

[](#service-discovery-rpcservice)

Skip exchange/routing-key/queue gymnastics — register a short name and call by that name.

```
use Bschmitt\Amqp\Facades\Rpc;

// Either: explicit registration
Rpc::services()->register('payments', PaymentsService::class);

// Or: opt-in auto-discovery (service exposes `public static function alias()`)
class PaymentsService extends RpcService {
    public static function queue(): string   { return 'rpc.payments'; }
    public static function methods(): array  { return [GetPayment::class => 'find']; }
    public static function alias(): ?string  { return 'payments'; }
}

Rpc::services()->autodiscover([PaymentsService::class]);

$response = Rpc::service('payments')
    ->timeout(5)
    ->call(GetPayment::make(['id' => 123]));
```

`Rpc::service()` accepts an alias **or** a service FQCN.

### Saga Facade

[](#saga-facade)

`Saga::make()->step()->compensate()` with reverse-order compensation when a step throws.

```
use Bschmitt\Amqp\Facades\Saga;

$result = Saga::make('checkout')
    ->step('reserve', fn($ctx) => $stock->reserve($ctx['orderId']))
        ->compensate(fn($ctx) => $stock->release($ctx['orderId']))
    ->step('charge',  fn($ctx) => $payments->charge($ctx['amount']))
        ->compensate(fn($ctx, $tx) => $payments->refund($tx))
    ->execute(['orderId' => 1, 'amount' => 49.99]);

if (!$result->succeeded()) {
    Log::error('Saga failed at ' . $result->getFailedStep(), [
        'compensated' => $result->getCompensatedSteps(),
    ]);
}
```

### Message Contracts (`OrderCreated::dispatch(...)`)

[](#message-contracts-ordercreateddispatch)

`TypedMessage` now exposes `make()` and `dispatch()` (and `dispatchLater()` for the delayed-queue variant).

```
use Bschmitt\Amqp\Support\TypedMessage;

class OrderCreated extends TypedMessage
{
    public $orderId;
    public $total;

    public static function name(): string { return 'orders.created'; }
}

OrderCreated::dispatch(['orderId' => 'o-1', 'total' => 9.99]);

OrderCreated::dispatchLater(['orderId' => 'o-1'], 2_000); // 2s delay
```

### Dead-Letter Management

[](#dead-letter-management)

```
use Bschmitt\Amqp\Facades\Amqp;

Amqp::deadLetters()->for('orders.dlq')->count();           // 17
Amqp::deadLetters()->for('orders.dlq')->peek(20);         // non-destructive sample
Amqp::deadLetters()->for('orders.dlq')->summarize(100);   // group by reason / error
Amqp::deadLetters()->for('orders.dlq')->messages(10);      // drain & inspect (destructive)
Amqp::deadLetters()->for('orders.dlq')->replayTo('orders', 50);
Amqp::deadLetters()->for('orders.dlq')->purge();
```

CLI:

```
php artisan amqp:dlq inspect orders.dlq
php artisan amqp:dlq summary orders.dlq --limit=200 --json
php artisan amqp:dlq replay  orders.dlq --target=orders --limit=50
php artisan amqp:dlq purge   orders.dlq --force
```

Lifecycle events: `DeadLetterDetected`, `DeadLetterReplayed`, `DeadLetterPurged`.

### Declarative Retry (`#[Retry]`)

[](#declarative-retry-retry)

```
use Bschmitt\Amqp\Attributes\Retry;
use Bschmitt\Amqp\Support\RetryStrategy;
use Bschmitt\Amqp\Support\RetryPolicy;

class CreateOrderHandler
{
    #[Retry(attempts: 5, strategy: RetryStrategy::EXPONENTIAL, delayMs: 500)]
    public function handle($message): void { /* ... */ }
}

$policy = RetryPolicy::fromAttribute(CreateOrderHandler::class, 'handle');
$amqp->consumeWithRetry('orders', $handler, $policy);
```

On PHP 7.x the attribute parses as a comment (the package still loads); call sites that want the attribute need PHP 8+.

### Monitoring Dashboard

[](#monitoring-dashboard)

```
$snapshot = Amqp::dashboard(['orders', 'orders.dlq'])
    ->deadLetters(['orders.dlq'])
    ->lagThresholds(1000, 60.0, 300)
    ->snapshot();
// process, queues (with lag / lag_seconds / lagging), dead_letters, rpc, lagging[], generated
```

CLI:

```
php artisan amqp:monitor --queue=orders --queue=orders.dlq --json
php artisan amqp:monitor --queue=orders --dlq=orders.dlq --rpc
php artisan amqp:monitor --queue=orders --lag-threshold=1000 --lag-seconds=60
# exits 1 when any queue breaches a lag threshold (cron-friendly)
```

Wire the snapshot into any HTTP route (Laravel, Symfony, Slim) to expose a JSON dashboard.

### Causation ID Propagation

[](#causation-id-propagation)

`CorrelationContext::inheritFromMessage()` now picks up the inbound `message_id` as the **causation\_id** for everything published afterwards, so downstream services can trace "this happened because of that" through a chain.

```
CorrelationContext::inheritFromMessage($incoming);

Amqp::publish('orders.created', $body, [
    'propagate_correlation' => true,
    'message_id' => uniqid('msg_', true),
]);
// outbound has `correlation_id`, `x-correlation-id`, and `x-causation-id` set
```

### Correlation Chain Visualisation

[](#correlation-chain-visualisation)

`CorrelationChain` walks the `MessageStore`, groups entries by `correlation_id`, and rebuilds the causation tree using the `x-causation-id` header — no UI server required.

```
use Bschmitt\Amqp\Support\CorrelationChain;

$chain = new CorrelationChain($amqp->messageStore());

$summary = $chain->summarize('corr_abc123');
// total, published, consumed, routings, first_at, last_at, duration_ms

$tree = $chain->tree('corr_abc123');         // nested ['entry' => ..., 'children' => [...]]
echo $chain->render($tree);                  // ASCII tree, perfect for logs
```

CLI:

```
php artisan amqp:trace corr_abc123
php artisan amqp:trace corr_abc123 --summary
php artisan amqp:trace corr_abc123 --json --limit=50
```

Sample output:

```
correlation_id: corr_abc123
messages: 4 (published=3, consumed=1)
span: 18.42 ms
routings: orders.created(1), orders.shipped(2), orders.invoiced(1)

[published] >> orders.created (msg=msg_root)
├── [published] >> orders.shipped (msg=msg_a)
│   └── [published] >> orders.invoiced (msg=msg_grand)
└── [consumed]   false,
];
```

The recorder is a silent no-op when Pulse is not installed — no exceptions, no log spam.

### OpenTelemetry Bridge

[](#opentelemetry-bridge)

`OpenTelemetryTracePropagator` plugs the `open-telemetry/api` SDK into the package's `TracePropagatorInterface` so that the active OTel span context is auto-injected into every AMQP `traceparent` / `tracestate` header.

```
use Bschmitt\Amqp\Contracts\TracePropagatorInterface;
use Bschmitt\Amqp\Support\OpenTelemetryTracePropagator;

// In a service provider:
$this->app->singleton(TracePropagatorInterface::class, function () {
    return new OpenTelemetryTracePropagator();
    // Or pass an explicit \OpenTelemetry\Context\Propagation\TextMapPropagatorInterface
});
```

When the SDK is absent the propagator falls back to W3C generation (`W3cTracePropagator`), so the same wiring works on stripped-down environments and in CI.

### MessageStore (audit log / event-sourcing seed)

[](#messagestore-audit-log--event-sourcing-seed)

```
use Bschmitt\Amqp\Support\InMemoryMessageStore;

$amqp->setMessageStore(new InMemoryMessageStore());

Amqp::publish('orders.created', '{}');

$entries = $amqp->messageStore()->all(['direction' => 'published']);
```

Implement `MessageStoreInterface` to back it with Eloquent / Redis / files for durable replay.

### Async Laravel Events

[](#async-laravel-events)

```
use Bschmitt\Amqp\Contracts\ShouldPublishToAmqpInterface;

class OrderCreated implements ShouldPublishToAmqpInterface
{
    public function __construct(public string $orderId) {}
}

// config/amqp.php
return [
    // ...
    'broadcast_laravel_events' => true,
];

event(new OrderCreated('o-1'));
// auto-published to RabbitMQ with routing key `order_created`
```

Override `amqpRouting()`, `amqpExchange()`, or `amqpPayload()` on the event to customise routing.

Testing
-------

[](#testing)

The package includes comprehensive test coverage:

```
# Run all tests
php vendor/bin/phpunit

# Run unit tests only
php vendor/bin/phpunit test/Unit/

# Run integration tests only
php vendor/bin/phpunit test/Integration/
```

**Test Requirements:**

- RabbitMQ server running (for integration tests)
- Docker: `docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management`

See [Testing Guide](docs/laravel-amqp.wiki/Testing.md) for more information.

Version 3.1.0+ Highlights
-------------------------

[](#version-310-highlights)

### New Methods

[](#new-methods)

**RPC:**

- `$amqp->rpc($routingKey, $request, $properties, $timeout)` - Make RPC calls (use `$amqp = app('Amqp')`)
- `Consumer::reply($message, $response, $properties)` - Send RPC responses
- `$amqp->listen($routingKeys, $callback, $properties)` - Auto-create queues with multiple bindings (use `$amqp = app('Amqp')`)

**Management:**

- `$amqp->queuePurge($queue, $properties)` - Purge queue (use `$amqp = app('Amqp')`)
- `$amqp->queueDelete($queue, $ifUnused, $ifEmpty, $properties)` - Delete queue
- `$amqp->queueUnbind(...)` - Unbind queue
- `$amqp->exchangeDelete(...)` - Delete exchange
- `$amqp->exchangeUnbind(...)` - Unbind exchange

**Management API:**

- `$amqp->getQueueStats($queue, $vhost, $properties)` - Queue statistics (use `$amqp = app('Amqp')`)
- `$amqp->getConnections($connectionName, $properties)` - List connections
- `$amqp->getChannels($channelName, $properties)` - List channels
- `$amqp->getNodes($nodeName, $properties)` - Cluster nodes
- `$amqp->getPolicies($properties)` - List policies
- `$amqp->createPolicy(...)` - Create policy
- `$amqp->updatePolicy(...)` - Update policy
- `$amqp->deletePolicy(...)` - Delete policy
- `$amqp->listFeatureFlags($properties)` - List feature flags
- `$amqp->getFeatureFlag($name, $properties)` - Get feature flag

**Helpers:**

- `$amqp->getConnectionConfig($connectionName)` - Get connection config (use `$amqp = app('Amqp')`)

**Note:** For `consume()`, `listen()`, `rpc()`, and all management methods, you must resolve the Amqp instance from the container using `$amqp = app('Amqp')` or `$amqp = resolve('Amqp')`. The static facade `Amqp::` works for `publish()` but not for `consume()` and other instance methods.

Kubernetes &amp; Cloud Native
-----------------------------

[](#kubernetes--cloud-native-1)

### Liveness / readiness probes

[](#liveness--readiness-probes)

Two complementary surfaces — HTTP routes for sidecars and a CLI for exec probes — both backed by the same `HealthState` + `HealthCheck` pair.

#### 1. HTTP routes

[](#1-http-routes)

Enable in `config/amqp.php` (or via `AMQP_PROBES_ENABLED=true`):

```
'probes' => [
    'enabled' => true,
    'prefix' => 'amqp/health',     // GET /amqp/health/live, /ready, /
    'middleware' => [],             // optional middleware (e.g. ['api'])
    'state_file' => storage_path('framework/amqp-health.json'),
    'heartbeat_age' => 60,          // seconds before liveness flips to 503
    'queues' => ['orders', 'orders.dlq'],
    'max_backlog' => 5000,
],
```

The service provider registers:

MethodPathResponseGET`/amqp/health/live`200 alive / 503 deadGET`/amqp/health/ready`200 ready / 503 not readyGET`/amqp/health/`combined snapshotWire it from your consumer:

```
use Bschmitt\Amqp\Support\ConsumerLifecycle;
use Bschmitt\Amqp\Support\HealthState;

$lifecycle = (new ConsumerLifecycle())
    ->withHealth(HealthState::instance(storage_path('framework/amqp-health.json')))
    ->registerSignalHandlers();

Amqp::consumeWithLifecycle('orders', $handler, $lifecycle);
```

#### 2. CLI exec probe (sidecar / `livenessProbe.exec.command`)

[](#2-cli-exec-probe-sidecar--livenessprobeexeccommand)

```
# Readiness (default)
php artisan amqp:health
php artisan amqp:health --queue=orders --backlog=1000

# Liveness
php artisan amqp:health --probe=live --heartbeat-age=30

# Combined snapshot
php artisan amqp:health --all --state-file=/var/run/amqp-health.json
```

Exit codes: `0` = healthy, `1` = unhealthy — exactly what `livenessProbe.exec` / `readinessProbe.exec` expect.

### Consumer autoscaling recommendations

[](#consumer-autoscaling-recommendations)

`AutoscalingAdvisor` is a pure function that turns a `QueueMetrics` snapshot into a recommended replica count and a ready-to-paste KEDA trigger:

```
use Bschmitt\Amqp\Support\AutoscalingAdvisor;

$metrics = Amqp::queueMetrics('orders');

$advice = (new AutoscalingAdvisor())
    ->messagesPerConsumer(100)
    ->maxLagSeconds(15.0)
    ->minReplicas(1)
    ->maxReplicas(20)
    ->advise($metrics);

// $advice['desired_consumers'] => 4
// $advice['action']            => 'scale_up'
// $advice['reasons']           => ['depth 350 / 100 ...', 'lag 20s > 15s -> +1 ...']
// $advice['keda']              => KEDA RabbitMQ trigger spec
```

CLI form:

```
php artisan amqp:scale orders orders.priority \
    --per-consumer=100 --max=20 --lag-seconds=15

php artisan amqp:scale orders --keda     # emit only the KEDA trigger
php artisan amqp:scale orders --json --fail-on-scale-up  # CI-friendly
```

The `--keda` output drops straight into a `ScaledObject` manifest under `spec.triggers`.

### Laravel Cloud / managed hosting compatibility

[](#laravel-cloud--managed-hosting-compatibility)

`LaravelCloud` detects managed environments (Laravel Cloud, Forge, Vapor, Render, Fly.io) and, when `amqp.cloud.auto_hydrate` is true (default), parses an `AMQP_URL` / `CLOUDAMQP_URL` / `RABBITMQ_URL` DSN into the active connection block on `register()` — without overwriting explicit config:

```
AMQP_URL=amqps://app:secret@rabbit.cloudamqp.com/%2Fprod
```

Explicit `AMQP_HOST` / `AMQP_USER` / etc. still win. You can also call the detector directly:

```
use Bschmitt\Amqp\Support\LaravelCloud;

if (LaravelCloud::isHosted()) {
    logger()->info('amqp hosted env', LaravelCloud::summary());
}

$props = LaravelCloud::parseDsn(env('AMQP_URL'));
```

### Multi-region deployment support

[](#multi-region-deployment-support)

Configure region-scoped connection keys, then resolve / fail over with locality preference:

```
// config/amqp.php
'regions' => [
    'enabled' => true,
    'connections' => ['production-us', 'production-eu', 'production-apac'],
    'primary' => null,           // null = match LARAVEL_CLOUD_REGION/AWS_REGION
    'cooldown_seconds' => 30,
],
```

```
use Bschmitt\Amqp\Support\MultiRegionConnection;

$resolver = app(MultiRegionConnection::class);

// Single attempt with locality preference
$connectionKey = $resolver->pick();              // 'production-us'

// Run a publish across regions until one succeeds
$resolver->withFailover(function ($region) {
    Amqp::publish('orders.created', $payload, ['use' => $region]);
});

// Fan-out to every region (e.g. announcements)
foreach ($resolver->each() as $region) {
    Amqp::publish('events.maintenance', $payload, ['use' => $region]);
}
```

Failed regions cool down for the configured window before re-entering rotation.

---

Backward Compatibility
----------------------

[](#backward-compatibility)

Version 3.4.0 is fully backward compatible with previous versions. All existing code will continue to work without modifications.

Contributing
------------

[](#contributing)

Contributions are welcome! Please feel free to submit a Pull Request.

Credits
-------

[](#credits)

- Some concepts were used from [mookofe/tail](https://github.com/mookofe/tail)
- Built and tested with `rabbitmq:3-management` Docker image

License
-------

[](#license)

This package is open-sourced software licensed under the [MIT license](http://opensource.org/licenses/MIT).

Support
-------

[](#support)

For issues, questions, or contributions:

- GitHub Issues:
- Documentation: See `docs/` directory
- FAQ: [docs/laravel-amqp.wiki/FAQ.md](docs/laravel-amqp.wiki/FAQ.md)

---

**Version:** 3.4.0
**Status:** Ready

###  Health Score

71

—

ExcellentBetter than 100% of packages

Maintenance92

Actively maintained with recent releases

Popularity62

Solid adoption and visibility

Community37

Small or concentrated contributor base

Maturity79

Established project with proven stability

 Bus Factor2

2 contributors hold 50%+ of commits

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~134 days

Recently: every ~39 days

Total

28

Last Release

32d ago

Major Versions

1.2.6 → 2.0.02018-06-01

2.1.2 → v3.1.02025-12-11

PHP version history (4 changes)1.2.1PHP &gt;=5.5.9

2.0.0PHP &gt;=7.0

2.1.0PHP ^7.3|~8.0.0

2.1.1PHP ^7.3|^8.0

### Community

Maintainers

![](https://www.gravatar.com/avatar/15900e637cce6b1ab00b998d0b88051d5d17385cad7cbd1f3d9cfe2732bcd5ed?d=identicon)[bschmitt](/maintainers/bschmitt)

---

Top Contributors

[![zfhassaan](https://avatars.githubusercontent.com/u/17079656?v=4)](https://github.com/zfhassaan "zfhassaan (110 commits)")[![bschmitt](https://avatars.githubusercontent.com/u/239644?v=4)](https://github.com/bschmitt "bschmitt (55 commits)")[![petekelly](https://avatars.githubusercontent.com/u/1177933?v=4)](https://github.com/petekelly "petekelly (6 commits)")[![mirkojotic](https://avatars.githubusercontent.com/u/10652272?v=4)](https://github.com/mirkojotic "mirkojotic (6 commits)")[![stevenklar](https://avatars.githubusercontent.com/u/379650?v=4)](https://github.com/stevenklar "stevenklar (5 commits)")[![ni-bschmitt](https://avatars.githubusercontent.com/u/58559970?v=4)](https://github.com/ni-bschmitt "ni-bschmitt (4 commits)")[![SpaceK33z](https://avatars.githubusercontent.com/u/533616?v=4)](https://github.com/SpaceK33z "SpaceK33z (4 commits)")[![xaviapa](https://avatars.githubusercontent.com/u/8439057?v=4)](https://github.com/xaviapa "xaviapa (3 commits)")[![junaidnasir](https://avatars.githubusercontent.com/u/10703810?v=4)](https://github.com/junaidnasir "junaidnasir (3 commits)")[![jwkblades](https://avatars.githubusercontent.com/u/517211?v=4)](https://github.com/jwkblades "jwkblades (3 commits)")[![dlpro](https://avatars.githubusercontent.com/u/53492833?v=4)](https://github.com/dlpro "dlpro (3 commits)")[![Cellard](https://avatars.githubusercontent.com/u/1220316?v=4)](https://github.com/Cellard "Cellard (3 commits)")[![hertzigger](https://avatars.githubusercontent.com/u/4991108?v=4)](https://github.com/hertzigger "hertzigger (2 commits)")[![Pe46dro](https://avatars.githubusercontent.com/u/6197075?v=4)](https://github.com/Pe46dro "Pe46dro (2 commits)")[![smartyaunt](https://avatars.githubusercontent.com/u/12381885?v=4)](https://github.com/smartyaunt "smartyaunt (2 commits)")[![joskfg](https://avatars.githubusercontent.com/u/524887?v=4)](https://github.com/joskfg "joskfg (2 commits)")[![josemanuel-cardona](https://avatars.githubusercontent.com/u/196229448?v=4)](https://github.com/josemanuel-cardona "josemanuel-cardona (2 commits)")[![lukebakken](https://avatars.githubusercontent.com/u/514926?v=4)](https://github.com/lukebakken "lukebakken (1 commits)")[![alupuleasa](https://avatars.githubusercontent.com/u/26110108?v=4)](https://github.com/alupuleasa "alupuleasa (1 commits)")[![dennisgon](https://avatars.githubusercontent.com/u/6257311?v=4)](https://github.com/dennisgon "dennisgon (1 commits)")

---

Tags

laravelpackagelumenqueuerabbitmqmessage queueAMQPlaravel5Björn Schmittbschmitt

###  Code Quality

TestsPHPUnit

Code StylePHP\_CodeSniffer

### Embed Badge

![Health badge](/badges/bschmitt-laravel-amqp/health.svg)

```
[![Health](https://phpackages.com/badges/bschmitt-laravel-amqp/health.svg)](https://phpackages.com/packages/bschmitt-laravel-amqp)
```

###  Alternatives

[mookofe/tail

RabbitMQ and PHP client for Laravel and Lumen that allows you to add and listen queues messages just simple

5252.9k](/packages/mookofe-tail)[nuwber/rabbitevents

The Nuwber RabbitEvents package

122529.7k4](/packages/nuwber-rabbitevents)[harris21/laravel-fuse

Circuit breaker for Laravel queue jobs. Protect your workers from cascading failures.

44855.7k](/packages/harris21-laravel-fuse)[iamfarhad/laravel-rabbitmq

Native ext-amqp RabbitMQ queue driver for Laravel production workloads with connection pooling, publisher confirms, Horizon support, Octane support, quorum queues, and high-performance workers

3319.5k](/packages/iamfarhad-laravel-rabbitmq)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
