kosmosafive/messenger-kafka
===========================

Symfony Messenger Kafka Transport

Version 1.0.0, Apache-2.0 license, requires PHP ^8.4.

[Source](https://github.com/kosmosafive/messenger-kafka) | [Packagist](https://packagist.org/packages/kosmosafive/messenger-kafka)

Symfony Messenger Kafka Transport
=================================

This bundle aims to provide a simple Kafka transport for Symfony Messenger. Kafka REST Proxy support coming soon.

Installation
------------

### Applications that use Symfony Flex

Open a command console, enter your project directory and execute:

```
$ composer require kosmosafive/messenger-kafka
```

### Applications that don't use Symfony Flex

After adding the composer requirement, enable the bundle by adding it to the list of registered bundles in the `config/bundles.php` file of your project:

```
<?php
// config/bundles.php

return [
    // ...
    Kosmosafive\Kafka\KosmosafiveKafkaBundle::class => ['all' => true],
];
```

Configuration
-------------

### DSN

Specify a DSN starting with either `kafka://` or `kafka+ssl://`. Multiple brokers are separated by `,`.

- `kafka://my-local-kafka:9092`
- `kafka+ssl://my-staging-kafka:9093`
- `kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-02:9093,kafka+ssl://prod-kafka-03:9093`
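
To make the DSN format concrete, the sketch below reduces a DSN to the comma-separated `host:port` list that librdkafka expects as its broker list. It is an illustration only, not the bundle's own parser: the function name `brokerListFromDsn` is hypothetical, and the assumption that each entry is handled independently is inferred from the examples above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the bundle): strips the kafka:// or
 * kafka+ssl:// scheme from every comma-separated DSN entry.
 */
function brokerListFromDsn(string $dsn): string
{
    $brokers = [];

    foreach (explode(',', $dsn) as $entry) {
        $entry = trim($entry);

        // Accept only the two schemes documented above.
        if (preg_match('#^kafka(?:\+ssl)?://(.+)$#', $entry, $matches) !== 1) {
            throw new InvalidArgumentException(sprintf('Unsupported DSN entry "%s".', $entry));
        }

        $brokers[] = $matches[1];
    }

    return implode(',', $brokers);
}

// prints "prod-kafka-01:9093,prod-kafka-02:9093,prod-kafka-03:9093"
echo brokerListFromDsn(
    'kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-02:9093,kafka+ssl://prod-kafka-03:9093'
), PHP_EOL;
```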

### Example

The options accepted under `kafka_conf` and `topic_conf` are listed in the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). For consumers, setting `enable.auto.offset.store` to `false` is strongly recommended. Otherwise, every message is acknowledged even if its message handler throws an error.

```
framework:
    messenger:
        transports:
            producer:
                dsn: '%env(KAFKA_URL)%'
                # serializer: App\Infrastructure\Messenger\MySerializer
                options:
                    flushTimeout: 10000
                    flushRetries: 5
                    topic:
                        name: 'events'
                    kafka_conf:
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
            consumer:
                dsn: '%env(KAFKA_URL)%'
                # serializer: App\Infrastructure\Messenger\MySerializer
                options:
                    commitAsync: true
                    receiveTimeout: 10000
                    topic:
                        name: "events"
                    kafka_conf:
                        enable.auto.offset.store: 'false'
                        group.id: 'my-group-id' # unique per consuming application; instances sharing it split the partitions
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
                        max.poll.interval.ms: '45000'
                    topic_conf:
                        auto.offset.reset: 'earliest'
```

Serializer
----------

You will most likely want to implement your own Serializer. Please see:

The fields `key`, `headers`, and `body` are available in the `decode()` and `encode()` methods.
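
A minimal sketch of such a serializer, assuming JSON payloads, might look like the following. `SerializerInterface`, `Envelope`, and `MessageDecodingFailedException` come from Symfony Messenger; the `OrderCreated` message, its fields, and the JSON layout are hypothetical and exist only for illustration. The class name matches the commented-out `serializer` option in the configuration example.

```php
<?php

declare(strict_types=1);

namespace App\Infrastructure\Messenger;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

// Hypothetical message class, defined here only to keep the sketch self-contained.
final class OrderCreated
{
    public function __construct(
        public readonly string $orderId,
        public readonly int $totalCents,
    ) {
    }
}

final class MySerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // 'body' carries the raw Kafka message payload.
        $data = json_decode($encodedEnvelope['body'] ?? '', true);

        if (!is_array($data) || !isset($data['orderId'], $data['totalCents'])) {
            throw new MessageDecodingFailedException('Unexpected payload for OrderCreated.');
        }

        return new Envelope(
            new OrderCreated((string) $data['orderId'], (int) $data['totalCents'])
        );
    }

    public function encode(Envelope $envelope): array
    {
        $message = $envelope->getMessage();

        if (!$message instanceof OrderCreated) {
            throw new \InvalidArgumentException('This serializer only handles OrderCreated.');
        }

        return [
            // Assumption: the order ID doubles as the Kafka message key.
            'key' => $message->orderId,
            'headers' => ['type' => OrderCreated::class],
            'body' => json_encode(
                ['orderId' => $message->orderId, 'totalCents' => $message->totalCents],
                JSON_THROW_ON_ERROR,
            ),
        ];
    }
}
```

In a real project the two classes would normally live in separate files. Using the order ID as `key` is a design choice of this sketch: with Kafka's default partitioner, messages that share a key land on the same partition, which preserves their relative order.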

```
