PHPackages                             anktx/kafka-client - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [HTTP &amp; Networking](/categories/http)
4. /
5. anktx/kafka-client

ActiveLibrary[HTTP &amp; Networking](/categories/http)

anktx/kafka-client
==================

PHP wrapper for RdKafka

0.3.2(3mo ago)036↓50%BSD-3-ClausePHPPHP ^8.4

Since Jul 19Pushed 3mo ago1 watchersCompare

[ Source](https://github.com/anktx/kafka-client)[ Packagist](https://packagist.org/packages/anktx/kafka-client)[ RSS](/packages/anktx-kafka-client/feed)WikiDiscussions master Synced 1mo ago

READMEChangelogDependencies (5)Versions (10)Used By (0)

PHP Kafka Client
================

[](#php-kafka-client)

Обёртка над `ext-rdkafka` для работы с Apache Kafka на PHP. Библиотека предоставляет простой и удобный интерфейс для продюсирования и консьюминга сообщений.

Требования
----------

[](#требования)

- PHP 8.4+
- ext-rdkafka

Установка
---------

[](#установка)

```
composer require anktx/kafka-client
```

Быстрый старт
-------------

[](#быстрый-старт)

### Producer

[](#producer)

```
use Anktx\Kafka\Client\Config\ProducerConfig;
use Anktx\Kafka\Client\Config\Enum\CompressionType;
use Anktx\Kafka\Client\KafkaProducer;
use Anktx\Kafka\Client\KafkaMessage\KafkaProducerMessage;

$producer = new KafkaProducer(
    new ProducerConfig(
        brokers: 'kafka:9092',
        compressionType: CompressionType::snappy,
    )
);

$producer->produce(
    new KafkaProducerMessage(
        topic: 'events',
        body: json_encode(['event' => 'order_created', 'id' => 123]),
        key: 'order-123',
        headers: ['source' => 'api'],
    )
);

$producer->flush();
```

### Consumer

[](#consumer)

```
use Anktx\Kafka\Client\Config\ConsumerConfig;
use Anktx\Kafka\Client\Config\Enum\OffsetReset;
use Anktx\Kafka\Client\ConsumeResult\KafkaConsumeTimeout;
use Anktx\Kafka\Client\KafkaConsumer;
use Anktx\Kafka\Client\KafkaMessage\KafkaConsumerMessage;
use Anktx\Kafka\Client\TopicSubscription\TopicSubscription;
use Anktx\Kafka\Client\TopicSubscription\TopicSubscriptionList;

$consumer = new KafkaConsumer(
    new ConsumerConfig(
        brokers: 'kafka:9092',
        groupId: 'order-processor',
        instanceId: 'worker-1',
        offsetReset: OffsetReset::latest,
    )
);

$consumer->subscribe(
    new TopicSubscriptionList(
        new TopicSubscription(topic: 'events'),
    )
);

while (true) {
    $result = $consumer->consume();

    if ($result instanceof KafkaConsumerMessage) {
        echo $result->body . "\n";
        // ... обработка сообщения ...

        $consumer->commit($result);
    }
}
```

### Message Stream

[](#message-stream)

Для более чистого кода используйте генератор:

```
use Anktx\Kafka\Client\KafkaMessageStream;

$stream = new KafkaMessageStream($consumer);

foreach ($stream->stream() as $message) {
    // Только сообщения, без обработки таймаутов/EOF
    echo $message->body . "\n";
    $consumer->commit($message);
}
```

Стратегии опроса (Poll Strategies)
----------------------------------

[](#стратегии-опроса-poll-strategies)

При отправке сообщений они попадают в локальную очередь, а затем асинхронно отправляются в Kafka. Метод `poll()` обслуживает эту очередь — обрабатывает отчёты о доставке и освобождает память. Если не вызывать `poll()`, очередь может переполниться.

Стратегии определяют, когда вызывать `poll()`:

```
use Anktx\Kafka\Client\PollStrategy\TimeoutPollStrategy;
use Anktx\Kafka\Client\PollStrategy\ProbabilityPollStrategy;

// Опрос каждые N секунд
$producer = new KafkaProducer(
    $config,
    new TimeoutPollStrategy(pollIntervalSec: 1),
);

// Опрос с вероятностью N (0.0 - 1.0)
$producer = new KafkaProducer(
    $config,
    new ProbabilityPollStrategy(probability: 0.1),
);
```

**Доступные стратегии:**

- `NeverPoolStrategy` — не вызывать `poll()` (по умолчанию, подходит для низкой нагрузки)
- `TimeoutPollStrategy` — вызывать `poll()` каждые N секунд
- `ProbabilityPollStrategy` — вызывать `poll()` с вероятностью N (например, 10% вызовов)

Конфигурация
------------

[](#конфигурация)

### ProducerConfig

[](#producerconfig)

```
$config = new ProducerConfig(
    brokers: string,                    // Обязательно
    queueBufferingMaxKBytes: int,       // По умолчанию: 20480
    batchSize: int,                     // По умолчанию: 102400
    lingerMs: int,                      // По умолчанию: 10
    compressionType: CompressionType,   // По умолчанию: snappy
    isDebug: bool,                      // По умолчанию: false
    logger: LoggerInterface,            // По умолчанию: NullLogger
);
```

### ConsumerConfig

[](#consumerconfig)

```
$config = new ConsumerConfig(
    brokers: string,                    // Обязательно
    groupId: string,                    // Обязательно
    instanceId: string,                 // Обязательно
    offsetReset: OffsetReset,           // По умолчанию: earliest
    autoCommitMs: ?int,                 // По умолчанию: null (ручной коммит)
    sessionTimeoutMs: ?int,             // По умолчанию: null
    isDebug: bool,                      // По умолчанию: false
    logger: LoggerInterface,            // По умолчанию: NullLogger
);
```

Типы возвращаемых значений
--------------------------

[](#типы-возвращаемых-значений)

Метод `consume()` возвращает union type:

- `KafkaConsumerMessage` — успешно полученное сообщение
- `KafkaConsumeTimeout` — таймаут (нет новых сообщений)
- `KafkaPartitionEof` — достигнут конец партиции

Пример обработки:

```
$result = $consumer->consume(1000);

if ($result instanceof KafkaConsumerMessage) {
    // Обработка сообщения
    $consumer->commit($result);
} elseif ($result instanceof KafkaConsumeTimeout) {
    // Нет сообщений, можно продолжить работу
}
```

Структура проекта
-----------------

[](#структура-проекта)

```
src/
├── Config/                          # Конфигурация
│   ├── ConsumerConfig.php           # Конфигурация консьюмера
│   ├── ProducerConfig.php           # Конфигурация продюсера
│   └── Enum/                        # Перечисления
│       ├── CompressionType.php      # Типы компрессии (snappy, gzip, lz4, zstd)
│       └── OffsetReset.php          # Стратегия сброса оффсета (latest, earliest)
│
├── ConsumeResult/                   # Результаты консьюминга
│   ├── KafkaConsumeTimeout.php      # Таймаут (нет сообщений)
│   └── KafkaPartitionEof.php        # Достигнут конец партиции
│
├── Exception/                       # Исключения
│   ├── Business/                    # Бизнес-логика
│   ├── Kafka/                       # Ошибки Kafka
│   └── Logic/                       # Логические ошибки
│
├── KafkaMessage/                    # Сообщения
│   ├── AbstractMessage.php          # Базовый класс
│   ├── KafkaConsumerMessage.php     # Сообщение консьюмера
│   └── KafkaProducerMessage.php     # Сообщение продюсера
│
├── PollStrategy/                    # Стратегии опроса очереди
│   ├── PollStrategy.php             # Интерфейс стратегии
│   ├── NeverPoolStrategy.php        # Не вызывать poll()
│   ├── ProbabilityPollStrategy.php  # Вызывать с вероятностью N
│   └── TimeoutPollStrategy.php      # Вызывать каждые N секунд
│
├── TopicSubscription/               # Подписки на топики
│   ├── TopicSubscription.php        # Одна подписка
│   └── TopicSubscriptionList.php    # Список подписок
│
├── KafkaConsumer.php                # Главный класс консьюмера
├── KafkaProducer.php                # Главный класс продюсера
└── KafkaMessageStream.php           # Генератор для стриминга сообщений

```

Обработка исключений
--------------------

[](#обработка-исключений)

Библиотека использует иерархию исключений:

```
Exception
├── KafkaException                    # Ошибки Kafka
│   ├── KafkaConnectionException      # Потеряно соединение
│   ├── KafkaConsumerException        # Ошибка консьюмера
│   └── KafkaProducerException        # Ошибка продюсера
├── LogicException                    # Логические ошибки
│   └── NotSubscribedException        # Не подписан на топики
└── BusinessException                 # Бизнес-логика
    ├── EmptySubscriptionsException   # Пустой список подписок
    └── TopicHasNoPartitionException  # Топик не имеет партиций

```

Пример обработки:

```
try {
    $producer->produce($message);
    $producer->flush();
} catch (KafkaConnectionException $e) {
    // Потеряно соединение с Kafka
} catch (KafkaProducerException $e) {
    // Ошибка отправки сообщения
}
```

###  Health Score

39

—

LowBetter than 86% of packages

Maintenance82

Actively maintained with recent releases

Popularity8

Limited adoption so far

Community4

Small or concentrated contributor base

Maturity53

Maturing project, gaining track record

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~71 days

Recently: every ~139 days

Total

9

Last Release

96d ago

PHP version history (2 changes)0.1.0PHP ^8.1

0.2.0PHP ^8.4

### Community

Maintainers

![](https://www.gravatar.com/avatar/7415f6a14a4b4ce4c04c84b6e1e9b7ca84b5ea6b899abff07f57d0632a911e08?d=identicon)[anktx](/maintainers/anktx)

---

Tags

kafkaphpwrapperclientkafkaredpanda

###  Code Quality

TestsPHPUnit

Static AnalysisPHPStan

Code StylePHP CS Fixer

Type Coverage Yes

### Embed Badge

![Health badge](/badges/anktx-kafka-client/health.svg)

```
[![Health](https://phpackages.com/badges/anktx-kafka-client/health.svg)](https://phpackages.com/packages/anktx-kafka-client)
```

###  Alternatives

[php-http/httplug

HTTPlug, the HTTP client abstraction for PHP

2.6k307.6M679](/packages/php-http-httplug)[php-http/discovery

Finds and installs PSR-7, PSR-17, PSR-18 and HTTPlug implementations

1.3k309.5M1.2k](/packages/php-http-discovery)[php-http/client-common

Common HTTP Client implementations and tools for HTTPlug

1.1k225.5M571](/packages/php-http-client-common)[nmred/kafka-php

Kafka client for php

1.5k1.7M18](/packages/nmred-kafka-php)[react/http

Event-driven, streaming HTTP client and server implementation for ReactPHP

78026.4M414](/packages/react-http)[smi2/phpclickhouse

PHP ClickHouse Client

84310.1M71](/packages/smi2-phpclickhouse)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
