limlabs/kafka-bundle
====================

A simple Symfony bundle for Kafka.

Version 1.0.2 · MIT license · PHP >=8.1 · [Source](https://github.com/limmer-laboratories/kafka-bundle) · [Packagist](https://packagist.org/packages/limlabs/kafka-bundle)

A Symfony Kafka Bundle
======================

- [Installation](#installation)
- [Usage](#usage)
    - [Configuration](#configuration)
    - [Sending messages to kafka](#sending-messages-to-kafka)
    - [Consuming kafka topics](#consuming-kafka-topics)
        - [Implementing a consumer](#implementing-a-consumer)
        - [Executing a consumer](#executing-a-consumer)

Installation
============

To install the Symfony Kafka bundle, run the following Composer command:

```
composer require limlabs/kafka-bundle
```

Usage
=====

Configuration
-------------

To use the Kafka bundle, you have to configure at least the `default` connection.
Here is an example of such a configuration:

```
# config/packages/kafka.yaml

kafka:
  clients:
    default:
      brokers: 'kafka:9092' # A comma-separated list of Kafka brokers
      log_level: LOG_DEBUG # The rdkafka log level (name of a PHP constant)
      debug: all # Which debug information rdkafka should print; remove this line to disable it
```

If you need more than one Kafka connection, add further entries under `clients` in this configuration:

```
# config/packages/kafka.yaml

kafka:
  clients:
    default:
      brokers: 'kafka:9092'
      log_level: LOG_DEBUG
      debug: all
    different_client:
      brokers: 'kafka2:9092,kafka3:9092'
      log_level: LOG_ERROR
```

Sending messages to kafka
-------------------------

To send a message to a topic using the default configuration, use the factory to get the `KafkaClient`. From the `KafkaClient` you can get the `producer` for your Kafka connection. With this producer you can create a `topic` and send a message to it with the `produce` function. `createTopic` returns the existing topic, or creates a new one if the specified topic does not exist. Here is an example in a Symfony controller:

```
namespace App\Controller;

use LimLabs\KafkaBundle\Factory\KafkaFactory;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\Routing\Annotation\Route;

class ExampleController extends AbstractController
{
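    public function __construct(
        private readonly KafkaFactory $kafkaFactory
    ) {
    }

    #[Route('/example', name: 'app_example')]
    public function index(): JsonResponse
    {
        // Get the client for the `default` connection and its producer.
        $client = $this->kafkaFactory->getKafkaClient();
        $producer = $client->getProducer();

        // Get (or create) the topic and produce a message on it.
        $producer->createTopic('test')->produce('TestMessage');

        // Flush the producer; without this call data will be lost.
        $producer->flush();

        [...]
    }
}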
```

> ⚠️ Don't forget the `$producer->flush()` method call. Without it, you will lose data!

To get a `KafkaClient` for a specific connection, pass the name of the desired connection to the factory's `getKafkaClient` function.

```
$client = $this->kafkaFactory->getKafkaClient('different_client');
```
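The examples above pass a plain string to `produce`. If you want to send structured data, one option is to serialize it to JSON first. The following is a minimal sketch, assuming that `produce` expects a string payload as in the controller example; `encodeKafkaPayload` is a hypothetical helper and not part of the bundle.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not provided by the bundle): turns an array into
 * a JSON string that can be handed to produce().
 *
 * JSON_THROW_ON_ERROR makes json_encode() throw a JsonException instead
 * of returning false when the data cannot be encoded.
 */
function encodeKafkaPayload(array $payload): string
{
    return json_encode($payload, JSON_THROW_ON_ERROR);
}

$message = encodeKafkaPayload(['orderId' => 42, 'status' => 'created']);

// Inside a class that has the KafkaFactory injected (see the controller above):
// $producer = $this->kafkaFactory->getKafkaClient()->getProducer();
// $producer->createTopic('test')->produce($message);
// $producer->flush();
```

Encoding before calling `produce` keeps serialization errors separate from Kafka errors, so a payload that cannot be encoded fails before anything is sent.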

Consuming kafka topics
----------------------

### Implementing a consumer

To consume Kafka topics, you need to set up a class that implements the `KafkaConsumer` interface. This interface automatically tags the class correctly for Symfony DI and requires you to implement the `consume` and `getConsumerConfiguration` functions.
Here is a quick example:

```
