akson/messenger-kafka
=====================

Symfony Messenger Kafka Transport

Version v0.13.2, MIT license, PHP ^7.1.3|8.\*. [Source](https://github.com/cergius/messenger-kafka), [Packagist](https://packagist.org/packages/akson/messenger-kafka).

[![License](https://camo.githubusercontent.com/db555095cb457622c51a02168b2031832c29b2d7d4d4df8bc167881055b991ce/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f6c6963656e73652f4b6f6e7374616e74696e436f6465732f6d657373656e6765722d6b61666b612e737667)](LICENSE)[![Packagist](https://camo.githubusercontent.com/2fe2a96adf24d02a6f780d32c3f20416e797325db3d7479088e3c326a965b154/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f64742f6b6f636f2f6d657373656e6765722d6b61666b612e737667)](https://packagist.org/packages/koco/messenger-kafka)[![Maintainability](https://camo.githubusercontent.com/95b4ed8bcdbba9813b5033b2d9c0cdd7346716f067a9fd1eafadd685409cd76e/68747470733a2f2f6170692e636f6465636c696d6174652e636f6d2f76312f6261646765732f37666133643264613661383238613637366633352f6d61696e7461696e6162696c697479)](https://codeclimate.com/github/KonstantinCodes/messenger-kafka/maintainability)[![CircleCI](https://camo.githubusercontent.com/c457f049a909ee6676d07a837d5a7ad9ccdf4ed244a6afeb4944f83be0a0c279/68747470733a2f2f636972636c6563692e636f6d2f67682f4b6f6e7374616e74696e436f6465732f6d657373656e6765722d6b61666b612e7376673f7374796c653d737667)](https://circleci.com/gh/KonstantinCodes/messenger-kafka)[![Tests](https://github.com/KonstantinCodes/messenger-kafka/workflows/Tests/badge.svg)](https://github.com/KonstantinCodes/messenger-kafka/actions)

This bundle aims to provide a simple Kafka transport for Symfony Messenger. Kafka REST Proxy support is planned.

Installation
------------

### Applications that use Symfony Flex

Open a command console, enter your project directory and execute:

```
$ composer require koco/messenger-kafka
```

### Applications that don't use Symfony Flex

After adding the composer requirement, enable the bundle by adding it to the list of registered bundles in the `config/bundles.php` file of your project:

```
<?php
// config/bundles.php

return [
    // ... bundles that are already registered
    Koco\Kafka\KocoKafkaBundle::class => ['all' => true], // enable in every environment
];
```

Configuration
-------------

### DSN

Specify a DSN starting with either `kafka://` or `kafka+ssl://`. Multiple brokers are separated by `,`.

- `kafka://my-local-kafka:9092`
- `kafka+ssl://my-staging-kafka:9093`
- `kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-02:9093,kafka+ssl://prod-kafka-03:9093`
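To make the DSN format concrete, here is a minimal sketch of how such a string could be split into a scheme and a broker list. The function `splitKafkaDsn` is hypothetical and written for illustration only; it is not the bundle's own parsing code, and it assumes every comma-separated part carries its own `kafka://` or `kafka+ssl://` prefix, as in the examples above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the bundle): splits a DSN such as
 * "kafka+ssl://a:9093,kafka+ssl://b:9093" into its scheme and brokers.
 *
 * @return array{scheme: string, brokers: string[]}
 */
function splitKafkaDsn(string $dsn): array
{
    $scheme = null;
    $brokers = [];

    // Brokers are separated by commas; each part repeats the scheme.
    foreach (explode(',', $dsn) as $part) {
        $part = trim($part);

        // Accept only the two documented schemes, kafka:// and kafka+ssl://.
        if (!preg_match('#^(kafka(?:\+ssl)?)://(.+)$#', $part, $matches)) {
            throw new InvalidArgumentException(sprintf('Invalid Kafka DSN part "%s".', $part));
        }

        // The scheme of the first part is reported for the whole DSN.
        $scheme = $scheme ?? $matches[1];
        $brokers[] = $matches[2];
    }

    return ['scheme' => $scheme, 'brokers' => $brokers];
}
```

Joining the result with `implode(',', $result['brokers'])` yields a comma-separated `host:port` list, which is the shape librdkafka expects for its broker list setting.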

### Example

The configuration options for `kafka_conf` and `topic_conf` are listed in the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). For consumers, setting `enable.auto.offset.store` to `false` is highly recommended. Otherwise, every message is acknowledged even if a message handler throws an error.

```
framework:
    messenger:
        transports:
            producer:
                dsn: '%env(KAFKA_URL)%'
                # serializer: App\Infrastructure\Messenger\MySerializer
                options:
                    flushTimeout: 10000
                    flushRetries: 5
                    topic:
                        name: 'events'
                    kafka_conf:
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
            consumer:
                dsn: '%env(KAFKA_URL)%'
                # serializer: App\Infrastructure\Messenger\MySerializer
                options:
                    commitAsync: true
                    receiveTimeout: 10000
                    topic:
                        name: "events"
                    kafka_conf:
                        enable.auto.offset.store: 'false'
                        group.id: 'my-group-id' # shared by all workers of one consuming application; use a distinct id per application
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
                        max.poll.interval.ms: '45000'
                    topic_conf:
                        auto.offset.reset: 'earliest'
```
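With the two transports defined, messages still need to be routed to the producer. The fragment below is a sketch using Symfony Messenger's standard `routing` option; the message class `App\Message\OrderPlaced` is a hypothetical name chosen for illustration and does not come from this bundle.

```yaml
framework:
    messenger:
        routing:
            # Hypothetical message class; replace with your own.
            'App\Message\OrderPlaced': producer
```

A worker can then read from the `consumer` transport with Symfony's standard command, `php bin/console messenger:consume consumer`.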

Message Formats
---------------

You will most likely want to implement your own Serializer. Please see:

```
