airygen/laravel-amqp-producer
=============================

RabbitMQ publisher for Laravel with confirms, retries, DLX strategy.

Version 1.0.2 · MIT license · PHP >=8.2

[Source](https://github.com/airygen/laravel-amqp-producer) · [Packagist](https://packagist.org/packages/airygen/laravel-amqp-producer)

Laravel AMQP Producer
=====================

Laravel-oriented RabbitMQ publisher with:

- Publisher confirms
- Mandatory publish (detect unroutable messages)
- Exponential retry for transient AMQP errors
- Structured automatic headers: `request_id`, `source`, `env`, ISO8601 `datetime`
- Multi-connection support (choose connection per payload)

This library focuses on publishing only. For queue workers / consumers you can use: [vyuldashev/laravel-queue-rabbitmq](https://github.com/vyuldashev/laravel-queue-rabbitmq).

Installation
------------

```
composer require airygen/laravel-amqp-producer
```

Publish Configuration
---------------------

```
php artisan vendor:publish --provider="Airygen\\RabbitMQ\\RabbitMQServiceProvider" --tag=config
```

Generated file: `config/amqp.php`

```
return [
    'retry' => [
        'base_delay' => 0.2,
        'max_delay' => 1.5,
        'jitter' => false, // set true to randomize backoff (helps avoid thundering herd)
    ],
    'connections' => [
        'default' => [
            'host' => env('AMQP_HOST', '127.0.0.1'),
            'port' => (int) env('AMQP_PORT', 5672),
            'user' => env('AMQP_USER', 'guest'),
            'password' => env('AMQP_PASSWORD', 'guest'),
            'vhost' => env('AMQP_VHOST', '/'),
            'options' => [
                'lazy' => true,
                'keepalive' => true,
                'heartbeat' => 60,
                // Channel reuse removed: each publish opens/closes channel and connection
            ],
        ],
        // Add more named connections if needed
        // 'analytics' => [ ... ],
    ],
];
```

Defining a Payload
------------------

Extend `ProducerPayload` and (optionally) override connection / exchange / routing key.

```
use Airygen\RabbitMQ\ProducerPayload;

final class MemberCreatedPayload extends ProducerPayload
{
    protected string $connectionName = 'default';              // optional (defaults to 'default')
    protected ?string $exchangeName = 'ex.members';            // required if you publish to a non-empty exchange
    protected ?string $routingKey = 'member.created';          // required for direct/topic exchanges
}
```

Basic Publish
-------------

```
use Airygen\RabbitMQ\Publisher;

$publisher = app(Publisher::class);
$publisher->publish(new MemberCreatedPayload(['id' => 123]));
```

Custom Headers
--------------

```
$publisher->publish(
    new MemberCreatedPayload(['id' => 123]),
    header: ['foo' => 'bar']
);
```

Batch Publish
-------------

```
$payloads = [
    new MemberCreatedPayload(['id' => 1]),
    new MemberCreatedPayload(['id' => 2]),
];

$publisher->batchPublish($payloads);
```

Retry Strategy
--------------

The publisher retries transient AMQP errors (IO and protocol/channel exceptions) with exponential backoff:

- Initial delay `base_delay` (~200ms), doubled each attempt, capped at `max_delay` (~1500ms)
- Default attempts: 3
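
To make the schedule concrete, here is a minimal sketch of the doubling-and-cap rule using the default `base_delay` and `max_delay` from `config/amqp.php`. The `backoffDelay()` helper is hypothetical and only illustrates the arithmetic; the package's internal implementation may differ.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): delay in seconds before
 * retry number $retry (1-based), doubling from $baseDelay and capped at $maxDelay.
 */
function backoffDelay(int $retry, float $baseDelay = 0.2, float $maxDelay = 1.5): float
{
    $delay = $baseDelay * (2 ** ($retry - 1));

    return min($delay, $maxDelay);
}

// Print the waits for the first five retries.
foreach ([1, 2, 3, 4, 5] as $retry) {
    printf("retry %d: wait %.1fs\n", $retry, backoffDelay($retry));
}
```

With the defaults, the waits grow 0.2 s, 0.4 s, 0.8 s and then stay at the 1.5 s cap, because the next doubling (1.6 s) exceeds `max_delay`.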

Custom rule:

```
$publisher->publish(
    new MemberCreatedPayload(['id' => 1]),
    retryTimes: 5,
    when: function (Throwable $e): bool {
        return $e instanceof PhpAmqpLib\Exception\AMQPIOException
            || str_contains($e->getMessage(), 'timeout');
    }
);
```
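
The interaction between `retryTimes` and `when` can be pictured as a small retry loop. The sketch below is an illustration only: `retryWhen()` and its parameters are hypothetical names, and it assumes `retryTimes` counts total attempts, which the README does not state explicitly.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical retry loop (not the package's code). $attempts is the total
 * number of tries; $when decides whether a failure is worth retrying.
 */
function retryWhen(int $attempts, callable $operation, callable $when, ?callable $sleep = null): mixed
{
    $sleep ??= static fn (float $seconds) => usleep((int) round($seconds * 1_000_000));

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation($attempt);
        } catch (Throwable $e) {
            // Give up when attempts are exhausted or the predicate rejects the error.
            if ($attempt >= $attempts || !$when($e)) {
                throw $e;
            }
            // Exponential backoff: 0.2s doubled per attempt, capped at 1.5s.
            $sleep(min(0.2 * (2 ** ($attempt - 1)), 1.5));
        }
    }
}

// Usage: fail twice with a "timeout", then succeed on the third attempt.
$result = retryWhen(
    attempts: 3,
    operation: function (int $attempt): string {
        if ($attempt < 3) {
            throw new RuntimeException('broker timeout');
        }
        return "published on attempt {$attempt}";
    },
    when: fn (Throwable $e): bool => str_contains($e->getMessage(), 'timeout'),
    sleep: fn (float $seconds) => null, // skip real sleeping in this demo
);

echo $result, PHP_EOL;
```

Passing the sleeper as a callable keeps the loop testable without real delays.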

Enable jitter:

```
config(['amqp.retry.jitter' => true]);
$publisher->publish($payload);
```

Jitter multiplies each delay by a random factor of roughly 0.85 to 1.15.

Disable retry:

```
$publisher->publish(
    new MemberCreatedPayload(['id' => 1]),
    retryTimes: 1,
    when: fn() => false
);
```
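
As a rough illustration of the jitter factor mentioned in this section, the sketch below scales a delay by a random multiplier between 0.85 and 1.15. `applyJitter()` and its `$spread` parameter are hypothetical; the package may draw its random factor differently.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: scale a delay by a random factor in [0.85, 1.15].
 */
function applyJitter(float $delay, float $spread = 0.15): float
{
    // mt_rand() / mt_getrandmax() yields a value in [0, 1].
    $unit = mt_rand() / mt_getrandmax();
    $factor = (1.0 - $spread) + $unit * 2 * $spread;

    return $delay * $factor;
}

printf("0.8s with jitter: %.3fs\n", applyJitter(0.8));
```

Randomizing each wait this way spreads out retries from many publishers that failed at the same moment.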

Multi-Connection Example
------------------------

```
final class AnalyticsEventPayload extends ProducerPayload
{
    protected string $connectionName = 'analytics';
    protected ?string $exchangeName = 'ex.analytics';
    protected ?string $routingKey = 'event.ingest';
}

$publisher->publish(new AnalyticsEventPayload(['type' => 'login']));
```

Automatic Headers Added
-----------------------

`MessageFactory` injects:

- `request_id` (existing `X-Request-Id` header or a new UUID)
- `source` (Laravel app name)
- `env` (current environment)
- `datetime` (ISO8601)

You can still provide additional custom headers. Header precedence: custom headers passed to `publish()` / `batchPublish()` override the automatically injected keys when the same key exists.
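
The precedence rule can be sketched with a plain array merge. `buildHeaders()` and its parameters are hypothetical stand-ins for whatever `MessageFactory` does internally, and random hex is used in place of a real UUID to keep the example dependency-free.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch of the merge; the real MessageFactory may differ.
 */
function buildHeaders(array $custom, ?string $incomingRequestId, string $appName, string $env): array
{
    $automatic = [
        // Reuse the inbound X-Request-Id when present, otherwise generate an ID.
        // (Random hex stands in for a UUID here.)
        'request_id' => $incomingRequestId ?? bin2hex(random_bytes(16)),
        'source'     => $appName,
        'env'        => $env,
        'datetime'   => (new DateTimeImmutable())->format(DATE_ATOM),
    ];

    // Later arrays win in array_merge, so custom keys override automatic ones.
    return array_merge($automatic, $custom);
}

$headers = buildHeaders(
    ['foo' => 'bar', 'source' => 'billing-cli'],
    'req-42',
    'my-app',
    'production'
);
print_r($headers);
```

Here the custom `source` replaces the automatic one, while `request_id`, `env`, and `datetime` keep their injected values.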

Metrics & Stats
---------------

The package ships with an in-memory static counter registry `Stats` intended for lightweight instrumentation or exporting into your own monitoring system.

Global counters:

- `publish_attempts`
- `publish_retries`
- `publish_failures`
- `connection_resets`

Per‑connection counters (nested under `per_connection[connection_name]`):

- `publish_attempts`
- `publish_retries`
- `publish_failures`
- `connection_resets`

Example snapshot:

```
use Airygen\RabbitMQ\Support\Stats;
$snapshot = Stats::snapshot();
// [ 'publish_attempts' => 10, 'per_connection' => ['default' => ['publish_attempts' => 7]] ]
```

You can reset counters (e.g. at the start of a test or scheduled export) with:

```
Stats::reset();
```

For production telemetry, consider periodically reading the snapshot and pushing to Prometheus / OpenTelemetry.

> Note: These counters are process‑local (not shared across workers). If you run Octane/Swoole multi-worker, aggregate externally.
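
If you do collect snapshots from several workers, external aggregation can be as simple as summing the counters. The sketch below assumes each snapshot has the shape shown in the example above; `mergeSnapshots()` is a hypothetical helper, and how you transport snapshots between processes is left open.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical aggregator: sums counters from several per-process snapshots
 * shaped like the example snapshot (scalars plus a nested 'per_connection' map).
 */
function mergeSnapshots(array ...$snapshots): array
{
    $total = [];
    foreach ($snapshots as $snapshot) {
        foreach ($snapshot as $key => $value) {
            if (is_array($value)) {
                // Recurse into nested maps such as per_connection[name].
                $total[$key] = mergeSnapshots($total[$key] ?? [], $value);
            } else {
                $total[$key] = ($total[$key] ?? 0) + $value;
            }
        }
    }

    return $total;
}

$workerA = ['publish_attempts' => 10, 'per_connection' => ['default' => ['publish_attempts' => 7]]];
$workerB = ['publish_attempts' => 5, 'per_connection' => [
    'default'   => ['publish_attempts' => 2],
    'analytics' => ['publish_attempts' => 3],
]];

$merged = mergeSnapshots($workerA, $workerB);
print_r($merged);
```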

Behavior: Always Open/Close
---------------------------

For operational safety across PHP-FPM/CLI and worker runtimes (Octane/Swoole/RoadRunner), this package always opens a fresh channel for each publish and closes both channel and connection afterwards.

Health Check Command
--------------------

Run a simple connectivity probe:

```
php artisan rabbitmq:ping            # test all configured connections
php artisan rabbitmq:ping secondary  # test a specific connection
```

Exit code is non‑zero on failure (suitable for container readiness / liveness probes).

Octane / Swoole / RoadRunner
----------------------------

Long-lived worker environments reuse PHP processes, so you must ensure stale connections/channels don't leak across deploys or forks.

Built-in safeguards:

- On worker start/stop (Octane events) the connection manager `reset()` is invoked (if Octane is installed).
- Connections are opened and closed per publish; you can call `ConnectionManager::reset()` manually if desired.

Recommended practices:

1. Avoid holding a `Publisher` instance in static singletons that you construct before workers fork.
2. Call `app(ProducerInterface::class)` per request/job (the container reuses a safe singleton manager underneath).
3. If you rotate workers periodically, no extra action is needed; the hook already clears state.
4. For Swoole without Octane events, call `ConnectionManager::reset()` manually from your custom lifecycle hooks.

Optional manual reset example:

```
// e.g. in a scheduled task or health hook
app(\Airygen\RabbitMQ\Support\ConnectionManager::class)->reset();
```

TLS / SSL (Optional)
--------------------

If you need TLS encryption, enable and configure the SSL-related options inside `config/amqp.php`:

```
    'connections' => [
        'default' => [
            // ... host, port, user, password, vhost
            'options' => [
                'ssl' => true,
                'cafile' => base_path('certs/ca.pem'),
                'local_cert' => base_path('certs/client.pem'),
                'local_pk' => base_path('certs/client.key'),
                'verify_peer' => true,
                // 'passphrase' => env('AMQP_CERT_PASSPHRASE'),
            ],
        ],
    ],
```

The factory will build a stream context when `ssl` is truthy and any of the certificate fields are present. If `verify_peer` is enabled, ensure `cafile` is supplied.
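
The rule above can be sketched as follows. `buildSslOptions()` is a hypothetical function written for illustration, not the package's factory; the option names it passes through (`cafile`, `local_cert`, `local_pk`, `verify_peer`, `passphrase`) are standard PHP SSL context options, and the certificate paths are placeholders.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch of the rule; the package's factory may differ.
 * Returns SSL stream-context options, or null when TLS should not be configured.
 */
function buildSslOptions(array $options): ?array
{
    $certFields = array_intersect_key($options, array_flip(['cafile', 'local_cert', 'local_pk']));

    // Only build a context when ssl is truthy and a certificate field is present.
    if (empty($options['ssl']) || $certFields === []) {
        return null;
    }

    $ssl = $certFields + array_intersect_key($options, array_flip(['verify_peer', 'passphrase']));

    // Design choice of this sketch: fail fast instead of silently skipping verification.
    if (!empty($ssl['verify_peer']) && !isset($ssl['cafile'])) {
        throw new InvalidArgumentException('verify_peer requires cafile');
    }

    return $ssl;
}

$ssl = buildSslOptions([
    'ssl'         => true,
    'cafile'      => '/etc/ssl/amqp/ca.pem',     // placeholder path
    'local_cert'  => '/etc/ssl/amqp/client.pem', // placeholder path
    'local_pk'    => '/etc/ssl/amqp/client.key', // placeholder path
    'verify_peer' => true,
]);

// The resulting array is what a stream context expects under the 'ssl' key.
$context = stream_context_create(['ssl' => $ssl ?? []]);
print_r(stream_context_get_options($context));
```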

Roadmap
-------

- Pluggable metrics exporter interface
- Circuit breaker / total backoff budget
- Async publisher confirm pipeline
- Mandatory publish return callbacks (unroutable detection)
- Dead letter / delayed publish helpers

Development
-----------

### Dockerized Workflow

All commands are wrapped to run inside the `php` service defined in `docker-compose.yml`.

Startup & install:

```
docker compose up -d rabbitmq
docker compose build php
docker compose run --rm php composer install
```

Using Makefile targets:

```
make unit          # run unit tests
make integration   # run integration tests (requires rabbitmq service up)
make test          # unit + integration
make coverage      # generates coverage/html & coverage/clover.xml
make lint          # code style check
make analyse       # phpstan static analysis
make fix           # auto-fix style
```

Manual (host) without Docker wrapper:

```
php -d xdebug.mode=off vendor/bin/phpunit --testsuite Unit
INTEGRATION_TESTS=1 php -d xdebug.mode=off vendor/bin/phpunit --testsuite Integration
```

Management UI:  (guest / guest)

If you prefer to run natively on the host (without Docker), use the `host:*` scripts or call PHPUnit directly.

### Prometheus Metrics (Skeleton)

The package keeps lightweight in-memory counters (not persisted). A minimal Prometheus text exporter is provided.

Artisan command:

```
php artisan rabbitmq:metrics            # full HELP/TYPE + samples
php artisan rabbitmq:metrics --raw      # only metric lines
```

Example output:

```
# HELP rabbitmq_publish_attempts_total Total publish attempts (before confirm).
# TYPE rabbitmq_publish_attempts_total counter
rabbitmq_publish_attempts_total 42
... (other metrics)
```

Per-connection metrics are emitted with a `connection` label, e.g.:

```
rabbitmq_connection_publish_attempts_total{connection="primary"} 10
```
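
For readers writing their own exporter, the mapping from a counter snapshot to these metric lines might look like the sketch below. `renderMetrics()` is a hypothetical function that follows the metric names shown above and omits HELP/TYPE lines (similar to `--raw`); the bundled exporter may format things differently.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical renderer: turns a Stats-style snapshot into Prometheus text lines.
 */
function renderMetrics(array $snapshot): string
{
    $lines = [];

    // Global counters become rabbitmq_<name>_total.
    foreach ($snapshot as $name => $value) {
        if ($name === 'per_connection') {
            continue;
        }
        $lines[] = sprintf('rabbitmq_%s_total %d', $name, $value);
    }

    // Per-connection counters carry a connection label.
    foreach ($snapshot['per_connection'] ?? [] as $connection => $counters) {
        // Escape backslashes and double quotes in the label value.
        $label = str_replace(['\\', '"'], ['\\\\', '\\"'], (string) $connection);
        foreach ($counters as $name => $value) {
            $lines[] = sprintf('rabbitmq_connection_%s_total{connection="%s"} %d', $name, $label, $value);
        }
    }

    return implode("\n", $lines) . "\n";
}

$text = renderMetrics([
    'publish_attempts' => 42,
    'per_connection'   => ['primary' => ['publish_attempts' => 10]],
]);
echo $text;
```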

You can bind your own implementation of `Airygen\RabbitMQ\Contracts\MetricsExporterInterface` if you need richer aggregation or to integrate with an existing metrics system.

License
-------

MIT

