
octopus-utf8/kafka
==================

Editor type hints (stubs) for rdkafka, plus wrapper classes that simplify using it.


[Source](https://github.com/rungao/phpkafka) | [Packagist](https://packagist.org/packages/octopus-utf8/kafka)


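stubs directory
---------------

Bundles the open-source kwn/php-rdkafka-stubs library. Once it is included, your editor offers completion and hints for Kafka-related classes and functions. It works with mainstream editors such as PhpStorm.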

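class directory
---------------

1. A PHP class library that wraps rdkafka. It supports producing to custom topics and both high-level and low-level consumption, which makes it easy to replay data.
2. Wraps the error callback, so error information can easily be written to the log directory of the application concerned.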

[![kafka version support](https://camo.githubusercontent.com/1df6a7b773fc79ce74e180f19c04d3a04852138025bc3a5a57577462c8fc653b/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f6b61666b612d302e38253230302e39253230312e30253230312e312532306f72253230312e312532422d627269676874677265656e2e737667)](#) [![php version support](https://camo.githubusercontent.com/734f540f048f6670f415e9e24d62eea40f1e25add9980f2b9f3df426d330db17/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f7068702d352e332532422d677265656e2e737667)](#) [![librdkafka version support](https://camo.githubusercontent.com/45485b9508692364df5b9dec174d4050649913d3d3e4550e8029e18487b317d9/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f6c696272646b61666b612d332e302e352532422d79656c6c6f77677265656e2e737667)](#) [![php-librdkafka](https://camo.githubusercontent.com/38175fe86b062a48d63b0d2913b3af86896c5914c47e29eaba1cfa9852d52052/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f7068702d2d6c696272646b61666b612d332e302e352532422d6f72616e67652e737667)](#)

Extension installation
----------------------

> pecl install rdkafka

Library installation
--------------------

> composer require octopus-utf8/kafka

Usage
-----

### High-level consumer example

```
use Octopus\Consumer;
$config = [
    'brokers' => 'localhost:9092',
    'log_level' => LOG_DEBUG
];
$consumer = new Consumer($config);
$consumer->setConsumerGroup('ts_click_group')
    ->setBrokerServer($config['brokers'])
    ->setTopic('ts_click')
    ->subscribe('ts_click')
    ->consumer(function($msg){
        var_dump($msg);
    });
```

### Low-level consumer example

```
use Octopus\Consumer;
$config = [
    'brokers' => 'localhost:9092',
    'log_level' => LOG_DEBUG
];
$consumer = new Consumer($config);
$consumer->setConsumerGroup('ts_click_group')
    ->setBrokerServer($config['brokers'])
    // Set the partition and the starting point for consumption
    ->setTopic('ts_click', 0)
    ->subscribe('ts_click', Consumer::LOW_LEVEL)
    ->consumer(function($msg){
        // Business logic goes here
        var_dump($msg);
    });
```

### Producer example

```
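use Octopus\Producer;
$config = [
    'brokers' => 'localhost:9092',
    'log_level' => LOG_DEBUG
];
// Example payload; replace it with your own message.
$msg = 'hello kafka';
// The class is already imported above, so use the short name.
$producer = new Producer($config);
$producer->setBrokerServer()
    ->setProducerTopic('ts_click')
    ->producer($msg);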
```

Additional constructor configuration options
--------------------------------------------

```
$config = [
    // Consumer timeout, in seconds
    'timeout' => 12,
    // Delivery report (dr) callback for produced messages
    'dr_msg_cb' => [$this, 'defaultDrMsg'],
    // Error callback
    'error_cb' => [$this, 'defaultErrorCb'],
    // Rebalance callback; you can supply your own anonymous function instead
    'rebalance_cb' => [$this, 'defaultRebalance']
];

# For more options, see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
```
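
The `error_cb` option takes a PHP callable, so the "write errors to the application's log directory" behaviour can be sketched as a closure that appends to a file. This is a minimal sketch, not the library's own `defaultErrorCb`. The helper name `makeErrorLogger`, the log path, and the `($kafka, $err, $reason)` argument order (borrowed from php-rdkafka's `RdKafka\Conf::setErrorCb`) are all assumptions. Check how `Octopus\Consumer` actually invokes the callback before relying on it.

```php
<?php
// Hypothetical helper (not part of octopus-utf8/kafka): returns a closure
// that appends each reported error to $logFile.
function makeErrorLogger($logFile)
{
    // Assumed signature, following php-rdkafka's setErrorCb:
    // ($kafka, $err, $reason) = client handle, error code, readable reason.
    return function ($kafka, $err, $reason) use ($logFile) {
        $line = sprintf("[%s] kafka error %d: %s\n", date('c'), $err, $reason);
        // FILE_APPEND keeps earlier entries; LOCK_EX avoids interleaved writes.
        file_put_contents($logFile, $line, FILE_APPEND | LOCK_EX);
    };
}

$config = [
    'brokers'  => 'localhost:9092',
    // Assumed log location; point this at your application's log directory.
    'error_cb' => makeErrorLogger(sys_get_temp_dir() . '/kafka_error.log'),
];
```

Passing a different path per service would keep each application's Kafka errors in its own log file.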


### Community

Maintainers

[rungao](/maintainers/rungao)

