myvon/reactphp-rdkafka
======================

RdKafka Consumer and Producer implementation with ReactPHP Event Loop

RdKafka implementation with ReactPHP EventLoop.

This library wraps [PHP RDKafka](https://github.com/arnaud-lb/php-rdkafka) by [Arnaud-lb](https://github.com/arnaud-lb/) with [react/event-loop](https://github.com/reactphp/event-loop) and [react/stream](https://github.com/reactphp/stream) to provide a non-blocking, event-driven Consumer and Producer.

[![Latest Version on Packagist](https://camo.githubusercontent.com/6a2030d7c440eb7e9bb89488af544de7f7d7e883b22af9dbe80a953a9f12dcce/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f762f6d79766f6e2f72656163747068702d72646b61666b612e7376673f7374796c653d666c61742d737175617265)](https://packagist.org/packages/myvon/reactphp-rdkafka)[![Tests](https://github.com/myvon/reactphp-rdkafka/actions/workflows/run-test.yml/badge.svg)](https://github.com/myvon/reactphp-rdkafka/actions/workflows/run-test.yml)[![Total Downloads](https://camo.githubusercontent.com/f9d75412672e2fff1a1e04e45c5cbd0d98342b1414a0fd2bac06259a18fcf628/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f64742f6d79766f6e2f72656163747068702d72646b61666b612e7376673f7374796c653d666c61742d737175617265)](https://packagist.org/packages/myvon/reactphp-rdkafka)

How it works
============

This package uses periodic timers from [react/event-loop](https://github.com/reactphp/event-loop) to consume messages at a regular interval. To avoid blocking the loop, the consume timeout is set to 0.

It also uses [react/stream](https://github.com/reactphp/stream) to receive and send messages in an event-driven way. Messages are consumed by listening to the `data` event of a stream, and produced by writing data to the stream of the corresponding topic. See `Consuming Messages` and `Producing Messages` below.
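The pattern described above can be sketched without Kafka at all. The example below is only an illustration, not this package's actual code: an in-memory array stands in for a topic, `$poll` is a hypothetical non-blocking poll function, and the Composer autoloader path is an assumption.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\EventLoop\Loop;
use React\EventLoop\TimerInterface;
use React\Stream\ThroughStream;

// Stand-in for a Kafka topic: an in-memory queue of pending messages.
$pending = ['first', 'second', 'third'];

// Hypothetical non-blocking poll: returns the next message, or null when none is left.
$poll = function () use (&$pending): ?string {
    return array_shift($pending);
};

// The application only ever sees this stream and its `data` event.
$stream = new ThroughStream();
$received = [];
$stream->on('data', function (string $message) use (&$received): void {
    $received[] = $message;
});

// Poll at a fixed interval. Each poll returns immediately, so the loop is never blocked.
Loop::addPeriodicTimer(0.01, function (TimerInterface $timer) use ($poll, $stream): void {
    $message = $poll();
    if ($message === null) {
        // Nothing left in this demo: cancel the timer so the loop can exit.
        Loop::cancelTimer($timer);
        $stream->end();
        return;
    }
    $stream->write($message);
});

Loop::run();
```

A real consumer would keep polling instead of cancelling the timer when no message is available; the cancellation here only lets the demo terminate.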

Installation
============

You can install the package via composer:

```
composer require myvon/reactphp-rdkafka
```

Be sure to have the [PHP RDKafka](https://github.com/arnaud-lb/php-rdkafka) extension installed on your server.

Consuming Messages
==================

To consume messages, start by creating a `Configuration` object. Pass it the name of your application (used as the Kafka `group.id`; see [Consumer group id (general)](https://github.com/arnaud-lb/php-rdkafka#consumer-group-id-general) for more information) and the list of your brokers:

```
use Myvon\Kafka\Configuration;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
```

Then create a `Myvon\Kafka\Consumer` instance:

```
use Myvon\Kafka\Consumer;

$consumer = new Consumer($configuration->consumer());
```

`Configuration::consumer()` generates an `RdKafka\Conf` instance configured for a Consumer.

Start consuming messages by calling the consumer's `start` method with the list of topics you want to subscribe to:

```
$stream = $consumer->start(['topic']);
```

This method returns a `ThroughStream` instance. Listen to its `data` event to receive messages:

```
use Myvon\Kafka\Configuration;
use Myvon\Kafka\Consumer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$consumer = new Consumer($configuration->consumer());
$stream = $consumer->start(['topic']);

$stream->on('data', function($data) {
    $topic = $data['topic'];
    $message = $data['payload'];

    //... do whatever you want here
});
```

The `$data` parameter contains the following keys:

- `topic`: the name of the topic the message comes from
- `payload`: the message received
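As an illustration of working with these two keys, here is a minimal sketch of a handler helper. It assumes the payload is a string containing JSON, which is a property of your own messages and not something this package guarantees; `decodeEvent` is a hypothetical name.

```php
<?php
// Hypothetical helper: decode one item emitted on the `data` event.
// Assumes $data['payload'] is a JSON string.
function decodeEvent(array $data): array
{
    // JSON_THROW_ON_ERROR turns malformed JSON into a JsonException
    // instead of a silent null.
    $body = json_decode($data['payload'], true, 512, JSON_THROW_ON_ERROR);

    return ['topic' => $data['topic'], 'body' => $body];
}

// Simulated event, shaped like the array described above.
$event = decodeEvent(['topic' => 'orders', 'payload' => '{"id":42,"status":"paid"}']);
```

Inside a `data` listener this would be called as `decodeEvent($data)`, with a `try`/`catch` around it if malformed payloads are possible.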

Handling consumer errors
========================

The consumer writes every message received with error code `RD_KAFKA_RESP_ERR_NO_ERROR` to the stream.

`RD_KAFKA_RESP_ERR__TIMED_OUT` and `RD_KAFKA_RESP_ERR__PARTITION_EOF` are ignored.

Every other error is emitted through the `error` event:

```
$stream->on("error", function(Exception $exception) {
    $errorStr = $exception->getMessage();
    $errorCode = $exception->getCode();
    // handle the error here
});
```

This package does not handle errors; it simply passes them on to your application, which is responsible for handling them.
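Since error handling is left to the application, one possible policy is to tolerate occasional errors and give up after several in a row. The sketch below is only one way to do that; `ErrorBudget` is a hypothetical class that is not part of this package.

```php
<?php
// Hypothetical helper: tracks consecutive errors and signals when to give up.
final class ErrorBudget
{
    private int $limit;
    private int $consecutive = 0;

    public function __construct(int $limit)
    {
        $this->limit = $limit;
    }

    // Call on every `error` event; returns true once the limit is reached.
    public function recordError(): bool
    {
        $this->consecutive++;

        return $this->consecutive >= $this->limit;
    }

    // Call on every `data` event: a successful message resets the count.
    public function recordSuccess(): void
    {
        $this->consecutive = 0;
    }
}

$budget = new ErrorBudget(3);
$first = $budget->recordError();     // one error, still within budget
$budget->recordSuccess();            // a message arrived, count resets
$budget->recordError();
$budget->recordError();
$exhausted = $budget->recordError(); // third consecutive error
```

In practice, `recordError()` would be called from the `error` listener and `recordSuccess()` from the `data` listener, with `$consumer->stop()` invoked once `recordError()` returns `true`.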

Consumer timeout and periodic timer
===================================

By default, the timeout passed to the `consume` method of `RdKafka\KafkaConsumer` is set to 0, which prevents the method from blocking the execution of the script. If you want a timeout anyway, pass the desired value (in ms) to the `setConsumeTimeout` method:

```
$consumer->setConsumeTimeout(1000); // 1 second
```

Be aware that this affects the EventLoop: while `consume` waits for a message, nothing else in the loop can run.

By default, the consumer looks for messages every second. You can change this period by passing the new value to the `setTimerPeriod` method:

```
$consumer->setTimerPeriod(0.1); // 100 ms
```

Notice: this internally uses the `addPeriodicTimer` method of the EventLoop, so the period is expressed in seconds.
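To see why a non-zero consume timeout matters, the following sketch simulates it with plain ReactPHP timers. It does not use Kafka: `usleep()` stands in for a `consume` call that waits 50 ms without receiving anything, and the autoloader path is an assumption.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\EventLoop\Loop;

$fastTicks = 0;

// A lightweight timer that should fire roughly every 10 ms.
Loop::addPeriodicTimer(0.01, function () use (&$fastTicks): void {
    $fastTicks++;
});

// Stand-in for consume() with a 50 ms timeout and no message available:
// the callback blocks, and nothing else in the loop can run in the meantime.
Loop::addPeriodicTimer(0.05, function (): void {
    usleep(50000);
});

// Stop the loop after about 300 ms.
Loop::addTimer(0.3, function (): void {
    Loop::stop();
});

Loop::run();

echo "fast timer ticks: {$fastTicks}\n";
```

Without the blocking timer, the fast timer would tick close to 30 times in 300 ms. With it, the count drops noticeably, because every blocked period delays all other timers and stream events.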

Accessing the KafkaConsumer instance
====================================

If you need to access the KafkaConsumer instance directly, you can do it by calling `getConsumer`:

```
$kafkaConsumer = $consumer->getConsumer();
```

Producing Messages
==================

As with the Consumer, create the configuration object and pass it to `Myvon\Kafka\Producer` when instantiating it:

```
use Myvon\Kafka\Configuration;
use Myvon\Kafka\Producer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$producer = new Producer($configuration->producer());
$streams = $producer->start(['topicName']);
```

You can pass multiple topics to the `start` method. Calling `start` returns one `ThroughStream` instance per topic you want to publish to. You can access the stream of a given topic by:

- Reading it from the array returned by `start`: `$streams['topicName']`
- Calling `getStream('topicName')` on the producer

You can then write messages to the stream, and they will be produced to the corresponding topic:

```
use Myvon\Kafka\Configuration;
use Myvon\Kafka\Producer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$producer = new Producer($configuration->producer());
$streams = $producer->start(['myFirstTopic', 'mySecondTopic']);

$streams['myFirstTopic']->write('Hello First Topic !');
$streams['mySecondTopic']->write('Hello Second Topic !');

$producer->getStream('myFirstTopic')->write('Hello sent with getStream !');
```

Notice: the producer ensures every message is sent by calling `poll()` every 500 ms. This delay can be changed by calling `setPollInterval`. When streams are closed, the producer calls `poll` for pending messages and `flush` to make sure nothing is lost.
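The one-stream-per-topic idea can be sketched with plain `ThroughStream` instances. This is an illustration only, not the package's implementation: `createTopicStreams` is a hypothetical function, the `$sent` array stands in for the broker, and the autoloader path is an assumption.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Stream\ThroughStream;

// Stand-in for the broker: collects what would be produced, keyed by topic.
$sent = [];

// Hypothetical factory: one stream per topic, each forwarding whatever is
// written to it to a "produce" callback along with the topic name.
function createTopicStreams(array $topics, callable $produce): array
{
    $streams = [];
    foreach ($topics as $topic) {
        $stream = new ThroughStream();
        $stream->on('data', function ($payload) use ($topic, $produce): void {
            $produce($topic, $payload);
        });
        $streams[$topic] = $stream;
    }

    return $streams;
}

$streams = createTopicStreams(
    ['orders', 'audit'],
    function (string $topic, string $payload) use (&$sent): void {
        $sent[$topic][] = $payload;
    }
);

// Kafka payloads are strings, so structured data is serialized before writing.
$streams['orders']->write(json_encode(['id' => 42]));
$streams['audit']->write('order 42 created');
```

The same serialization step applies when writing to the streams returned by the real `Producer`: encode arrays or objects yourself before calling `write()`.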

I don't want to use the EventLoop when producing messages
=========================================================

Sometimes you want to produce messages directly, without using an event loop. This can be done by passing `false` as the second argument to the `start` method:

```
use Myvon\Kafka\Configuration;
use Myvon\Kafka\Producer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$producer = new Producer($configuration->producer());
$streams = $producer->start(['myFirstTopic', 'mySecondTopic'], false);

$streams['myFirstTopic']->write('Hello First Topic !');
$streams['mySecondTopic']->write('Hello Second Topic !');

$producer->getStream('myFirstTopic')->write('Hello sent with getStream !');
```

This disables all use of the loop in the producer, and your code exits immediately after the last line. To avoid losing messages, streams are closed when the object is destroyed, via its `__destruct()` method.

Gracefully stopping Consumer and Producer
=========================================

If you want to stop the Consumer or the Producer before your code ends, call the `stop()` method of the corresponding class. This removes every periodic timer and closes all streams.

As a result, you will receive the `end` and `close` events of every stream used by the Consumer or the Producer.
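The order of these two events can be observed on a plain `ThroughStream`. In the sketch below the stream stands in for one returned by `start()`, and calling `end()` directly is an assumption about what `stop()` does internally; the autoloader path is also an assumption.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Stream\ThroughStream;

// A plain ThroughStream stands in for a stream returned by start().
$stream = new ThroughStream();
$events = [];

$stream->on('end', function () use (&$events): void {
    $events[] = 'end';   // no more data will arrive
});
$stream->on('close', function () use (&$events): void {
    $events[] = 'close'; // stream fully closed; release resources here
});

// Assumption: stop() ends each stream in a similar way.
$stream->end();
```

With `react/stream`, `end` is emitted before `close`, so cleanup that must run exactly once is best attached to `close`.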

Using custom configuration options
==================================

You can pass custom configuration options to the generated `RdKafka\Conf` instance by passing them as an array in the third argument of the `Myvon\Kafka\Configuration` constructor:

```
$configuration = new Configuration('appName', ['127.0.0.1:9092'], ['enable.partition.eof' => 'false']);
```
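Several options can be passed at once. The fragment below is a sketch using property names from the librdkafka configuration reference; the values are examples only, and whether this package sets or overrides any of them itself is not stated here. Values are written as strings, following the example above.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use Myvon\Kafka\Configuration;

$configuration = new Configuration('appName', ['127.0.0.1:9092'], [
    // Where to start reading when the group has no committed offset.
    'auto.offset.reset'  => 'earliest',
    // Let the client commit offsets automatically.
    'enable.auto.commit' => 'true',
    // Time without a heartbeat before the group considers this consumer dead.
    'session.timeout.ms' => '10000',
]);
```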

Using a custom loop
===================

If you don't want the Consumer or the Producer to use the default loop, pass your own loop as the second argument of each class's constructor:

```
$loop = new AnotherLoopInstance();
$producer = new Producer($configuration->producer(), $loop);
$consumer = new Consumer($configuration->consumer(), $loop);
```

Notice: passing a loop to the producer forces loop usage even if you pass `false` as the second argument of `start`.

Testing
-------

```
composer test
```

Contributing
------------

Please see [CONTRIBUTING](CONTRIBUTING.md) for details.

License
-------

The MIT License (MIT). Please see [License File](LICENSE.md) for more information.
