alvarorosado/event-driven-kafka-messenger-transport
===================================================

A Symfony bundle for integrating Kafka with Messenger, following an event-driven approach

Version 0.1.9 · MIT · PHP ^8.0

[Source](https://github.com/AlvaroRosado/event-driven-kafka-messenger-transport) · [Packagist](https://packagist.org/packages/alvarorosado/event-driven-kafka-messenger-transport)

Event Driven Kafka Messenger Transport
======================================

A custom transport for Symfony Messenger specifically designed to work with Apache Kafka as an event streaming platform.

Compatibility
-------------

This transport is compatible with:

- **Symfony 5.x, 6.x, or 7.x**
- **PHP 8.0+**

Why another Kafka transport?
----------------------------

Existing packages for Kafka + Symfony Messenger are outdated or don't cover advanced event streaming use cases.

This transport is designed for:

- **Flexibility**: Granular configuration for producers and consumers
- **Simplicity**: Automatic JSON serialization without additional configuration
- **Multi-topic**: Produce to multiple topics with a single configuration
- **Selective Consumption**: Consume specific event types from topics that contain multiple event types, which makes it possible to design topics around event streams

Installation
------------

```
composer require alvarorosado/event-driven-kafka-messenger-transport
```

Environment Variables
---------------------

```
# .env
KAFKA_DSN=ed+kafka://localhost:9092
```

Optional Security Parameters in DSN
-----------------------------------

```
# With SASL authentication
KAFKA_DSN=ed+kafka://localhost:9092?security_protocol=SASL_PLAINTEXT&username=myuser&password=mypass&sasl_mechanisms=PLAIN

# With SSL/TLS
KAFKA_DSN=ed+kafka://localhost:9092?security_protocol=SSL

# Without authentication (default)
KAFKA_DSN=ed+kafka://localhost:9092
```
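
To make the DSN shape concrete, the sketch below splits a DSN like the ones above using PHP's built-in `parse_url` and `parse_str`. `describeKafkaDsn` is a hypothetical helper written for this illustration, not the bundle's own parser, and the `PLAINTEXT` fallback is an assumption based on the "without authentication" default.

```php
<?php

// Hypothetical helper (not part of the bundle): shows which pieces an
// ed+kafka:// DSN carries. The real transport may parse it differently.
function describeKafkaDsn(string $dsn): array
{
    $parts = parse_url($dsn);

    if ($parts === false || ($parts['scheme'] ?? '') !== 'ed+kafka' || !isset($parts['host'])) {
        throw new InvalidArgumentException('Expected a DSN such as ed+kafka://host:port');
    }

    // Security settings travel as ordinary query parameters.
    $query = [];
    parse_str($parts['query'] ?? '', $query);

    return [
        'broker' => $parts['host'] . ':' . ($parts['port'] ?? 9092),
        // Assumption: no security_protocol parameter means an unauthenticated connection.
        'security_protocol' => $query['security_protocol'] ?? 'PLAINTEXT',
        'username' => $query['username'] ?? null,
        'sasl_mechanisms' => $query['sasl_mechanisms'] ?? null,
    ];
}

$info = describeKafkaDsn(
    'ed+kafka://localhost:9092?security_protocol=SASL_PLAINTEXT&username=myuser&password=mypass&sasl_mechanisms=PLAIN'
);
echo $info['broker'], ' via ', $info['security_protocol'], PHP_EOL;
```

If the username or password contains reserved characters such as `&` or `#`, URL-encode them before placing them in the query string.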

Why `ed+kafka://` instead of `kafka://`?
----------------------------------------

The `ed+kafka://` DSN prefix allows this transport to coexist with other Kafka packages in the same project. This enables gradual migration and safe testing without conflicts - you can keep your existing Kafka transport while evaluating this one.

Configuration File
------------------

Create the global configuration file for Kafka settings:

```
# config/packages/event_driven_kafka_transport.yaml
event_driven_kafka_transport:
  consumer:
    commit_async: true
    consume_timeout_ms: 500
    config:
      group.id: 'default-group'
      auto.offset.reset: 'earliest'
  producer:
    config:
      enable.idempotence: 'false'
```

Quick Start
-----------

### Basic Configuration

```
# config/packages/messenger.yaml
framework:
  messenger:
    transports:
      kafka_events:
        dsn: '%env(KAFKA_DSN)%'
        options:
          topics: ['user_events']
          consumer:
            config:
              group.id: '%env(APP_ENV)%-app-events'
    routing:
      'App\Message\UserRegistered': kafka_events
```

*Works like any standard Symfony Messenger transport. Messages are serialized using PHP's native serialization and routed using Symfony's traditional routing system.*
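
As a minimal sketch of what "standard" means here, the message and handler below are ordinary Symfony Messenger classes. The constructor fields and the `SendWelcomeEmailHandler` name are assumptions made for illustration; in an application the message would live in the `App\Message` namespace used by the routing above, and the namespaces are omitted only to keep the sketch in one file.

```php
<?php

use Symfony\Component\Messenger\Attribute\AsMessageHandler;

// Plain message object; the routing config above sends it to kafka_events.
// The fields are illustrative assumptions.
final class UserRegistered
{
    public function __construct(
        public string $userId,
        public string $email,
    ) {
    }
}

// Hypothetical handler, invoked once per consumed UserRegistered message.
#[AsMessageHandler]
final class SendWelcomeEmailHandler
{
    /** @var string[] Addresses handled so far (stands in for real email sending). */
    public array $welcomed = [];

    public function __invoke(UserRegistered $message): void
    {
        // Real code would send an email; the sketch only records the address.
        $this->welcomed[] = $message->email;
    }
}
```

Dispatching is the usual `$bus->dispatch(new UserRegistered('42', 'ada@example.com'))` on an injected `MessageBusInterface`, and a worker started with `php bin/console messenger:consume kafka_events` reads from the topic.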

### Advanced Configuration

```
framework:
  messenger:
    transports:
      kafka_events:
        dsn: '%env(KAFKA_DSN)%'
        options:
          topics: ['user_events']
          json_serialization:
            enabled: true  # Enables advanced mode
          consumer:
            routing:
              - name: 'user_registered'
                class: 'App\Message\UserRegistered'
              - name: 'user_updated'
                class: 'App\Message\UserUpdated'
            config:
              group.id: '%env(APP_ENV)%-app-events'
```

*When producing, the transport serializes the message body to JSON and stores Messenger metadata in Kafka headers. When consuming, it examines the message type and deserializes the payload into the PHP class mapped in the routing configuration.*

**⚠️ Important**: To use advanced mode, you **must implement the Hook interface** and define `KafkaIdentifierStamp` for each message type. This identifier is used as the JSON key for message type mapping during consumption. See the [Stamp System](#%EF%B8%8F-stamp-system) section below for complete implementation details.

### 🎯 Selective Event Streaming

Process only the events you need from a topic with multiple types:

```
# Topic: user_events (contains: user_registered, user_updated, user_deleted)
framework:
  messenger:
    transports:
      kafka_events:
        dsn: '%env(KAFKA_DSN)%'
        options:
          topics: ['user_events']
          json_serialization:
            enabled: true
          consumer:
            routing:
              # Only process registrations and updates
              - name: 'user_registered'
                class: 'App\Message\UserRegistered'
              - name: 'user_updated'
                class: 'App\Message\UserUpdated'
            # user_deleted is automatically ignored
```

**Advantages:**

- Unconfigured messages are automatically committed
- They don't accumulate as lag
- Multiple services can process different subsets of the same topic
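
The selection logic can be pictured with the sketch below. It is a plain-PHP illustration, not the bundle's implementation: `ROUTING`, `routeEvent`, and the array-based constructors are hypothetical, and how the transport actually extracts the event identifier from a Kafka record is not shown here.

```php
<?php

// Hypothetical message classes that simply wrap the decoded JSON payload.
final class UserRegistered
{
    public function __construct(public array $payload)
    {
    }
}

final class UserUpdated
{
    public function __construct(public array $payload)
    {
    }
}

// Stand-in for the consumer routing list in the YAML above.
const ROUTING = [
    'user_registered' => UserRegistered::class,
    'user_updated' => UserUpdated::class,
];

// Returns a message object for configured identifiers and null for the rest.
function routeEvent(string $identifier, string $jsonBody): ?object
{
    $class = ROUTING[$identifier] ?? null;

    if ($class === null) {
        // Not configured (for example user_deleted): the caller would
        // commit the offset and move on without invoking any handler.
        return null;
    }

    $data = json_decode($jsonBody, true, 512, JSON_THROW_ON_ERROR);

    return new $class($data);
}

$event = routeEvent('user_registered', '{"email":"ada@example.com"}');
$ignored = routeEvent('user_deleted', '{"id":1}');
```

Because unrouted identifiers return early, two services can read the same topic with different routing lists, each under its own `group.id`.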

### 🏷️ Stamp System

Control Kafka behavior through Stamps in a custom Hook. **This Hook implementation is required for advanced mode** to properly handle JSON serialization and message routing.

**Recommended Pattern - Base Message Class:**

```
// Base class for every message sent through the Kafka transport.
abstract class Message
{
    // Event name used to map the message type during consumption.
    abstract public function identifier(): string;

    public function key(): ?string
    {
        return null; // Optional: override to provide a partition key
    }
}

class UserRegistered extends Message
{
    public function identifier(): string
    {
        return 'user_registered';
    }
}
```
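
A message that needs a partition key can override `key()`. The sketch below is an assumption-laden example: `UserUpdated` and its `$userId` field are hypothetical, and how the Hook turns `key()` into a stamp is not covered by it. Kafka's default partitioner sends records with the same key to the same partition, so keying by user ID keeps one user's events in order.

```php
<?php

// Same base class as in the recommended pattern, repeated so the sketch runs alone.
abstract class Message
{
    abstract public function identifier(): string;

    public function key(): ?string
    {
        return null;
    }
}

// Hypothetical message keyed by user ID.
final class UserUpdated extends Message
{
    public function __construct(private string $userId)
    {
    }

    public function identifier(): string
    {
        return 'user_updated';
    }

    public function key(): ?string
    {
        // Same user, same key, same partition.
        return $this->userId;
    }
}

$message = new UserUpdated('user-42');
echo $message->identifier(), ' keyed by ', $message->key(), PHP_EOL;
```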

**Example of Hook Implementation:**

```
