jardisadapter/messaging
=======================

Multi-transport messaging for Redis, Kafka, RabbitMQ, and Database with automatic serialization, consumer groups, and failover

v1.0.0 (2mo ago) · proprietary · PHP >=8.2 · CI passing

Jardis Messaging
================

[![Build Status](https://github.com/jardisAdapter/messaging/actions/workflows/ci.yml/badge.svg)](https://github.com/jardisAdapter/messaging/actions/workflows/ci.yml/badge.svg)[![License: PolyForm Shield](https://camo.githubusercontent.com/d8fb46c82be4c5312bf3e372ac734dfdf6a8b328e9c2b2856af671adbb0600a5/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f4c6963656e73652d506f6c79466f726d253230536869656c642d626c75652e737667)](LICENSE.md)[![PHP Version](https://camo.githubusercontent.com/a68b290dcc313d698dc138a1111aa83eee2f143605449d7e8b5416ea6f88558f/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f5048502d253345253344382e322d3737374242342e737667)](https://www.php.net/)[![PHPStan Level](https://camo.githubusercontent.com/c51bda247654363d3e30bc352674dd761a9557803a14af0226eb411d6dc0006b/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f5048505374616e2d4c6576656c253230382d627269676874677265656e2e737667)](phpstan.neon)[![PSR-12](https://camo.githubusercontent.com/34b10db0caa29bacd49bda5c437a8de95385f036f3230b31fa605326e18da22c/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f436f64652532305374796c652d5053522d2d31322d626c75652e737667)](phpcs.xml)

> Part of the **[Jardis Business Platform](https://jardis.io)** — Enterprise-grade PHP components for Domain-Driven Design

Multi-transport messaging with a unified API for Redis, Kafka, RabbitMQ, Database, and InMemory. `ConnectionFactory` creates transport-specific connections; `PublisherFactory` and `ConsumerFactory` produce typed publishers and consumers from those connections. `MessagePublisher` and `MessageConsumer` are immutable facades that accept one or more transport instances via constructor injection and provide automatic serialization, priority-based failover, and graceful shutdown.

---

Features
--------

- **4 Transports + InMemory** — Redis (Pub/Sub and Streams), Kafka, RabbitMQ, Database (PDO), InMemory (testing)
- **Unified Publish/Consume API** — `publish(topic, message)` and `consume(topic, handler)` across all transports
- **Immutable Facades** — `MessagePublisher` and `MessageConsumer` via variadic constructor injection
- **Automatic Serialization** — Arrays and objects encoded to JSON on publish, decoded transparently on consume
- **Consumer Groups** — Redis Streams and Kafka for horizontal scaling
- **Priority Failover** — Constructor order determines priority; first healthy transport wins
- **Lazy Connection** — `MessagingService` defers publisher and consumer creation until first use
- **Database Transport** — PDO-based messaging with Point-to-Point and Fan-Out modes, no external broker required
- **External Connections** — Wrap existing Redis, PDO, AMQP, or Kafka clients via `ConnectionFactory::from*()`
- **Message Validation** — Payload validation before transmission via `MessageValidator`
- **Graceful Shutdown** — SIGTERM/SIGINT handling enabled automatically on `consume()`

---

Installation
------------

```
composer require jardisadapter/messaging
```

**Optional extensions** (install only what you need):

- `ext-redis` — Redis Streams/Pub-Sub transport
- `ext-rdkafka` — Apache Kafka transport
- `ext-amqp` — RabbitMQ transport

PDO (Database transport) is always available via `ext-pdo`.

---

Quick Start
-----------

```
use JardisAdapter\Messaging\MessagePublisher;
use JardisAdapter\Messaging\MessageConsumer;
use JardisAdapter\Messaging\Factory\ConnectionFactory;
use JardisAdapter\Messaging\Factory\PublisherFactory;
use JardisAdapter\Messaging\Factory\ConsumerFactory;
use JardisAdapter\Messaging\Handler\CallbackHandler;

$connFactory = new ConnectionFactory();
$pubFactory  = new PublisherFactory();
$conFactory  = new ConsumerFactory();

// Create and share a Redis connection
$redisConn = $connFactory->redis('localhost', 6379);

// Publish
$publisher = new MessagePublisher($pubFactory->redis($redisConn));
$publisher->publish('orders', ['order_id' => 42, 'total' => 99.99]);

// Consume
$consumer = new MessageConsumer($conFactory->redis($redisConn));
$consumer->consume('orders', new CallbackHandler(function (string|array $message, array $metadata): bool {
    // $message = ['order_id' => 42, 'total' => 99.99]  (auto-deserialized)
    return true; // true = ACK, false = reject/requeue
}));
```
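
The "auto-deserialized" comment above can be pictured as a JSON round trip. The following is a minimal sketch using only PHP's built-in JSON functions; `encodePayload()` and `decodePayload()` are hypothetical helper names, not part of the package, and the package's own serializer may treat objects or non-JSON strings differently.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: strings pass through, arrays and objects become JSON. */
function encodePayload(string|array|object $message): string
{
    if (is_string($message)) {
        return $message;
    }

    return json_encode($message, JSON_THROW_ON_ERROR);
}

/** Hypothetical helper: JSON arrays/objects become PHP arrays, anything else stays a string. */
function decodePayload(string $raw): string|array
{
    try {
        $decoded = json_decode($raw, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException) {
        return $raw; // not JSON: hand the raw string to the handler
    }

    return is_array($decoded) ? $decoded : $raw;
}

$wire    = encodePayload(['order_id' => 42, 'total' => 99.99]);
$message = decodePayload($wire);
$plain   = decodePayload('plain text');
```

This matches the `string|array $message` handler signature shown in the Quick Start: a handler should be prepared to receive either form.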

---

Priority Failover
-----------------

Constructor order determines priority: the first argument is tried first. If a transport throws a `MessageException`, the publisher automatically falls back to the next one.

```
$primary = $connFactory->redis('redis-primary');
$secondary = $connFactory->redis('redis-secondary');

$publisher = new MessagePublisher(
    $pubFactory->redis($primary),     // tried first
    $pubFactory->redis($secondary),   // fallback if primary fails
);
$publisher->publish('orders', ['order_id' => 42]);
```
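
To make the failover order concrete, here is a self-contained sketch of the idea, not the package's implementation. `SketchMessageException` and `publishWithFailover()` are stand-ins invented for this example, and throwing once every transport has failed is an assumption; the README does not say what the real publisher does in that case.

```php
<?php
declare(strict_types=1);

// Stand-in for the package's base exception so the sketch runs on its own.
class SketchMessageException extends RuntimeException
{
}

/**
 * Tries each publisher in priority order and returns the index of the one that succeeded.
 *
 * @param list<callable(string, string): void> $publishers
 */
function publishWithFailover(array $publishers, string $topic, string $payload): int
{
    $lastError = null;

    foreach ($publishers as $index => $publish) {
        try {
            $publish($topic, $payload);

            return $index;
        } catch (SketchMessageException $e) {
            $lastError = $e; // transport failure: move on to the next publisher
        }
        // Any other exception type is not caught here and propagates immediately.
    }

    // Assumption: surface a failure once every transport has been tried.
    throw new SketchMessageException('All transports failed', 0, $lastError);
}

$sent = [];

$primary = static function (string $topic, string $payload): void {
    throw new SketchMessageException('primary unavailable');
};
$secondary = static function (string $topic, string $payload) use (&$sent): void {
    $sent[] = [$topic, $payload];
};

$usedIndex = publishWithFailover([$primary, $secondary], 'orders', '{"order_id":42}');
```

Here the primary fails, so the message is delivered through the second entry in the list.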

---

Transports
----------

### Redis

Supports both Pub/Sub (default) and Streams mode.

```
$redisConn = $connFactory->redis('localhost', 6379);

// Pub/Sub (default)
$publisher = new MessagePublisher($pubFactory->redis($redisConn));

// Streams
$publisher = new MessagePublisher($pubFactory->redis($redisConn, useStreams: true));

// Consumer groups (Streams only)
$consumer = new MessageConsumer($conFactory->redis($redisConn, useStreams: true));
$consumer->consume('orders', $handler, [
    'group'    => 'order-processors',  // auto-created if missing
    'consumer' => 'worker-1',
    'block'    => 5000,
    'count'    => 1,
]);
```

### Kafka

Separate connection types for producer and consumer. Consumer group ID is configured on the connection.

```
// Producer
$kafkaConn = $connFactory->kafka('kafka:9092');
$publisher = new MessagePublisher($pubFactory->kafka($kafkaConn));
$publisher->publish('invoices', ['invoice_id' => 7], ['key' => 'partition-key']);

// Consumer (groupId is part of connection)
$kafkaConsumerConn = $connFactory->kafkaConsumer('kafka:9092', 'invoice-processor');
$consumer = new MessageConsumer($conFactory->kafka($kafkaConsumerConn));
$consumer->consume('invoices', $handler);
```

### RabbitMQ

Queue-based messaging with automatic ACK/NACK handling.

```
$rabbitConn = $connFactory->rabbitMq('localhost', 5672, 'guest', 'guest');

$publisher = new MessagePublisher($pubFactory->rabbitMq($rabbitConn));
$publisher->publish('order.created', ['orderId' => 42]);

$consumer = new MessageConsumer($conFactory->rabbitMq($rabbitConn, 'order-queue'));
$consumer->consume('order.created', $handler);
```

### Database (PDO)

No external broker required — uses the application's existing database. Supports MySQL, PostgreSQL, and SQLite.

```
use JardisAdapter\Messaging\Config\DatabaseTransportOptions;

$dbConn = $connFactory->database('mysql:host=localhost;dbname=app', 'user', 'pass');

$options = new DatabaseTransportOptions(
    table: 'domain_events',
    deleteAfterProcessing: false,  // soft delete (default)
    pollingIntervalMs: 1000,
    batchSize: 10,
    maxAttempts: 3,
);

$publisher = new MessagePublisher($pubFactory->database($dbConn, $options));
$consumer = new MessageConsumer($conFactory->database($dbConn, $options));

// Point-to-Point (default): one consumer per event
$consumer->consume('OrderCreated', $handler);

// Fan-Out: multiple consumer groups process the same event
$consumer->consume('InvoiceCreated', $handler, ['group' => 'email-service']);
$consumer->consume('InvoiceCreated', $handler, ['group' => 'pdf-service']);
```

Schema: `src/Schema/domain_events.sql`
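
The Fan-Out semantics can be modelled in a few lines: every consumer group keeps its own read position, so each group sees every event exactly once. This is an in-memory illustration only. `FanOutSketch` is a hypothetical class, and it says nothing about how the package's table or queries are actually structured. Under this model, Point-to-Point would correspond to all consumers sharing a single position, which is also an assumption.

```php
<?php
declare(strict_types=1);

/** Hypothetical in-memory model of per-group delivery; not the package's storage layer. */
final class FanOutSketch
{
    /** @var list<string> */
    private array $events = [];

    /** @var array<string, int> next unread offset per consumer group */
    private array $offsets = [];

    public function publish(string $payload): void
    {
        $this->events[] = $payload;
    }

    /**
     * Returns the events this group has not seen yet and advances its offset.
     *
     * @return list<string>
     */
    public function poll(string $group): array
    {
        $offset = $this->offsets[$group] ?? 0;
        $batch  = array_slice($this->events, $offset);

        $this->offsets[$group] = count($this->events);

        return $batch;
    }
}

$log = new FanOutSketch();
$log->publish('invoice-1');

$email      = $log->poll('email-service');
$pdf        = $log->poll('pdf-service');
$emailAgain = $log->poll('email-service');
```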

### InMemory (Testing)

Synchronous in-memory transport for unit and integration tests.

```
use JardisAdapter\Messaging\Transport\InMemoryTransport;

$transport = new InMemoryTransport();

$publisher = new MessagePublisher($pubFactory->inMemory($transport));
$consumer  = new MessageConsumer($conFactory->inMemory($transport));

$publisher->publish('test', ['id' => 1]);
$transport->getMessageCount('test');  // 1

$consumer->consume('test', $handler, ['limit' => 5]);
```

---

External Connections
--------------------

Wrap existing connections from legacy systems, DI containers, or frameworks.

```
// Existing Redis instance
$redisConn = $connFactory->fromRedis($existingRedis, manageLifecycle: false);
$publisher = new MessagePublisher($pubFactory->redis($redisConn));

// Existing PDO instance
$dbConn = $connFactory->fromPdo($existingPdo, manageLifecycle: false);

// Existing AMQP connection
$rabbitConn = $connFactory->fromAmqp($amqpConnection, exchangeName: 'custom');

// Existing Kafka producer/consumer
$kafkaProducerConn = $connFactory->fromKafkaProducer($producer, flushOnDisconnect: true);
$kafkaConsumerConn = $connFactory->fromKafkaConsumer($consumer);
```

When `manageLifecycle` is `false`, `disconnect()` is a no-op: the external system owns the connection lifecycle.
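
The ownership rule can be sketched as a small wrapper. `WrappedConnection` is a hypothetical class written for this example, not one of the package's connection types; it only illustrates how a lifecycle flag turns `disconnect()` into a no-op.

```php
<?php
declare(strict_types=1);

/** Hypothetical wrapper illustrating the manageLifecycle flag. */
final class WrappedConnection
{
    public function __construct(
        private readonly Closure $close,
        private readonly bool $manageLifecycle = true,
    ) {
    }

    public function disconnect(): void
    {
        if (!$this->manageLifecycle) {
            return; // the external owner is responsible for closing the client
        }

        ($this->close)();
    }
}

$closedCount = 0;
$close = function () use (&$closedCount): void {
    $closedCount++;
};

// Externally owned: disconnect() must not close the underlying client.
$external = new WrappedConnection($close, manageLifecycle: false);
$external->disconnect();
$afterExternal = $closedCount;

// Owned by the wrapper: disconnect() closes it.
$owned = new WrappedConnection($close, manageLifecycle: true);
$owned->disconnect();
$afterOwned = $closedCount;
```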

---

MessagingService (Lazy Loading)
-------------------------------

Defers publisher and consumer creation until first use. Ideal for DI containers.

```
use JardisAdapter\Messaging\MessagingService;

$messaging = new MessagingService(
    publisherFactory: fn() => new MessagePublisher($pubFactory->redis($redisConn)),
    consumerFactory:  fn() => new MessageConsumer($conFactory->redis($redisConn)),
);

$messaging->publish('notifications', ['type' => 'email', 'to' => 'user@example.com']);
$messaging->consume('notifications', $handler);

$messaging->getPublisher();  // MessagePublisherInterface
$messaging->getConsumer();   // MessageConsumerInterface
```
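
The deferred creation shown above follows a common lazy-initialization pattern: store a factory closure, call it on first access, and reuse the result afterwards. The sketch below is an assumption about the general technique, with a hypothetical `LazyHolder` class; it is not taken from the `MessagingService` source.

```php
<?php
declare(strict_types=1);

/** Hypothetical holder that builds its instance on first access only. */
final class LazyHolder
{
    private ?object $instance = null;

    public function __construct(private readonly Closure $factory)
    {
    }

    public function get(): object
    {
        // The factory runs once; later calls return the cached instance.
        return $this->instance ??= ($this->factory)();
    }
}

$created = 0;
$holder = new LazyHolder(function () use (&$created): object {
    $created++;

    return new stdClass(); // stands in for an expensive publisher or consumer
});

$before = $created;      // nothing has been built yet
$first  = $holder->get();
$second = $holder->get();
```

Because nothing is constructed until `get()` is called, registering such a service in a DI container costs no broker connection for requests that never publish or consume.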

---

Error Handling
--------------

All exceptions extend `MessageException`:

| Exception | When |
|-----------|------|
| `ConnectionException` | Connection fails, `getClient()` called without `connect()` |
| `PublishException` | Publishing fails, serialization error, validation failure |
| `ConsumerException` | Consumer initialization or polling fails |

**Publisher fallback** only triggers on `MessageException`; other exceptions propagate immediately.

**Consumer state-cleanup** is performed before re-throwing: NACK in RabbitMQ, attempt tracking in Database, requeue in InMemory.
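
The cleanup-then-rethrow behaviour can be sketched generically. `handleWithCleanup()` is a hypothetical function, and the `$cleanup` callable stands in for whatever the transport does (NACK, attempt tracking, requeue); the real consumers may structure this differently.

```php
<?php
declare(strict_types=1);

/**
 * Runs the handler; if it throws, performs transport cleanup and rethrows the same exception.
 *
 * @param callable(array): bool $handler
 * @param callable(array): void $cleanup
 */
function handleWithCleanup(callable $handler, callable $cleanup, array $message): bool
{
    try {
        return $handler($message);
    } catch (Throwable $e) {
        $cleanup($message); // e.g. NACK, record a failed attempt, or requeue
        throw $e;           // the caller still sees the original exception
    }
}

$log = [];

$failingHandler = static function (array $message): bool {
    throw new RuntimeException('handler failed');
};
$cleanup = function (array $message) use (&$log): void {
    $log[] = 'cleanup';
};

try {
    handleWithCleanup($failingHandler, $cleanup, ['id' => 1]);
} catch (RuntimeException $e) {
    $log[] = 'rethrown: ' . $e->getMessage();
}
```

The order of entries in `$log` shows that cleanup completes before the exception reaches the caller.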

---

Documentation
-------------

Full documentation, guides, and API reference:

**[jardis.io/docs/adapter/messaging](https://jardis.io/docs/adapter/messaging)**

License
-------

This package is licensed under the [PolyForm Shield License 1.0.0](LICENSE.md). Free for all use except building competing frameworks or developer tooling.

---

**[Jardis](https://jardis.io)** · [Documentation](https://jardis.io/docs) · [Headgent](https://headgent.com)

