PHPackages                             idct/symfony-nats-messenger - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. idct/symfony-nats-messenger

ActiveLibrary[Queues &amp; Workers](/categories/queues)

idct/symfony-nats-messenger
===========================

Symfony NATS (JetStream) Messenger Bridge

v5.0.0(2w ago)249.1k↑628.4%6[1 issues](https://github.com/ideaconnect/symfony-nats-messenger/issues)MITPHPPHP ^8.2CI passing

Since Nov 2Pushed 1w ago2 watchersCompare

[ Source](https://github.com/ideaconnect/symfony-nats-messenger)[ Packagist](https://packagist.org/packages/idct/symfony-nats-messenger)[ Fund](https://www.buymeacoffee.com/idct)[ GitHub Sponsors](https://github.com/ideaconnect)[ RSS](/packages/idct-symfony-nats-messenger/feed)WikiDiscussions main Synced 2d ago

READMEChangelog (9)Dependencies (22)Versions (14)Used By (0)

Symfony NATS Messenger Bridge
=============================

[](#symfony-nats-messenger-bridge)

[![PHP Version](https://camo.githubusercontent.com/e771ed2874cdbf856ba15fd6a3cdd70ca87551ef2d02a04f946ce3e3b26adabf/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f5048502d253545382e322d3738374342353f6c6f676f3d706870266c6f676f436f6c6f723d7768697465)](https://php.net)[![Symfony Version](https://camo.githubusercontent.com/201f0d32598cad1657544dbd6798ff4fc142c0a69d5a05add890e20c32bc9f29/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f53796d666f6e792d253545372e32253230253743253230253545382e302d3030303030303f6c6f676f3d73796d666f6e79266c6f676f436f6c6f723d7768697465)](https://symfony.com)[![Unit Tests Coverage](https://camo.githubusercontent.com/41b7896d6af28765cbc29cfb1660bcd5554d083a34cd7cc0cea9e436caa75511/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f436f7665726167652d39392e35382532352d627269676874677265656e)](https://github.com/ideaconnect/symfony-nats-messenger/actions)[![Mutation MSI](https://camo.githubusercontent.com/28b81237c0b608626cddab906daa41a055b04c36bb61831c5cd2228a1a26626e/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f4d75746174696f6e2532304d53492d3130302532352d627269676874677265656e)](https://infection.github.io/)[![Functional Tests](https://camo.githubusercontent.com/eb2880245c72f1bad2347758a7bb08140abde7e01262ed384f3bf63d112203dc/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f46756e6374696f6e616c25323054657374732d42656861742d626c7565)](tests/functional)[![License](https://camo.githubusercontent.com/5caa455d8debc46fb23abbadb45a733a937f3910a73fc875c2f7820468e1bb54/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f4c6963656e73652d4d49542d677265656e)](LICENSE)[![CI](https://github.com/ideaconnect/symfony-nats-messenger/actions/workflows/ci.yml/badge.svg)](https://github.com/ideaconnect/symfony-nats-messenger/actions/workflows/ci.yml)[![Made in the EU](https://raw.githubusercontent.com/ideaconnect/made-in-the-eu/main/software-badge/made-in-the-eu.svg)](https://raw.githubusercontent.com/ideaconnect/made-in-the-eu/main/software-badge/made-in-the-eu.svg)

A Symfony Messenger transport integration for [NATS JetStream](https://docs.nats.io/nats-concepts/jetstream), enabling reliable asynchronous messaging with persistent message streaming.

Features
--------

[](#features)

- 🚀 **High-Performance Messaging** - Leverage NATS JetStream for fast, reliable message delivery
- 📦 **Symfony Integration** - Implements Messenger's `TransportInterface`, `MessageCountAwareInterface`, `SetupableTransportInterface`, `KeepaliveReceiverInterface`, and `CloseableTransportInterface`
- ⚙️ **Configurable Consumers** - Support for multiple consumer strategies
- 🔄 **Flexible Batching** - Adjustable message batch sizes and timeouts
- 🔐 **Authentication Support** - Built-in support for NATS authentication
- 📊 **Stream Configuration** - Configurable retention policies and replication
- 🧪 **Thoroughly Tested** - 303 unit tests, ~99.6% coverage, mutation-tested (100% MSI)

🚀 This project looks for funding. Love my work? Support it! 💖
-------------------------------------------------------------

[](#-this-project-looks-for-funding-love-my-work-support-it-)

- ☕ **Buy me a coffee**:
- 💝 **Sponsor**:

Requirements
------------

[](#requirements)

### System Requirements

[](#system-requirements)

- **PHP**: ^8.2
- **Symfony**: ^7.2 || ^8
- **NATS Server**: ^2.9 with JetStream enabled, ^2.12 for scheduled messages support.

Installation
------------

[](#installation)

```
composer require idct/symfony-nats-messenger
```

### Development Setup

[](#development-setup)

For contributors and development:

```
# Install dependencies
composer install

# Run static analysis and the default unit test suite after every modification
composer test

# Start NATS server for testing
composer nats:start

# Run unit tests with coverage
composer test:unit

# Set up functional tests
composer test:functional:setup

# Run functional tests
composer test:functional

# Stop NATS server
composer nats:stop
```

Quick Start
-----------

[](#quick-start)

### 1. Configure NATS Server

[](#1-configure-nats-server)

Ensure your NATS server has JetStream enabled:

```
nats-server -js
```

### 2. Set Up Transport in Symfony

[](#2-set-up-transport-in-symfony)

Add the NATS transport to your Symfony Messenger configuration:

```
# config/packages/messenger.yaml
framework:
  messenger:
    transports:
      nats_transport:
        dsn: 'nats-jetstream://localhost:4222/my-stream/my-topic'
        options:
          consumer: 'my-consumer'
          batching: 5
          max_batch_timeout: 1.0

    routing:
      'App\Message\MyAsyncMessage': nats_transport
```

> **Tested by:** `testReadmeDsnExamplesParseSuccessfully[README: quick-start transport]`, `testReadmeConfigurationOptionsAreAccepted`, `testReadmeBatchingExamplesAreAccepted`, `testReadmeTimeoutExamplesAreAccepted`

### 3. Configure Custom Serializers (Optional)

[](#3-configure-custom-serializers-optional)

This transport ships with a high-performance `IgbinarySerializer`, but under the Symfony framework it is **not** selected automatically. Symfony Messenger always resolves a serializer and passes it to the transport factory - its framework default is the native `PhpSerializer` - so to use igbinary you must set the transport's `serializer:` key explicitly (shown below). The transport's *own* igbinary auto-selection (and the `PhpSerializer` fallback, with an `E_USER_WARNING`, when `ext-igbinary` is unavailable) only applies when you construct `NatsTransport` directly without passing a serializer, e.g. outside the framework.

#### Using IgbinarySerializer (recommended)

[](#using-igbinaryserializer-recommended)

```
# config/packages/messenger.yaml
framework:
  messenger:
    transports:
      nats_transport:
        dsn: 'nats-jetstream://localhost:4222/my-stream/my-topic'
        serializer: 'IDCT\NatsMessenger\Serializer\IgbinarySerializer'
        options:
          consumer: 'my-consumer'
```

Register the serializer service the `serializer:` key refers to. For example:

```
    igbinary_serializer:
        class: IDCT\NatsMessenger\Serializer\IgbinarySerializer
```

or:

```
    IDCT\NatsMessenger\Serializer\IgbinarySerializer: ~
```

> **Tested by:** `createTransport_UsesProvidedSerializer`, `serialize_WithValidEnvelope_ReturnsSerializedString`, `decode_WithValidEncodedEnvelope_ReturnsEnvelope`, `testConstructorWithoutIgbinaryDoesNotCrash`

#### Creating Custom Serializers

[](#creating-custom-serializers)

You can create your own serializer by extending `AbstractEnveloperSerializer`:

```
use IDCT\NatsMessenger\Serializer\AbstractEnveloperSerializer;
use Symfony\Component\Messenger\Envelope;

class MyCustomSerializer extends AbstractEnveloperSerializer
{
    protected function serialize(Envelope $envelope): string
    {
        // Your custom serialization logic
        return serialize($envelope);
    }

    protected function deserialize(string $data): mixed
    {
        // Your custom deserialization logic
        return unserialize($data);
    }
}
```

> **Tested by:** `readmeCustomSerializerExample_EncodeDecode_RoundTrips`, `readmeCustomSerializerExample_DecodeInvalidBody_ThrowsException` - the exact code above is compiled and exercised via `ReadmeExampleSerializer` in the unit tests.

For reference implementations, see:

- `src/Serializer/IgbinarySerializer.php` - Binary serialization
- `src/Serializer/AbstractEnveloperSerializer.php` - Base class

### 4. Send Messages

[](#4-send-messages)

```
use App\Message\MyAsyncMessage;
use Symfony\Component\Messenger\MessageBusInterface;

class MyController
{
    public function __construct(private MessageBusInterface $bus) {}

    public function send(): void
    {
        $this->bus->dispatch(new MyAsyncMessage('Hello NATS!'));
    }
}
```

> **Tested by:** `testSendPublishesEncodedBodyWithoutHeaders`, `testSendUsesPublishWithHeadersWhenHeadersArePresent`, Behat scenario `Complete message flow - send, check stats, consume, verify`

### 5. Handle Messages

[](#5-handle-messages)

```
use App\Message\MyAsyncMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class MyAsyncMessageHandler
{
    public function __invoke(MyAsyncMessage $message): void
    {
        echo "Processing: " . $message->getText();
    }
}
```

> **Tested by:** Behat scenarios `Complete message flow - send, check stats, consume, verify`, `Send and consume messages with a custom consumer name`, and `High-volume message processing with file output verification` - handlers are exercised through real `messenger:consume` runs.

### 6. Consume Messages

[](#6-consume-messages)

```
symfony console messenger:consume nats_transport
```

> **Tested by:** Behat scenarios `Complete message flow - send, check stats, consume, verify`, `Send and consume messages with a custom consumer name`, and `Partial message consumption with multiple consumers` - the Behat context runs `messenger:consume` as a Symfony CLI process.

Configuration Guide
-------------------

[](#configuration-guide)

### DSN Format

[](#dsn-format)

```
nats-jetstream://[user:password@]host:port/stream-name/topic-name

```

> **Tested by:** `testBuildWithValidDsnReturnsConfiguration`, `testBuildWithoutPathThrowsException`, `testBuildWithoutTopicThrowsException`, `createTransport_WithValidDsn_ReturnsNatsTransportInstance`

**Examples:**

```
# Default port (4222)
nats-jetstream://localhost/my-stream/my-topic

# Custom port
nats-jetstream://localhost:5000/my-stream/my-topic

# With authentication
nats-jetstream://user:password@localhost:4222/my-stream/my-topic

# With query parameters
nats-jetstream://localhost/my-stream/my-topic?consumer=worker&batching=10

# TLS transport scheme
nats-jetstream+tls://localhost:4222/my-stream/my-topic
```

> **Tested by:** `testReadmeDsnExamplesParseSuccessfully` - each DSN above is parsed through the configuration builder via a dedicated data provider case.

### Configuration Options

[](#configuration-options)

```
framework:
  messenger:
    transports:
      nats_transport:
        dsn: 'nats-jetstream://localhost:4222/my-stream/my-topic'
        options:
          # Consumer Configuration
          consumer: 'my-consumer'           # Consumer group name (default: 'client')

          # Performance Tuning
          batching: 5                       # Messages per batch (default: 1)
          max_batch_timeout: 1.0            # Timeout in seconds for batch fetching (default: 1)
          connection_timeout: 1.0           # Connection (dial) timeout in seconds (default: 1)

          # Stream Retention Policies
          stream_max_age: 86400             # Max message age in seconds (0 = unlimited, default: 0)
          stream_max_bytes: 1073741824      # Max storage size in bytes (null = unlimited)
          stream_max_messages: 1000000      # Max number of messages in the stream (null = unlimited)
          stream_max_messages_per_subject: 1000 # Max number of messages retained per subject (null = unlimited)

          # Storage Backend
          stream_storage: 'file'            # Storage type: 'file' or 'memory' (default: 'file')

          # High Availability
          stream_replicas: 1                # Number of replicas (default: 1)

          # Failure Handling Strategy
          retry_handler: 'symfony'          # symfony|nats (default: symfony)
                                            # symfony => TERM on failed/rejected message
                                            # nats    => NAK on failed/rejected message

          # NATS-native Redelivery Tuning (mainly relevant with retry_handler: nats)
          nak_delay: 0                      # Seconds to wait before NATS redelivers a NAK'd
                                            # message (default: 0 = immediate). Use to back off
                                            # instead of hot-looping a failing message.
          ack_wait: null                    # Seconds JetStream waits for an ACK before redelivering
                                            # (default: null = server default, ~30s). Raise it for
                                            # slow handlers to avoid premature redelivery.
          max_deliver: null                 # Max redelivery attempts before NATS gives up
                                            # (default: null = unlimited). Set it to stop a poison
                                            # message redelivering forever under retry_handler: nats.
          backoff: null                     # List of per-attempt delays in seconds, e.g. [1, 5, 30].
                                            # Pairs with max_deliver. (default: null)

          # Acknowledgement Mode
          ack_sync: false                   # Wait for server confirmation of each ACK (default: false)
                                            # false => fire-and-forget ACK (lower latency)
                                            # true  => JetStream double-ack; a dropped ACK cannot
                                            #          silently cause redelivery, at a latency cost

          # Scheduled / Delayed Messages (requires NATS >= 2.12)
          scheduled_messages: false         # Enable scheduled message support (default: false)
                                            # When enabled, Symfony DelayStamp triggers NATS
                                            # scheduled message delivery via Nats-Schedule headers

          # TLS Configuration
          tls_required: false               # Force TLS for NATS connection (default: false)
          tls_handshake_first: false        # Use TLS-first handshake mode (default: false)
          tls_ca_file: null                 # Path to CA certificate file
          tls_cert_file: null               # Path to client certificate file
          tls_key_file: null                # Path to client private key
          tls_key_passphrase: null          # Passphrase for encrypted private key
          tls_peer_name: null               # Override TLS peer name for certificate validation
          tls_verify_peer: true             # Verify TLS peer certificate (default: true)

          # Additional Authentication
          token: null                       # NATS token authentication
          username: null                    # Overrides DSN username if provided
          password: null                    # Overrides DSN password if provided
          jwt: null                         # JWT authentication value
          nkey: null                        # NKey public value
```

> **Tested by:** `testReadmeConfigurationOptionsAreAccepted` (all options above), `testReadmeBatchingExamplesAreAccepted`, `testReadmeTimeoutExamplesAreAccepted`, `testReadmeStreamRetentionExamplesAreAccepted`, `testBuildWithTlsAndAuthOptionsPropagatesToNatsOptions`

### Retry Handler Behavior

[](#retry-handler-behavior)

- `retry_handler: symfony` (default) sends `TERM` when a message fails during transport decoding or is rejected. Symfony's retry/failure transport then handles redelivery.
- `retry_handler: nats` sends `NAK` when a message fails during transport decoding or is rejected, so NATS redelivers the message itself.

When NATS manages redelivery (`retry_handler: nats`), tune it with `nak_delay`, `ack_wait`, `max_deliver`, and `backoff`:

- **`nak_delay`** delays each NAK so a failing message backs off instead of redelivering immediately (a hot loop).
- **`max_deliver`** caps redeliveries. ⚠️ Without it, `retry_handler: nats` redelivers a permanently-failing ("poison") message **forever** - set `max_deliver` in production.
- **`backoff`** sets an escalating per-attempt delay schedule (e.g. `[1, 5, 30]` seconds); pair it with `max_deliver` greater than the number of backoff steps.
- **`ack_wait`** is how long JetStream waits for an ACK before considering a delivery failed and redelivering - raise it for handlers that legitimately take a while.

> **Tested by:** `testRejectUsesTermByDefault`, `testRejectUsesNakWhenRetryHandlerIsNats`, `testHandleFailedDeliveryUsesNakWithDelayWhenConfigured`, `testSetupAppliesConsumerRetryTuning`, `testBuildAcceptsNatsRetryTuningOptions`, `testBuildUsesRetryHandlerFromQuery`, Behat scenarios `nats_nak.feature` and `nats_term.feature`

Important: Consumer Strategies
------------------------------

[](#important-consumer-strategies)

This is critical to understand before setting up multiple transport instances:

### ⚠️ Strategy A: Same Consumer, Batching = 1

[](#️-strategy-a-same-consumer-batching--1)

**Use when:** Multiple instances should cooperate on the same consumer

```
# All instances use the same consumer with batching=1
transports:
  nats_worker_1:
    dsn: 'nats-jetstream://localhost/my-stream/my-topic'
    options:
      consumer: 'shared-consumer'  # Same consumer name
      batching: 1                  # MUST be 1 for shared consumers

  nats_worker_2:
    dsn: 'nats-jetstream://localhost/my-stream/my-topic'
    options:
      consumer: 'shared-consumer'  # Same consumer name
      batching: 1                  # MUST be 1 for shared consumers
```

**Why batching must be 1:**

- With explicit acknowledge (ACK) mode, only messages that are explicitly acknowledged are considered processed
- Multiple instances sharing the same consumer need to ACK individually
- Batching &gt; 1 with multiple instances causes delivery conflicts
- Each instance should fetch and ACK one message at a time

**Benefits:**

- Automatic load balancing across instances
- NATS handles message distribution
- Guaranteed single processing per message

> **Tested by:** `testReadmeBatchingExamplesAreAccepted` (batching=1), Behat scenario `Partial message consumption with multiple consumers`

### ✅ Strategy B: Different Consumers, Any Batching

[](#-strategy-b-different-consumers-any-batching)

**Use when:** Each instance needs independent message processing (duplicates allowed)

```
# Each instance uses a different consumer
transports:
  nats_worker_1:
    dsn: 'nats-jetstream://localhost/my-stream/my-topic'
    options:
      consumer: 'worker-1-consumer'   # Unique consumer per instance
      batching: 10                    # Can use any batching

  nats_worker_2:
    dsn: 'nats-jetstream://localhost/my-stream/my-topic'
    options:
      consumer: 'worker-2-consumer'   # Unique consumer per instance
      batching: 10                    # Can use any batching
```

**Why this works:**

- Each consumer maintains its own state
- All messages are delivered to all consumers independently
- Each instance can use higher batching for better throughput
- Duplicate processing is expected (fan-out pattern)

**Use cases:**

- Event broadcasting to multiple systems
- Multiple independent processors
- Audit logging / event replay

> **Tested by:** `testReadmeBatchingExamplesAreAccepted` (batching=10), Behat scenario `Partial message consumption with multiple consumers`

Batching &amp; Timeouts
-----------------------

[](#batching--timeouts)

### Batching Explained

[](#batching-explained)

- **Higher batching**: Better throughput, slightly higher latency
- **Lower batching**: Lower latency, slightly reduced throughput
- **Optimal batching**: Depends on message size and processing time

```
options:
  batching: 1        # Fetch 1 message at a time (low latency)
  batching: 5        # Fetch 5 messages (balanced)
  batching: 20       # Fetch 20 messages (high throughput)
```

> **Tested by:** `testReadmeBatchingExamplesAreAccepted` - values 1, 5, 10, 20, 50 are all verified.

### Batch Timeout

[](#batch-timeout)

Controls how long to wait for a batch to fill:

```
options:
  batching: 10
  max_batch_timeout: 0.5  # Wait max 0.5s for batch to fill
                          # Returns early if timeout reached
```

> **Tested by:** `testReadmeTimeoutExamplesAreAccepted` - values 0.5, 1.0, 2.0 are verified. Behat scenarios `nats_batching.feature`.

**Example scenarios:**

- If you set `batching: 10` and `max_batch_timeout: 0.5`
- If 10 messages arrive quickly, all are fetched immediately
- If only 3 messages arrive in 0.5s, return those 3

### Connection Timeout

[](#connection-timeout)

Controls the timeout for establishing the NATS connection (initial and reconnect dial attempts):

```
options:
  connection_timeout: 2.0  # Connection (dial) timeout in seconds
```

> **Tested by:** `testReadmeTimeoutExamplesAreAccepted` (1.0, 2.0, 3.0), `testBuildWithConnectionTimeoutPropagatesMs`

**Purpose:**

- Sets the timeout for the initial TCP/TLS dial and handshake when connecting to NATS
- Does **not** govern per-operation read/write timeouts (publish/ack/request keep the client's own request timeout); the batch fetch is bounded separately by `max_batch_timeout`
- Lower values fail faster on connection issues
- Higher values tolerate slower connection establishment

**When to adjust:**

- Increase for high-latency networks or geographically distant NATS servers
- Decrease for faster failure detection in local environments
- Default of 1 second works well for most local/regional deployments
- Don't wait forever for the batch to fill

Stream Configuration
--------------------

[](#stream-configuration)

### Retention Policies

[](#retention-policies)

Control how long messages are kept in the stream:

```
options:
  # By age (24 hours)
  stream_max_age: 86400

  # By total size (1GB)
  stream_max_bytes: 1073741824

  # By total message count across the entire stream (NATS: max_msgs)
  stream_max_messages: 1000000

  # By message count per individual subject (NATS: max_msgs_per_subject)
  stream_max_messages_per_subject: 1000

  # Unlimited (default)
  stream_max_age: 0
  stream_max_bytes: null
  stream_max_messages: null
  stream_max_messages_per_subject: null
```

> **Tested by:** `testReadmeStreamRetentionExamplesAreAccepted` - all retention options above are verified. Behat scenarios `nats_stream_limits.feature`.

> **Note:** `stream_max_messages` limits the total number of messages stored in the stream (maps to NATS `max_msgs`), while `stream_max_messages_per_subject` limits messages retained per individual subject (maps to NATS `max_msgs_per_subject`). The per-subject limit is especially useful with [multi-subject streams](#multi-subject-streams) to prevent one high-volume subject from dominating retention.

### High Availability

[](#high-availability)

```
options:
  # Single replica (no redundancy)
  stream_replicas: 1

  # 3 replicas (recommended for production)
  stream_replicas: 3
```

> **Tested by:** `testReadmeStreamRetentionExamplesAreAccepted` (replicas 1 and 3), `testSetupPassesConfiguredStreamOptions`

Testing
-------

[](#testing)

### Unit Tests

[](#unit-tests)

```
# Install dependencies
composer install

# Run static analysis and the fast unit suite after every modification
composer test

# Run NATS
composer nats:start

# Run all unit tests with coverage (recommended)
composer test:unit

# Or run tests manually
./vendor/bin/phpunit
```

The target is to have at least 90% of code coverage.

**What's tested:**

- DSN parsing and validation
- Configuration option handling
- Authentication support
- Port configuration
- Error handling
- Interface compliance

### Mutation Tests

[](#mutation-tests)

The unit suite is mutation-tested with [Infection](https://infection.github.io/) to ensure the tests actually detect behavioral changes (not just execute lines):

```
# Requires a coverage driver (xdebug or pcov)
composer test:mutation
```

Configuration lives in `infection.json5`. It enforces a minimum MSI of 90% and a minimum covered MSI of 95%; the suite currently scores 100% covered MSI with 100% mutation code coverage. CI runs it on the PHP 8.5 job.

### Functional Tests

[](#functional-tests)

Functional tests require a running NATS server with JetStream enabled:

```
# Set up functional test dependencies
composer test:functional:setup

# Start NATS server in Docker
composer nats:start

# Run functional tests
composer test:functional

# Stop NATS server
composer nats:stop
```

**Manual approach:**

```
# Set up NATS in Docker (optional)
cd tests/nats
docker-compose up -d

# Run functional tests
cd ../functional
./vendor/bin/behat features/

# Stop NATS
cd ../nats
docker-compose down
```

**What's tested:**

- Message publishing
- Message consumption
- Message acknowledgment
- Consumer setup
- Stream persistence

**See also:** `tests/functional/README.md`

Advanced Usage
--------------

[](#advanced-usage)

### Multiple Transports

[](#multiple-transports)

Set up multiple independent transports for different use cases:

```
framework:
  messenger:
    transports:
      # High-priority, low-latency messages
      nats_fast:
        dsn: 'nats-jetstream://localhost/fast-stream/fast-topic'
        options:
          consumer: 'fast-consumer'
          batching: 1

      # Bulk processing, high throughput
      nats_bulk:
        dsn: 'nats-jetstream://localhost/bulk-stream/bulk-topic'
        options:
          consumer: 'bulk-consumer'
          batching: 50

      # Audit logging
      nats_audit:
        dsn: 'nats-jetstream://localhost/audit-stream/audit-topic'
        options:
          consumer: 'audit-consumer'
          stream_max_age: 2592000  # 30 days
          stream_replicas: 3
```

> **Tested by:** `testReadmeDsnExamplesParseSuccessfully[README: fast transport]`, `testReadmeDsnExamplesParseSuccessfully[README: bulk transport]`, `testReadmeDsnExamplesParseSuccessfully[README: audit transport]`, `testReadmeAuditTransportOptionsAreAccepted`, `testReadmeBatchingExamplesAreAccepted`

### Multi-Subject Streams

[](#multi-subject-streams)

Multiple transports can share the same NATS stream with different subjects. When `messenger:setup-transports` runs, each transport adds its subject to the existing stream rather than overwriting it:

```
framework:
  messenger:
    transports:
      # Both transports share the "events" stream
      nats_orders:
        dsn: 'nats-jetstream://localhost/events/orders'
        options:
          consumer: 'order-consumer'
          batching: 1
          stream_max_age: 300

      nats_payments:
        dsn: 'nats-jetstream://localhost/events/payments'
        options:
          consumer: 'payment-consumer'
          batching: 2
```

The `events` stream will have both `orders` and `payments` as subjects.

> **Tested by:** `testReadmeDsnExamplesParseSuccessfully[README: multi-subject orders]`, `testReadmeDsnExamplesParseSuccessfully[README: multi-subject payments]`, `testReadmeMultiSubjectOptionsAreAccepted`, `testSetupUpdatesExistingStreamMergesSubjectsAndPreservesServerConfig`, Behat scenario `Setup command merges subjects for transports sharing one stream`

> **Note:** When a stream already exists, setup reads the current JetStream configuration, merges in any new subjects, and then overlays the stream settings managed by this transport. Existing subjects are preserved, duplicate subjects are not added, and the existing storage backend is kept for already-created streams.

### Setup on Initialization

[](#setup-on-initialization)

Automatically create streams and consumers on first run:

```
framework:
  messenger:
    transports:
      nats_transport:
        dsn: 'nats-jetstream://localhost/my-stream/my-topic'
        options:
          consumer: 'my-consumer'
```

Then call setup command:

```
symfony console messenger:setup-transports nats_transport
```

> **Tested by:** `testSetupCreatesStreamAndConsumer`, `testSetupPassesConfiguredStreamOptions`, `testSetupUpdatesExistingStreamMergesSubjectsAndPreservesServerConfig`, Behat scenarios `Setup NATS stream with max age configuration`, `Setup command handles existing streams gracefully`, and `Custom consumer name is registered in JetStream`

### Delayed / Scheduled Messages

[](#delayed--scheduled-messages)

**Requires NATS Server &gt;= 2.12** with JetStream enabled. If `scheduled_messages` is enabled against an older server, `messenger:setup-transports` fails with a clear, actionable error telling you to upgrade NATS or disable the option.

Enable `scheduled_messages` in the DSN to use Symfony's `DelayStamp` for deferred delivery:

```
framework:
  messenger:
    transports:
      nats_transport:
        dsn: 'nats-jetstream://localhost/my-stream/my-topic?scheduled_messages=true'
```

Then dispatch messages with a delay:

```
use Symfony\Component\Messenger\Stamp\DelayStamp;

// Deliver after 30 seconds
$bus->dispatch(new MyMessage(), [new DelayStamp(30000)]);
```

> **Tested by:** `testSendWithDelayStampPublishesToDelayedSubjectWithScheduleHeaders`, `testSendDelayedMessageSchedulesAtRequestedDelay`, `testSendDelayedMessageNeverSchedulesBeforeRequestedDelay`, `testReadmeScheduledMessagesDsnEnablesFeature`, Behat scenarios `Delayed messages are delivered after the scheduled time` and `Delayed messages are not available to the consumer before the scheduled time`

When `scheduled_messages` is enabled and a `DelayStamp` is present:

- The message is published to a `{topic}.delayed.{uuid}` subject with `Nats-Schedule` and `Nats-Schedule-Target` headers
- The stream is created with an additional `{topic}.delayed.>` subject and `allow_msg_schedules` enabled
- NATS holds the message and delivers it to the original topic at the scheduled time
- The consumer processes it like any other message

The `DelayStamp` delay (milliseconds) maps onto a NATS `@at` schedule, which has **whole-second resolution**. The delay is rounded **up** to the next whole second, so a message is never delivered *before* the requested delay elapses (it may arrive up to ~1 second later); a sub-second delay therefore schedules at the next whole second rather than firing immediately.

When `scheduled_messages` is disabled (the default), any `DelayStamp` on the envelope is silently ignored and messages are published immediately.

This will:

1. Create the stream with configured settings
2. Create the consumer with explicit ACK policy
3. Verify consumer creation

### Stream Monitoring

[](#stream-monitoring)

View stream and consumer information:

```
# List streams
nats stream list

# View stream info
nats stream info my-stream

# List consumers
nats consumer list my-stream

# View consumer info
nats consumer info my-stream my-consumer

# View message count
nats consumer info my-stream my-consumer --json | jq '.state.num_pending'
```

### Manual Message Operations

[](#manual-message-operations)

```
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

// Get message count
$count = $transport->getMessageCount();

// Check if messages are pending
if ($count > 0) {
    echo "Pending messages: $count";
}
```

> **Tested by:** `testGetMessageCountReturnsConsumerPendingMessages`, `testGetMessageCountFallsBackToStreamState`, `testGetMessageCountReturnsZeroWhenLookupsFail`, `testGetMessageCountSumsAckPendingAndPending`

Troubleshooting
---------------

[](#troubleshooting)

### Connection Issues

[](#connection-issues)

**Error: "Connection refused"**

```
# Check NATS is running
nats-server --js

# Verify host and port
nats-jetstream://localhost:4222/stream/topic
```

> **Tested by:** Behat scenario `Setup command fails gracefully when NATS is unavailable`

**Error: "Stream not found"**

```
# Run setup command to create stream
symfony console messenger:setup-transports nats_transport
```

> **Tested by:** `testSetupCreatesStreamAndConsumer`, `testSetupUpdatesStreamWhenItAlreadyExists`, Behat scenarios `Setup NATS stream with max age configuration` and `Setup command handles existing streams gracefully`

### Message Processing Issues

[](#message-processing-issues)

**Messages not being consumed**

```
# Check consumer exists
nats consumer list my-stream

# View consumer status
nats consumer info my-stream my-consumer

# Check for errors in consumer
nats consumer info my-stream my-consumer --json | jq '.state'
```

**Messages stuck in pending**

```
# Check handler is not throwing exceptions
# Verify handler implementation
# Check application logs for errors
```

Architecture
------------

[](#architecture)

The bridge consists of two main components:

### NatsTransportFactory

[](#natstransportfactory)

- Handles DSN scheme detection (`nats-jetstream://`)
- Creates `NatsTransport` instances
- Validates configuration

### NatsTransport

[](#natstransport)

- Implements Symfony's `TransportInterface`, `MessageCountAwareInterface`, `SetupableTransportInterface`, `KeepaliveReceiverInterface`, and `CloseableTransportInterface`
- Manages stream and consumer connections
- Handles message serialization via a pluggable `SerializerInterface` (igbinary when constructed directly without one)
- Supports batching and explicit acknowledgment

Performance Tips
----------------

[](#performance-tips)

1. **Choose appropriate batching**

    - Start with `batching: 5` for balanced performance
    - Increase to 20+ for high throughput workloads
    - Use 1 for strict low-latency requirements
2. **Set reasonable timeouts**

    - `max_batch_timeout: 0.5` for responsive systems
    - `max_batch_timeout: 2.0` for background jobs
    - `connection_timeout: 1.0` for local/regional deployments
    - `connection_timeout: 3.0+` for cross-region or high-latency networks
3. **Use appropriate replicas**

    - `stream_replicas: 1` for development
    - `stream_replicas: 3` for production
4. **Monitor performance**

    - Use `getMessageCount()` to track queue depth
    - Monitor handler execution time
    - Watch for stuck messages

Security Considerations
-----------------------

[](#security-considerations)

### ⚠️ Deserialization of Untrusted Data

[](#️-deserialization-of-untrusted-data)

The default `IgbinarySerializer` (and any serializer extending `AbstractEnveloperSerializer`) deserializes raw message payloads from NATS into PHP objects. PHP object unserialization is a [well-known attack vector](https://owasp.org/Top10/A08_2021-Software_and_Data_Integrity_Failures/) - a crafted payload can trigger arbitrary code execution via magic methods (`__wakeup`, `__destruct`, etc.).

> **⚠️ PhpSerializer fallback:** when no serializer is configured **and `ext-igbinary` is not installed**, the transport automatically falls back to Symfony's `PhpSerializer`, which uses native `unserialize()` - the **same** untrusted-deserialization (object injection) risk as igbinary, not a safer alternative. The transport emits an `E_USER_WARNING` when this happens. Do not rely on the fallback in production: either install `ext-igbinary` or explicitly configure a serializer (ideally a safe-format one, per below).

**If your NATS topics are not fully trusted** (e.g. shared infrastructure, external publishers), you should:

- Implement a custom serializer that uses a safe format (JSON, Protobuf) instead of PHP object serialization
- Add message-level authentication (e.g. HMAC signatures) to verify publisher identity before deserializing
- Restrict NATS topic publish permissions via ACLs so only trusted services can publish

The type check (`instanceof Envelope`) happens *after* deserialization, which is too late to prevent exploitation.

### Stream-Exists Detection During Setup

[](#stream-exists-detection-during-setup)

During `setup()`, if `createStream` fails the transport detects a pre-existing stream **deterministically** by querying JetStream stream info: a `404` means the stream is absent (so the creation error was genuine and is rethrown), while a successful lookup means the stream exists (so it is updated, reusing the fetched configuration). This relies on the JetStream stream-info API rather than matching server-specific conflict strings (`"already in use"` / `"already exists"`), whose wording varies across NATS versions.

If you experience unexpected behavior during stream setup, confirm the stream can be queried via JetStream stream-info APIs and review the exact error returned by your NATS server version.

### Publish Response Validation

[](#publish-response-validation)

On `send()`, the transport awaits the JetStream publish acknowledgement returned by the client's `publish()` call. The client validates that acknowledgement and raises a `JetStreamException` if JetStream reports an error or returns an empty/malformed response, so a proxy or protocol misconfiguration fails closed instead of silently accepting an invalid publish acknowledgement.

### General Recommendations

[](#general-recommendations)

1. **Authentication**

    - Prefer environment variables or explicit options for credentials over hard-coded DSNs
    - If you use credentials in a DSN, avoid logging the full DSN because it may expose secrets
    - Store credentials in environment variables
    - Never commit credentials to version control
2. **Message Encryption**

    - Encrypt sensitive data before dispatching
    - NATS can be configured with TLS for transit encryption
    - Implement application-level encryption for sensitive payloads
3. **Access Control**

    - Restrict stream/consumer creation to authorized users
    - Use NATS access control lists (ACLs) for fine-grained permissions
    - Audit stream operations

Contributing
------------

[](#contributing)

Contributions are welcome! Please ensure:

- Every modification runs the relevant verification commands before it is considered done
- Minimum verification for PHP changes: `composer test`
- All tests pass: `composer test:unit`
- Code coverage remains above 90%
- New features include corresponding tests
- Documentation is updated
- Functional tests pass: `composer test:functional` (if applicable)
- `docs/TESTS.md` is kept up to date when tests are added, removed, or renamed
- Each release has an entry in `docs/CHANGELOG.md` following [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) format
- When a PR is merged or its features are adapted, a description is added to `docs/PRs/`

### Quick Development Workflow

[](#quick-development-workflow)

```
# 1. Run static analysis and the default unit suite after each modification
composer test

# 2. Set up functional tests (first time only)
composer test:functional:setup

# 3. Start NATS for functional tests
composer nats:start

# 4. Run functional tests
composer test:functional

# 5. Clean up
composer nats:stop
```

License
-------

[](#license)

MIT License - see LICENSE file for details

Support
-------

[](#support)

For issues, questions, or suggestions:

1. Check the [troubleshooting](#troubleshooting) section
2. Check existing issues on GitHub
3. Create a new issue with detailed information

###  Health Score

55

—

FairBetter than 97% of packages

Maintenance97

Actively maintained with recent releases

Popularity38

Limited adoption so far

Community17

Small or concentrated contributor base

Maturity55

Maturing project, gaining track record

 Bus Factor1

Top contributor holds 82.3% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~25 days

Recently: every ~50 days

Total

10

Last Release

17d ago

Major Versions

v1.0.3 → v2.0.02025-11-24

v2.0.0 → v3.0.02025-11-27

v3.2.0 → v4.0.02026-03-20

v4.0.0 → v5.0.02026-06-16

### Community

Maintainers

![](https://avatars.githubusercontent.com/u/168780730?v=4)[IDCT Bartosz Pachołek](/maintainers/ideaconnect)[@ideaconnect](https://github.com/ideaconnect)

---

Top Contributors

[![bpacholek](https://avatars.githubusercontent.com/u/3039162?v=4)](https://github.com/bpacholek "bpacholek (116 commits)")[![jphooiveld-axtion](https://avatars.githubusercontent.com/u/124688698?v=4)](https://github.com/jphooiveld-axtion "jphooiveld-axtion (9 commits)")[![coff33cat](https://avatars.githubusercontent.com/u/2161184?v=4)](https://github.com/coff33cat "coff33cat (7 commits)")[![zlatkoverk](https://avatars.githubusercontent.com/u/25726655?v=4)](https://github.com/zlatkoverk "zlatkoverk (6 commits)")[![dependabot[bot]](https://avatars.githubusercontent.com/in/29110?v=4)](https://github.com/dependabot[bot] "dependabot[bot] (2 commits)")[![pimjansen](https://avatars.githubusercontent.com/u/1457219?v=4)](https://github.com/pimjansen "pimjansen (1 commits)")

###  Code Quality

TestsPHPUnit

Static AnalysisPHPStan

Type Coverage Yes

### Embed Badge

![Health badge](/badges/idct-symfony-nats-messenger/health.svg)

```
[![Health](https://phpackages.com/badges/idct-symfony-nats-messenger/health.svg)](https://phpackages.com/packages/idct-symfony-nats-messenger)
```

###  Alternatives

[rcsofttech/audit-trail-bundle

Enterprise-grade, high-performance Symfony audit trail bundle. Automatically track Doctrine entity changes with split-phase architecture, multiple transports (HTTP, Queue, Doctrine), and sensitive data masking.

1189.8k](/packages/rcsofttech-audit-trail-bundle)[easycorp/easyadmin-bundle

Admin generator for Symfony applications

4.3k17.9M388](/packages/easycorp-easyadmin-bundle)[sylius/sylius

E-Commerce platform for PHP, based on Symfony framework.

8.5k5.9M737](/packages/sylius-sylius)[pimcore/pimcore

Content &amp; Product Management Framework (CMS/PIM/E-Commerce)

3.8k3.8M508](/packages/pimcore-pimcore)[sulu/sulu

Core framework that implements the functionality of the Sulu content management system

1.3k1.4M203](/packages/sulu-sulu)[open-dxp/opendxp

Content &amp; Product Management Framework (CMS/PIM)

9421.6k61](/packages/open-dxp-opendxp)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
