PHPackages                             idct/symfony-nats-messenger - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. idct/symfony-nats-messenger

ActiveLibrary[Queues &amp; Workers](/categories/queues)

idct/symfony-nats-messenger
===========================

Symfony NATS (JetStream) Messenger Bridge

v4.0.0(1mo ago)181.2k↓17.6%4[1 issues](https://github.com/ideaconnect/symfony-nats-messenger/issues)MITPHPPHP ^8.2CI passing

Since Nov 2Pushed 1mo ago2 watchersCompare

[ Source](https://github.com/ideaconnect/symfony-nats-messenger)[ Packagist](https://packagist.org/packages/idct/symfony-nats-messenger)[ RSS](/packages/idct-symfony-nats-messenger/feed)WikiDiscussions main Synced 1mo ago

READMEChangelog (8)Dependencies (13)Versions (12)Used By (0)

Symfony NATS Messenger Bridge
=============================

[](#symfony-nats-messenger-bridge)

[![PHP Version](https://camo.githubusercontent.com/e771ed2874cdbf856ba15fd6a3cdd70ca87551ef2d02a04f946ce3e3b26adabf/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f5048502d253545382e322d3738374342353f6c6f676f3d706870266c6f676f436f6c6f723d7768697465)](https://php.net)[![Symfony Version](https://camo.githubusercontent.com/201f0d32598cad1657544dbd6798ff4fc142c0a69d5a05add890e20c32bc9f29/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f53796d666f6e792d253545372e32253230253743253230253545382e302d3030303030303f6c6f676f3d73796d666f6e79266c6f676f436f6c6f723d7768697465)](https://symfony.com)[![Unit Tests Coverage](https://camo.githubusercontent.com/2b682d89fd699147f49a721c75c85938984713a6a64bc7069d1b703766daaaa8/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f436f7665726167652d39352e39372532352d627269676874677265656e)](https://github.com/ideaconnect/symfony-nats-messenger/actions)[![Functional Tests](https://camo.githubusercontent.com/eb2880245c72f1bad2347758a7bb08140abde7e01262ed384f3bf63d112203dc/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f46756e6374696f6e616c25323054657374732d42656861742d626c7565)](tests/functional)[![License](https://camo.githubusercontent.com/5caa455d8debc46fb23abbadb45a733a937f3910a73fc875c2f7820468e1bb54/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f4c6963656e73652d4d49542d677265656e)](LICENSE)[![CI](https://github.com/ideaconnect/symfony-nats-messenger/actions/workflows/ci.yml/badge.svg)](https://github.com/ideaconnect/symfony-nats-messenger/actions/workflows/ci.yml)

A Symfony Messenger transport integration for [NATS JetStream](https://docs.nats.io/nats-concepts/jetstream), enabling reliable asynchronous messaging with persistent message streaming.

Features
--------

[](#features)

- 🚀 **High-Performance Messaging** - Leverage NATS JetStream for fast, reliable message delivery
- 📦 **Symfony Integration** - Seamless integration with Symfony Messenger
- ⚙️ **Configurable Consumers** - Support for multiple consumer strategies
- 🔄 **Flexible Batching** - Adjustable message batch sizes and timeouts
- 🔐 **Authentication Support** - Built-in support for NATS authentication
- 📊 **Stream Configuration** - Configurable retention policies and replication
- 🧪 **Thoroughly Tested** - 102 unit tests with ~96% code coverage

🚀 This project looks for funding. Love my work? Support it! 💖
-------------------------------------------------------------

[](#-this-project-looks-for-funding-love-my-work-support-it-)

- ☕ **Buy me a coffee**:
- 💝 **Sponsor**:
- 🪙 **BTC**: bc1qntms755swm3nplsjpllvx92u8wdzrvs474a0hr
- 💎 **ETH**: 0x08E27250c91540911eD27F161572aFA53Ca24C0a
- ⚡ **TRX**: TVXWaU4ScNV9RBYX5RqFmySuB4zF991QaE
- 🚀 **LTC**: LN5ApP1Yhk4iU9Bo1tLU8eHX39zDzzyZxB

Requirements
------------

[](#requirements)

### System Requirements

[](#system-requirements)

- **PHP**: ^8.2
- **Symfony**: ^7.2 || ^8
- **NATS Server**: ^2.9 with JetStream enabled, ^2.12 for scheduled messages support.

Installation
------------

[](#installation)

```
composer require idct/symfony-nats-messenger
```

> **Operational note:** This is the package installation command, not a runtime behavior covered by the library test suite. The installed package is exercised by the unit and functional tests documented below.

### Development Setup

[](#development-setup)

For contributors and development:

```
# Install dependencies
composer install

# Run static analysis and the default unit test suite after every modification
composer test

# Start NATS server for testing
composer nats:start

# Run unit tests with coverage
composer test:unit

# Set up functional tests
composer test:functional:setup

# Run functional tests
composer test:functional

# Stop NATS server
composer nats:stop
```

> **Verification note:** This is the repository verification workflow used for this package. The same command sequence is run when validating documentation and code changes in this repository.

Quick Start
-----------

[](#quick-start)

### 1. Configure NATS Server

[](#1-configure-nats-server)

Ensure your NATS server has JetStream enabled:

```
nats-server -js
```

> **Operational note:** Starting `nats-server -js` is an environment prerequisite rather than package behavior. Automated coverage begins once JetStream is available and the transport is exercised by the functional scenarios below.

### 2. Set Up Transport in Symfony

[](#2-set-up-transport-in-symfony)

Add the NATS transport to your Symfony Messenger configuration:

```
# config/packages/messenger.yaml
framework:
  messenger:
    transports:
      nats_transport:
        dsn: 'nats-jetstream://localhost:4222/my-stream/my-topic'
        options:
          consumer: 'my-consumer'
          batching: 5
          max_batch_timeout: 1.0

    routing:
      'App\Message\MyAsyncMessage': nats_transport
```

> **Tested by:** `testReadmeDsnExamplesParseSuccessfully[README: quick-start transport]`, `testReadmeConfigurationOptionsAreAccepted`, `testReadmeBatchingExamplesAreAccepted`, `testReadmeTimeoutExamplesAreAccepted`

### 3. Configure Custom Serializers (Optional)

[](#3-configure-custom-serializers-optional)

By default, the transport uses `igbinary` serialization for high performance when the extension is available. If `ext-igbinary` is not installed, it falls back to Symfony's `PhpSerializer` and emits a notice. You can also customize this explicitly:

#### Using IgbinarySerializer (Default)

[](#using-igbinaryserializer-default)

```
# config/packages/messenger.yaml
framework:
  messenger:
    transports:
      nats_transport:
        dsn: 'nats-jetstream://localhost:4222/my-stream/my-topic'
        serializer: 'IDCT\NatsMessenger\Serializer\IgbinarySerializer'
        options:
          consumer: 'my-consumer'
```

**Note:** Custom serializer services are resolved by Symfony before transport creation. When no serializer service is provided, the transport instantiates its built-in default serializer and falls back to Symfony's `PhpSerializer` if `ext-igbinary` is unavailable.

For example:

```
    igbinary_serializer:
        class: IDCT\NatsMessenger\Serializer\IgbinarySerializer
```

or:

```
    IDCT\NatsMessenger\Serializer\IgbinarySerializer: ~
```

> **Tested by:** `createTransport_UsesProvidedSerializer`, `serialize_WithValidEnvelope_ReturnsSerializedString`, `decode_WithValidEncodedEnvelope_ReturnsEnvelope`, `testConstructorWithoutIgbinaryDoesNotCrash`

#### Creating Custom Serializers

[](#creating-custom-serializers)

You can create your own serializer by extending `AbstractEnveloperSerializer`:

```
use IDCT\NatsMessenger\Serializer\AbstractEnveloperSerializer;
use Symfony\Component\Messenger\Envelope;

class MyCustomSerializer extends AbstractEnveloperSerializer
{
    protected function serialize(Envelope $envelope): string
    {
        // Your custom serialization logic
        return serialize($envelope);
    }

    protected function deserialize(string $data): mixed
    {
        // Your custom deserialization logic
        return unserialize($data);
    }
}
```

> **Tested by:** `readmeCustomSerializerExample_EncodeDecode_RoundTrips`, `readmeCustomSerializerExample_DecodeInvalidBody_ThrowsException` — the exact code above is compiled and exercised via `ReadmeExampleSerializer` in the unit tests.

For reference implementations, see:

- `src/Serializer/IgbinarySerializer.php` - Binary serialization
- `src/Serializer/AbstractEnveloperSerializer.php` - Base class

### 4. Send Messages

[](#4-send-messages)

```
use App\Message\MyAsyncMessage;
use Symfony\Component\Messenger\MessageBus;

class MyController
{
    public function __construct(private MessageBus $bus) {}

    public function send(): void
    {
        $this->bus->dispatch(new MyAsyncMessage('Hello NATS!'));
    }
}
```

> **Tested by:** `testSendPublishesEncodedBodyWithoutHeaders`, `testSendUsesRequestWithHeadersWhenHeadersArePresent`, Behat scenario `Complete message flow - send, check stats, consume, verify`

### 5. Handle Messages

[](#5-handle-messages)

```
use App\Message\MyAsyncMessage;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class MyAsyncMessageHandler implements MessageHandlerInterface
{
    public function __invoke(MyAsyncMessage $message): void
    {
        echo "Processing: " . $message->getText();
    }
}
```

> **Tested by:** Behat scenarios `Complete message flow - send, check stats, consume, verify`, `Send and consume messages with a custom consumer name`, and `High-volume message processing with file output verification` — handlers are exercised through real `messenger:consume` runs.

### 6. Consume Messages

[](#6-consume-messages)

```
symfony console messenger:consume nats_transport
```

> **Tested by:** Behat scenarios `Complete message flow - send, check stats, consume, verify`, `Send and consume messages with a custom consumer name`, and `Partial message consumption with multiple consumers` — the Behat context runs `messenger:consume` as a Symfony CLI process.

Configuration Guide
-------------------

[](#configuration-guide)

### DSN Format

[](#dsn-format)

```
nats-jetstream://[user:password@]host:port/stream-name/topic-name

```

> **Tested by:** `testBuildWithValidDsnReturnsConfiguration`, `testBuildWithoutPathThrowsException`, `testBuildWithoutTopicThrowsException`, `createTransport_WithValidDsn_ReturnsNatsTransportInstance`

**Examples:**

```
# Default port (4222)
nats-jetstream://localhost/my-stream/my-topic

# Custom port
nats-jetstream://localhost:5000/my-stream/my-topic

# With authentication
nats-jetstream://user:password@localhost:4222/my-stream/my-topic

# With query parameters
nats-jetstream://localhost/my-stream/my-topic?consumer=worker&batching=10

# TLS transport scheme
nats-jetstream+tls://localhost:4222/my-stream/my-topic
```

> **Tested by:** `testReadmeDsnExamplesParseSuccessfully` — each DSN above is parsed through the configuration builder via a dedicated data provider case.

### Configuration Options

[](#configuration-options)

```
framework:
  messenger:
    transports:
      nats_transport:
        dsn: 'nats-jetstream://localhost:4222/my-stream/my-topic'
        options:
          # Consumer Configuration
          consumer: 'my-consumer'           # Consumer group name (default: 'client')

          # Performance Tuning
          batching: 5                       # Messages per batch (default: 1)
          max_batch_timeout: 1.0            # Timeout in seconds for batch fetching (default: 1)
          connection_timeout: 1.0           # Socket I/O timeout in seconds (default: 1)

          # Stream Retention Policies
          stream_max_age: 86400             # Max message age in seconds (0 = unlimited, default: 0)
          stream_max_bytes: 1073741824      # Max storage size in bytes (null = unlimited)
          stream_max_messages: 1000000      # Max number of messages in the stream (null = unlimited)
          stream_max_messages_per_subject: 1000 # Max number of messages retained per subject (null = unlimited)

          # Storage Backend
          stream_storage: 'file'            # Storage type: 'file' or 'memory' (default: 'file')

          # High Availability
          stream_replicas: 1                # Number of replicas (default: 1)

          # Failure Handling Strategy
          retry_handler: 'symfony'          # symfony|nats (default: symfony)
                                            # symfony => TERM on failed/rejected message
                                            # nats    => NAK on failed/rejected message

          # Scheduled / Delayed Messages (requires NATS >= 2.12)
          scheduled_messages: false         # Enable scheduled message support (default: false)
                                            # When enabled, Symfony DelayStamp triggers NATS
                                            # scheduled message delivery via Nats-Schedule headers

          # TLS Configuration
          tls_required: false               # Force TLS for NATS connection (default: false)
          tls_handshake_first: false        # Use TLS-first handshake mode (default: false)
          tls_ca_file: null                 # Path to CA certificate file
          tls_cert_file: null               # Path to client certificate file
          tls_key_file: null                # Path to client private key
          tls_key_passphrase: null          # Passphrase for encrypted private key
          tls_peer_name: null               # Override TLS peer name for certificate validation
          tls_verify_peer: true             # Verify TLS peer certificate (default: true)

          # Additional Authentication
          token: null                       # NATS token authentication
          username: null                    # Overrides DSN username if provided
          password: null                    # Overrides DSN password if provided
          jwt: null                         # JWT authentication value
          nkey: null                        # NKey public value
```

> **Tested by:** `testReadmeConfigurationOptionsAreAccepted` (all options above), `testReadmeBatchingExamplesAreAccepted`, `testReadmeTimeoutExamplesAreAccepted`, `testReadmeStreamRetentionExamplesAreAccepted`, `testBuildWithTlsAndAuthOptionsPropagatesToNatsOptions`

### Retry Handler Behavior

[](#retry-handler-behavior)

- `retry_handler: symfony` (default) sends `TERM` when a message fails during transport decoding or is rejected.
- `retry_handler: nats` sends `NAK` when a message fails during transport decoding or is rejected.

> **Tested by:** `testRejectUsesTermByDefault`, `testRejectUsesNakWhenRetryHandlerIsNats`, `testBuildUsesRetryHandlerFromQuery`, Behat scenarios `nats_nak.feature` and `nats_term.feature`

Important: Consumer Strategies
------------------------------

[](#important-consumer-strategies)

This is critical to understand before setting up multiple transport instances:

### ⚠️ Strategy A: Same Consumer, Batching = 1

[](#️-strategy-a-same-consumer-batching--1)

**Use when:** Multiple instances should cooperate on the same consumer

```
# All instances use the same consumer with batching=1
transports:
  nats_worker_1:
    dsn: 'nats-jetstream://localhost/my-stream/my-topic'
    options:
      consumer: 'shared-consumer'  # Same consumer name
      batching: 1                  # MUST be 1 for shared consumers

  nats_worker_2:
    dsn: 'nats-jetstream://localhost/my-stream/my-topic'
    options:
      consumer: 'shared-consumer'  # Same consumer name
      batching: 1                  # MUST be 1 for shared consumers
```

**Why batching must be 1:**

- With explicit acknowledge (ACK) mode, only messages that are explicitly acknowledged are considered processed
- Multiple instances sharing the same consumer need to ACK individually
- Batching &gt; 1 with multiple instances causes delivery conflicts
- Each instance should fetch and ACK one message at a time

**Benefits:**

- Automatic load balancing across instances
- NATS handles message distribution
- Guaranteed single processing per message

> **Tested by:** `testReadmeBatchingExamplesAreAccepted` (batching=1), Behat scenario `Partial message consumption with multiple consumers`

### ✅ Strategy B: Different Consumers, Any Batching

[](#-strategy-b-different-consumers-any-batching)

**Use when:** Each instance needs independent message processing (duplicates allowed)

```
# Each instance uses a different consumer
transports:
  nats_worker_1:
    dsn: 'nats-jetstream://localhost/my-stream/my-topic'
    options:
      consumer: 'worker-1-consumer'   # Unique consumer per instance
      batching: 10                    # Can use any batching

  nats_worker_2:
    dsn: 'nats-jetstream://localhost/my-stream/my-topic'
    options:
      consumer: 'worker-2-consumer'   # Unique consumer per instance
      batching: 10                    # Can use any batching
```

**Why this works:**

- Each consumer maintains its own state
- All messages are delivered to all consumers independently
- Each instance can use higher batching for better throughput
- Duplicate processing is expected (fan-out pattern)

**Use cases:**

- Event broadcasting to multiple systems
- Multiple independent processors
- Audit logging / event replay

> **Tested by:** `testReadmeBatchingExamplesAreAccepted` (batching=10), Behat scenario `Partial message consumption with multiple consumers`

Batching &amp; Timeouts
-----------------------

[](#batching--timeouts)

### Batching Explained

[](#batching-explained)

- **Higher batching**: Better throughput, slightly higher latency
- **Lower batching**: Lower latency, slightly reduced throughput
- **Optimal batching**: Depends on message size and processing time

```
options:
  batching: 1        # Fetch 1 message at a time (low latency)
  batching: 5        # Fetch 5 messages (balanced)
  batching: 20       # Fetch 20 messages (high throughput)
```

> **Tested by:** `testReadmeBatchingExamplesAreAccepted` — values 1, 5, 10, 20, 50 are all verified.

### Batch Timeout

[](#batch-timeout)

Controls how long to wait for a batch to fill:

```
options:
  batching: 10
  max_batch_timeout: 0.5  # Wait max 0.5s for batch to fill
                          # Returns early if timeout reached
```

> **Tested by:** `testReadmeTimeoutExamplesAreAccepted` — values 0.5, 1.0, 2.0 are verified. Behat scenarios `nats_batching.feature`.

**Example scenarios:**

- If you set `batching: 10` and `max_batch_timeout: 0.5`
- If 10 messages arrive quickly, all are fetched immediately
- If only 3 messages arrive in 0.5s, return those 3

### Connection Timeout

[](#connection-timeout)

Controls the socket-level I/O timeout for all NATS operations:

```
options:
  connection_timeout: 2.0  # Socket timeout in seconds
```

> **Tested by:** `testReadmeTimeoutExamplesAreAccepted` (1.0, 2.0, 3.0), `testBuildWithConnectionTimeoutPropagatesMs`

**Purpose:**

- Sets the timeout for socket read/write operations
- Affects all NATS communication (publish, subscribe, ack, etc.)
- Lower values fail faster on network issues
- Higher values tolerate slower networks

**When to adjust:**

- Increase for high-latency networks or geographically distant NATS servers
- Decrease for faster failure detection in local environments
- Default of 1 second works well for most local/regional deployments
- Don't wait forever for the batch to fill

Stream Configuration
--------------------

[](#stream-configuration)

### Retention Policies

[](#retention-policies)

Control how long messages are kept in the stream:

```
options:
  # By age (24 hours)
  stream_max_age: 86400

  # By total size (1GB)
  stream_max_bytes: 1073741824

  # By total message count across the entire stream (NATS: max_msgs)
  stream_max_messages: 1000000

  # By message count per individual subject (NATS: max_msgs_per_subject)
  stream_max_messages_per_subject: 1000

  # Unlimited (default)
  stream_max_age: 0
  stream_max_bytes: null
  stream_max_messages: null
  stream_max_messages_per_subject: null
```

> **Tested by:** `testReadmeStreamRetentionExamplesAreAccepted` — all retention options above are verified. Behat scenarios `nats_stream_limits.feature`.

> **Note:** `stream_max_messages` limits the total number of messages stored in the stream (maps to NATS `max_msgs`), while `stream_max_messages_per_subject` limits messages retained per individual subject (maps to NATS `max_msgs_per_subject`). The per-subject limit is especially useful with [multi-subject streams](#multi-subject-streams) to prevent one high-volume subject from dominating retention.

### High Availability

[](#high-availability)

```
options:
  # Single replica (no redundancy)
  stream_replicas: 1

  # 3 replicas (recommended for production)
  stream_replicas: 3
```

> **Tested by:** `testReadmeStreamRetentionExamplesAreAccepted` (replicas 1 and 3), `testSetupPassesConfiguredStreamOptions`

Testing
-------

[](#testing)

### Unit Tests

[](#unit-tests)

```
# Install dependencies
composer install --dev

# Run static analysis and the fast unit suite after every modification
composer test

# Run NATS
composer nats:start

# Run all unit tests with coverage (recommended)
composer test:unit

# Or run tests manually
./vendor/bin/phpunit
```

> **Verification note:** This block documents the supported contributor workflow. The same `composer test` and `composer test:unit` commands are used to verify changes in this repository.

The target is to have at least 90% of code coverage.

**What's tested:**

- DSN parsing and validation
- Configuration option handling
- Authentication support
- Port configuration
- Error handling
- Interface compliance

### Functional Tests

[](#functional-tests)

Functional tests require a running NATS server with JetStream enabled:

```
# Set up functional test dependencies
composer test:functional:setup

# Start NATS server in Docker
composer nats:start

# Run functional tests
composer test:functional

# Stop NATS server
composer nats:stop
```

> **Verification note:** This is the scripted functional test workflow used for the transport's end-to-end verification.

**Manual approach:**

```
# Set up NATS in Docker (optional)
cd tests/nats
docker-compose up -d

# Run functional tests
cd ../functional
./vendor/bin/behat features/

# Stop NATS
cd ../nats
docker-compose down
```

> **Operational note:** This manual Docker/Behat flow mirrors the scripted functional commands above and is not asserted separately by the package tests.

**What's tested:**

- Message publishing
- Message consumption
- Message acknowledgment
- Consumer setup
- Stream persistence

**See also:** `tests/functional/README.md`

Advanced Usage
--------------

[](#advanced-usage)

### Multiple Transports

[](#multiple-transports)

Set up multiple independent transports for different use cases:

```
framework:
  messenger:
    transports:
      # High-priority, low-latency messages
      nats_fast:
        dsn: 'nats-jetstream://localhost/fast-stream/fast-topic'
        options:
          consumer: 'fast-consumer'
          batching: 1

      # Bulk processing, high throughput
      nats_bulk:
        dsn: 'nats-jetstream://localhost/bulk-stream/bulk-topic'
        options:
          consumer: 'bulk-consumer'
          batching: 50

      # Audit logging
      nats_audit:
        dsn: 'nats-jetstream://localhost/audit-stream/audit-topic'
        options:
          consumer: 'audit-consumer'
          stream_max_age: 2592000  # 30 days
          stream_replicas: 3
```

> **Tested by:** `testReadmeDsnExamplesParseSuccessfully[README: fast transport]`, `testReadmeDsnExamplesParseSuccessfully[README: bulk transport]`, `testReadmeDsnExamplesParseSuccessfully[README: audit transport]`, `testReadmeAuditTransportOptionsAreAccepted`, `testReadmeBatchingExamplesAreAccepted`

### Multi-Subject Streams

[](#multi-subject-streams)

Multiple transports can share the same NATS stream with different subjects. When `messenger:setup-transports` runs, each transport adds its subject to the existing stream rather than overwriting it:

```
framework:
  messenger:
    transports:
      # Both transports share the "events" stream
      nats_orders:
        dsn: 'nats-jetstream://localhost/events/orders'
        options:
          consumer: 'order-consumer'
          delay: 0.5
          batching: 1
          stream_max_age: 300

      nats_payments:
        dsn: 'nats-jetstream://localhost/events/payments'
        options:
          consumer: 'payment-consumer'
          delay: 1
          batching: 2
```

The `events` stream will have both `orders` and `payments` as subjects.

> **Tested by:** `testReadmeDsnExamplesParseSuccessfully[README: multi-subject orders]`, `testReadmeDsnExamplesParseSuccessfully[README: multi-subject payments]`, `testReadmeMultiSubjectOptionsAreAccepted`, `testSetupUpdatesExistingStreamMergesSubjectsAndPreservesServerConfig`, Behat scenario `Setup command merges subjects for transports sharing one stream`

> **Note:** When a stream already exists, setup reads the current JetStream configuration, merges in any new subjects, and then overlays the stream settings managed by this transport. Existing subjects are preserved, duplicate subjects are not added, and the existing storage backend is kept for already-created streams.

### Setup on Initialization

[](#setup-on-initialization)

Automatically create streams and consumers on first run:

```
framework:
  messenger:
    transports:
      nats_transport:
        dsn: 'nats-jetstream://localhost/my-stream/my-topic'
        options:
          consumer: 'my-consumer'
```

Then call setup command:

```
symfony console messenger:setup-transports nats_transport
```

> **Tested by:** `testSetupCreatesStreamAndConsumer`, `testSetupPassesConfiguredStreamOptions`, `testSetupUpdatesExistingStreamMergesSubjectsAndPreservesServerConfig`, Behat scenarios `Setup NATS stream with max age configuration`, `Setup command handles existing streams gracefully`, and `Custom consumer name is registered in JetStream`

### Delayed / Scheduled Messages

[](#delayed--scheduled-messages)

**Requires NATS Server &gt;= 2.12** with JetStream enabled.

Enable `scheduled_messages` in the DSN to use Symfony's `DelayStamp` for deferred delivery:

```
framework:
  messenger:
    transports:
      nats_transport:
        dsn: 'nats-jetstream://localhost/my-stream/my-topic?scheduled_messages=true'
```

Then dispatch messages with a delay:

```
use Symfony\Component\Messenger\Stamp\DelayStamp;

// Deliver after 30 seconds
$bus->dispatch(new MyMessage(), [new DelayStamp(30000)]);
```

> **Tested by:** `testSendWithDelayStampPublishesToDelayedSubjectWithScheduleHeaders`, `testReadmeScheduledMessagesDsnEnablesFeature`, Behat scenario `Delayed messages are delivered after the scheduled time`

When `scheduled_messages` is enabled and a `DelayStamp` is present:

- The message is published to a `{topic}.delayed.{uuid}` subject with `Nats-Schedule` and `Nats-Schedule-Target` headers
- The stream is created with an additional `{topic}.delayed.>` subject and `allow_msg_schedules` enabled
- NATS holds the message and delivers it to the original topic at the scheduled time
- The consumer processes it like any other message

When `scheduled_messages` is disabled (the default), any `DelayStamp` on the envelope is silently ignored and messages are published immediately.

This will:

1. Create the stream with configured settings
2. Create the consumer with explicit ACK policy
3. Verify consumer creation

### Stream Monitoring

[](#stream-monitoring)

View stream and consumer information:

```
# List streams
nats stream list

# View stream info
nats stream info my-stream

# List consumers
nats consumer list my-stream

# View consumer info
nats consumer info my-stream my-consumer

# View message count
nats consumer info my-stream my-consumer --json | jq '.state.num_pending'
```

> **Operational note:** These are NATS CLI inspection commands, so this package does not assert their exact textual output directly. The underlying stream, consumer, and message-count state is covered by Behat scenarios `Setup NATS stream with max age configuration`, `Custom consumer name is registered in JetStream`, `Complete message flow - send, check stats, consume, verify`, and the `getMessageCount()` unit tests.

### Manual Message Operations

[](#manual-message-operations)

```
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Transport\TransportInterface;

// Get message count
$count = $transport->getMessageCount();

// Check if messages are pending
if ($count > 0) {
    echo "Pending messages: $count";
}
```

> **Tested by:** `testGetMessageCountReturnsConsumerPendingMessages`, `testGetMessageCountFallsBackToStreamState`, `testGetMessageCountReturnsZeroWhenLookupsFail`, `testGetMessageCountReturnsAckPendingWhenHigherThanPending`

Troubleshooting
---------------

[](#troubleshooting)

### Connection Issues

[](#connection-issues)

**Error: "Connection refused"**

```
# Check NATS is running
nats-server --js

# Verify host and port
nats-jetstream://localhost:4222/stream/topic
```

> **Tested by:** Behat scenario `Setup command fails gracefully when NATS is unavailable`

**Error: "Stream not found"**

```
# Run setup command to create stream
symfony console messenger:setup-transports nats_transport
```

> **Tested by:** `testSetupCreatesStreamAndConsumer`, `testSetupUpdatesStreamWhenItAlreadyExists`, Behat scenarios `Setup NATS stream with max age configuration` and `Setup command handles existing streams gracefully`

### Message Processing Issues

[](#message-processing-issues)

**Messages not being consumed**

```
# Check consumer exists
nats consumer list my-stream

# View consumer status
nats consumer info my-stream my-consumer

# Check for errors in consumer
nats consumer info my-stream my-consumer --json | jq '.state'
```

> **Operational note:** These are manual diagnosis commands. The actual consume path is covered by Behat scenarios `Complete message flow - send, check stats, consume, verify`, `Send and consume messages with a custom consumer name`, and `Partial message consumption with multiple consumers`.

**Messages stuck in pending**

```
# Check handler is not throwing exceptions
# Verify handler implementation
# Check application logs for errors
```

> **Operational note:** This checklist is manual guidance. Pending-count behavior is covered by `testGetMessageCountReturnsConsumerPendingMessages`, `testGetMessageCountFallsBackToStreamState`, `testGetMessageCountReturnsAckPendingWhenHigherThanPending`, and Behat scenario `Complete message flow - send, check stats, consume, verify`.

Architecture
------------

[](#architecture)

The bridge consists of two main components:

### NatsTransportFactory

[](#natstransportfactory)

- Handles DSN scheme detection (`nats-jetstream://`)
- Creates `NatsTransport` instances
- Validates configuration

### NatsTransport

[](#natstransport)

- Implements Symfony's `TransportInterface`
- Manages stream and consumer connections
- Handles message serialization (igbinary)
- Supports batching and explicit acknowledgment

Performance Tips
----------------

[](#performance-tips)

1. **Choose appropriate batching**

    - Start with `batching: 5` for balanced performance
    - Increase to 20+ for high throughput workloads
    - Use 1 for strict low-latency requirements
2. **Set reasonable timeouts**

    - `max_batch_timeout: 0.5` for responsive systems
    - `max_batch_timeout: 2.0` for background jobs
    - `connection_timeout: 1.0` for local/regional deployments
    - `connection_timeout: 3.0+` for cross-region or high-latency networks
3. **Use appropriate replicas**

    - `stream_replicas: 1` for development
    - `stream_replicas: 3` for production
4. **Monitor performance**

    - Use `getMessageCount()` to track queue depth
    - Monitor handler execution time
    - Watch for stuck messages

Security Considerations
-----------------------

[](#security-considerations)

### ⚠️ Deserialization of Untrusted Data

[](#️-deserialization-of-untrusted-data)

The default `IgbinarySerializer` (and any serializer extending `AbstractEnveloperSerializer`) deserializes raw message payloads from NATS into PHP objects. PHP object unserialization is a [well-known attack vector](https://owasp.org/Top10/A08_2021-Software_and_Data_Integrity_Failures/) — a crafted payload can trigger arbitrary code execution via magic methods (`__wakeup`, `__destruct`, etc.).

**If your NATS topics are not fully trusted** (e.g. shared infrastructure, external publishers), you should:

- Implement a custom serializer that uses a safe format (JSON, Protobuf) instead of PHP object serialization
- Add message-level authentication (e.g. HMAC signatures) to verify publisher identity before deserializing
- Restrict NATS topic publish permissions via ACLs so only trusted services can publish

The type check (`instanceof Envelope`) happens *after* deserialization, which is too late to prevent exploitation.

### Stream-Exists Detection During Setup

[](#stream-exists-detection-during-setup)

During `setup()`, the transport prefers explicit NATS conflict messages (for example `"already in use"` or `"already exists"`) to detect a pre-existing stream. When NATS returns a generic HTTP `400`, the transport now verifies whether the stream actually exists before switching to `updateStream()`. This avoids misclassifying unrelated bad-request errors as an existing-stream conflict.

If you experience unexpected behavior during stream setup, review the exact error returned by your NATS server version and confirm the stream can be queried via JetStream stream info APIs.

### Publish Response Validation

[](#publish-response-validation)

When JetStream publish acknowledgements are received through the header-aware request path, the transport parses the response as JSON and throws an exception if JetStream reports an error or the response is not valid JSON. This makes proxy or protocol misconfiguration fail closed instead of silently accepting an invalid publish acknowledgement.

### General Recommendations

[](#general-recommendations)

1. **Authentication**

    - Prefer environment variables or explicit options for credentials over hard-coded DSNs
    - If you use credentials in a DSN, avoid logging the full DSN because it may expose secrets
    - Store credentials in environment variables
    - Never commit credentials to version control
2. **Message Encryption**

    - Encrypt sensitive data before dispatching
    - NATS can be configured with TLS for transit encryption
    - Implement application-level encryption for sensitive payloads
3. **Access Control**

    - Restrict stream/consumer creation to authorized users
    - Use NATS access control lists (ACLs) for fine-grained permissions
    - Audit stream operations

Contributing
------------

[](#contributing)

Contributions are welcome! Please ensure:

- Every modification runs the relevant verification commands before it is considered done
- Minimum verification for PHP changes: `composer test`
- All tests pass: `composer test:unit`
- Code coverage remains above 90%
- New features include corresponding tests
- Documentation is updated
- Functional tests pass: `composer test:functional` (if applicable)
- `docs/TESTS.md` is kept up to date when tests are added, removed, or renamed
- Each release has an entry in `docs/CHANGELOG.md` following [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) format
- When a PR is merged or its features are adapted, a description is added to `docs/PRs/`

### Quick Development Workflow

[](#quick-development-workflow)

```
# 1. Run static analysis and the default unit suite after each modification
composer test

# 2. Set up functional tests (first time only)
composer test:functional:setup

# 3. Start NATS for functional tests
composer nats:start

# 4. Run functional tests
composer test:functional

# 5. Clean up
composer nats:stop
```

> **Verification note:** This is the exact end-to-end verification sequence required for repository changes and the same sequence used for this task.

License
-------

[](#license)

MIT License - see LICENSE file for details

Support
-------

[](#support)

For issues, questions, or suggestions:

1. Check the [troubleshooting](#troubleshooting) section
2. Check existing issues on GitHub
3. Create a new issue with detailed information

💖 Love the project? Support it! 🚀
=================================

[](#-love-the-project-support-it-)

- 🪙 **BTC**: bc1qntms755swm3nplsjpllvx92u8wdzrvs474a0hr
- 💎 **ETH**: 0x08E27250c91540911eD27F161572aFA53Ca24C0a
- ⚡ **TRX**: TVXWaU4ScNV9RBYX5RqFmySuB4zF991QaE
- 🚀 **LTC**: LN5ApP1Yhk4iU9Bo1tLU8eHX39zDzzyZxB
- ☕ **Buy me a coffee**:
- 💝 **Sponsor**:

###  Health Score

50

—

FairBetter than 95% of packages

Maintenance88

Actively maintained with recent releases

Popularity31

Limited adoption so far

Community16

Small or concentrated contributor base

Maturity54

Maturing project, gaining track record

 Bus Factor2

2 contributors hold 50%+ of commits

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~17 days

Recently: every ~29 days

Total

9

Last Release

53d ago

Major Versions

v1.0.3 → v2.0.02025-11-24

v2.0.0 → v3.0.02025-11-27

v3.2.0 → v4.0.02026-03-20

### Community

Maintainers

![](https://www.gravatar.com/avatar/b76a5fac115d24d698d09cc939a6922bf309732ae61d0f2f215ae9e7397658ea?d=identicon)[ideaconnect](/maintainers/ideaconnect)

---

Top Contributors

[![jphooiveld-axtion](https://avatars.githubusercontent.com/u/124688698?v=4)](https://github.com/jphooiveld-axtion "jphooiveld-axtion (9 commits)")[![coff33cat](https://avatars.githubusercontent.com/u/2161184?v=4)](https://github.com/coff33cat "coff33cat (7 commits)")[![zlatkoverk](https://avatars.githubusercontent.com/u/25726655?v=4)](https://github.com/zlatkoverk "zlatkoverk (6 commits)")[![bpacholek](https://avatars.githubusercontent.com/u/3039162?v=4)](https://github.com/bpacholek "bpacholek (4 commits)")[![pimjansen](https://avatars.githubusercontent.com/u/1457219?v=4)](https://github.com/pimjansen "pimjansen (1 commits)")

###  Code Quality

TestsPHPUnit

Static AnalysisPHPStan

Type Coverage Yes

### Embed Badge

![Health badge](/badges/idct-symfony-nats-messenger/health.svg)

```
[![Health](https://phpackages.com/badges/idct-symfony-nats-messenger/health.svg)](https://phpackages.com/packages/idct-symfony-nats-messenger)
```

###  Alternatives

[sylius/sylius

E-Commerce platform for PHP, based on Symfony framework.

8.4k5.6M651](/packages/sylius-sylius)[symfony/doctrine-messenger

Symfony Doctrine Messenger Bridge

52986.7M156](/packages/symfony-doctrine-messenger)[sulu/sulu

Core framework that implements the functionality of the Sulu content management system

1.3k1.3M152](/packages/sulu-sulu)[contao/core-bundle

Contao Open Source CMS

1231.6M2.4k](/packages/contao-core-bundle)[jwage/phpamqplib-messenger

Symfony messenger transport for the php-amqplib/php-amqplib library.

84149.7k1](/packages/jwage-phpamqplib-messenger)[open-dxp/opendxp

Content &amp; Product Management Framework (CMS/PIM)

7310.3k29](/packages/open-dxp-opendxp)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
