PHPackages                             m6web/kafka-bundle - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [HTTP &amp; Networking](/categories/http)
4. /
5. m6web/kafka-bundle

AbandonedArchivedSymfony-bundle[HTTP &amp; Networking](/categories/http)

m6web/kafka-bundle
==================

Kafka bundle on top of rdkafka extension

v0.10.3(8y ago)83.5k5MITPHPPHP &gt;=7.0

Since Dec 20Pushed 8y ago53 watchersCompare

[ Source](https://github.com/M6Web/KafkaBundle)[ Packagist](https://packagist.org/packages/m6web/kafka-bundle)[ RSS](/packages/m6web-kafka-bundle/feed)WikiDiscussions master Synced 2mo ago

READMEChangelog (8)Dependencies (10)Versions (9)Used By (0)

KafkaBundle
===========

[](#kafkabundle)

Configuration and use of KafkaBundle are based on the [RdKafka extension](https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html). To consume messages, we decided to use the high level consumer.

[Kafka documentation](http://kafka.apache.org/documentation.html)

Installation
------------

[](#installation)

### For Symfony

[](#for-symfony)

```
{
    "require": {
        "m6web/kafka-bundle": "~0.1",
    }
}

```

Register the bundle:

```
// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        new M6Web\Bundle\KafkaBundle\M6WebKafkaBundle(),
    );
}
```

Install the bundle:

```
$ composer update m6web/kafka-bundle

```

Usage
-----

[](#usage)

Add the `m6_web_kafka` section in your configuration file.

By default, the sf3 event dispatcher will throw an event on each command. To disable this feature:

```
m6_web_kafka:
   event_dispatcher: false
```

Here a configuration example:

[Librdkafka global configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)

```
m6_web_kafka:
    event_dispatcher: true
    producers:
       producer1:
           configuration:
               timeout.ms: 1000
               queue.buffering.max.ms: 0 # Maximum time, in milliseconds, for buffering data on the producer queue. 1000ms by default.
           brokers:
               - '127.0.0.1'
               - '10.05.05.19'
           log_level: 3
           events_poll_timeout: 2000 #ms
           topics:
               batman:
                   configuration:
                       retries: 3
                   strategy_partition: 2
               catwoman:
                   configuration:
                       retries: 3
                   strategy_partition: 2

    consumers:
        consumer1:
            configuration:
                metadata.broker.list: '127.0.0.1'
                group.id: 'myConsumerGroup'
                enable.auto.commit: 0
            topicConfiguration:
                auto.offset.reset: 'smallest'
            timeout_consuming_queue: 200
            topics:
                - batman
                - catwoman
```

Note that we decided to use the high level consumer. So you can set the "group.id" option in the consumer configuration.

```
configuration:
  metadata.broker.list: '127.0.0.1'
  group.id: 'myConsumerGroup'
```

For the producers, we have one topic configuration for each topic:

```
 topics:
   batman:
       configuration:
           retries: 3
       strategy_partition: '2'
   catwoman:
       configuration:
           retries: 3
       strategy_partition: '2'
```

Whereas for the consumers, we have one topic configuration for all topics:

```
topicConfiguration:
    auto.offset.reset: 'smallest'
timeout_consuming_queue: 200
topics:
    - batman
    - catwoman
```

### Producer

[](#producer)

A producer will be used to send messages to the server.

In the Kafka Model, messages are sent to topics partitioned and stored on brokers. This means that in the configuration for a producer you will have to specify the brokers and the topics. You can optionnaly configure the log level and the strategy partitioner.

Because of RdKafka extension limitations, you cannot configure the partitions number or replication factor from the bundle. You must do that from the command line.

After setting your producers with their options, you will be able to produce a message using the `produce` method :

```
$producer->produce('message', RD_KAFKA_PARTITION_UA, '12345');
```

- The first argument is the message to send.
- The second argument is the partition where to produce the message. By default, the value is `RD_KAFKA_PARTITION_UA` which means that the message will be sent to a random partition.
- The third argument is a key if the strategy partitioner is by key.

The `RD_KAFKA_PARTITION_UA` constant is used according the strategy partitioner.

- If the strategy partitioner is `random` (`RD_KAFKA_MSG_PARTITIONER_RANDOM`), messages are assigned to partitions randomly.
- If the stratefy partitioner is `consistent` (`RD_KAFKA_MSG_PARTITIONER_CONSISTENT`) with a key defined, messages are assigned to the partition whose the id maps the hash of the key. If there is no key defined, messages are assigned to the same partition.

### Consumer

[](#consumer)

A consumer will be used to get a message from different topics. You can choose to set only one topic by consumer.

In the Kafka Model, messages are consumed from topics partitioned and stored on brokers. This means that for a consumer you will have to specify the brokers and topics in the configuration.

To consume messages, you will have to use the `consume` method to consume a message:

```
$consumer->consume();
```

The messages will be automatically committed except if there is an error. But you can choose not to do it by adding an argument as following:

```
$consumer->consume(false);
```

You can decide to commit manually your message with:

```
$consumer->commit();
```

It will commit the last consumed message.

It will give you an object `\RdKafka\Message` with information about the message : payload, topic, or partition for instance. It is the `\RdKafka\Message` from the [RdKafka extension](https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html).

In case there is no more message, it will give you a *No more message* string. In case there is a time out, it will give you a *Time out* string.

### Exceptions list

[](#exceptions-list)

- EntityNotSetException
- KafkaException
- LogLevelNotSetException
- NoBrokerSetException

###  Health Score

32

—

LowBetter than 72% of packages

Maintenance20

Infrequent updates — may be unmaintained

Popularity25

Limited adoption so far

Community22

Small or concentrated contributor base

Maturity53

Maturing project, gaining track record

 Bus Factor2

2 contributors hold 50%+ of commits

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~26 days

Total

8

Last Release

3244d ago

### Community

Maintainers

![](https://avatars.githubusercontent.com/u/2069361?v=4)[Patrick](/maintainers/Bedrock)[@Bedrock](https://github.com/Bedrock)

![](https://avatars.githubusercontent.com/u/5878620?v=4)[Fabien de Saint pern](/maintainers/fabdsp)[@fabdsp](https://github.com/fabdsp)

---

Top Contributors

[![NastasiaSaby](https://avatars.githubusercontent.com/u/8245071?v=4)](https://github.com/NastasiaSaby "NastasiaSaby (6 commits)")[![lnahiro](https://avatars.githubusercontent.com/u/14995154?v=4)](https://github.com/lnahiro "lnahiro (3 commits)")[![Oliboy50](https://avatars.githubusercontent.com/u/2571084?v=4)](https://github.com/Oliboy50 "Oliboy50 (2 commits)")[![fabdsp](https://avatars.githubusercontent.com/u/5878620?v=4)](https://github.com/fabdsp "fabdsp (1 commits)")[![jb-reynaud](https://avatars.githubusercontent.com/u/390431?v=4)](https://github.com/jb-reynaud "jb-reynaud (1 commits)")

---

Tags

bundlekafka

### Embed Badge

![Health badge](/badges/m6web-kafka-bundle/health.svg)

```
[![Health](https://phpackages.com/badges/m6web-kafka-bundle/health.svg)](https://phpackages.com/packages/m6web-kafka-bundle)
```

###  Alternatives

[lexik/jwt-authentication-bundle

This bundle provides JWT authentication for your Symfony REST API

2.6k58.7M210](/packages/lexik-jwt-authentication-bundle)[php-http/httplug-bundle

Symfony integration for HTTPlug

38921.0M54](/packages/php-http-httplug-bundle)[fresh/centrifugo-bundle

Provides communication with web-socket server Centrifugo in Symfony applications.

83375.3k](/packages/fresh-centrifugo-bundle)[swoole-bundle/swoole-bundle

Open/Swoole Symfony Bundle

6650.4k](/packages/swoole-bundle-swoole-bundle)[m6web/guzzle-http-bundle

Symfony bundle on top of Guzzle

17511.5k2](/packages/m6web-guzzle-http-bundle)[cypresslab/patch-manager

A library to manage patch requests

16117.4k](/packages/cypresslab-patch-manager)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
