PHPackages                             nanasen/php-rdkafka-class - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Utility &amp; Helpers](/categories/utility)
4. /
5. nanasen/php-rdkafka-class

ActiveLib[Utility &amp; Helpers](/categories/utility)

nanasen/php-rdkafka-class
=========================

class of rdkafka

1.0.4(7y ago)062PHPPHP &gt;=5.3.9

Since Oct 16Pushed 7y ago1 watchersCompare

[ Source](https://github.com/nanasen/php-rdkafka-class)[ Packagist](https://packagist.org/packages/nanasen/php-rdkafka-class)[ RSS](/packages/nanasen-php-rdkafka-class/feed)WikiDiscussions master Synced yesterday

READMEChangelog (4)DependenciesVersions (6)Used By (0)

php-rdkafka-class
-----------------

[](#php-rdkafka-class)

rdkafka的下php-rdkafka的php类库

[![kafka version support](https://camo.githubusercontent.com/1df6a7b773fc79ce74e180f19c04d3a04852138025bc3a5a57577462c8fc653b/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f6b61666b612d302e38253230302e39253230312e30253230312e312532306f72253230312e312532422d627269676874677265656e2e737667)](#) [![php version support](https://camo.githubusercontent.com/734f540f048f6670f415e9e24d62eea40f1e25add9980f2b9f3df426d330db17/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f7068702d352e332532422d677265656e2e737667)](#) [![librdkafka version support](https://camo.githubusercontent.com/45485b9508692364df5b9dec174d4050649913d3d3e4550e8029e18487b317d9/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f6c696272646b61666b612d332e302e352532422d79656c6c6f77677265656e2e737667)](#) [![php-librdkafka](https://camo.githubusercontent.com/38175fe86b062a48d63b0d2913b3af86896c5914c47e29eaba1cfa9852d52052/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f7068702d2d6c696272646b61666b612d332e302e352532422d6f72616e67652e737667)](#)

PS:原作者--&gt;

目录
--

[](#目录)

1. [安装](#%E5%AE%89%E8%A3%85)
2. [使用](#%E4%BD%BF%E7%94%A8)
    - [消费](#%E6%B6%88%E8%B4%B9)
    - [生产](#%E7%94%9F%E4%BA%A7)
3. [更多配置](#%E6%9B%B4%E5%A4%9A%E9%85%8D%E7%BD%AE)

安装
--

[](#安装)

> 具体查看librdkafk和php-rdkafka

使用
--

[](#使用)

### 消费

[](#消费)

```
# setTopic('qkl01', 0, $offset)  设置-1，从最后一次服务器记录一次消费开始消费
$offset = -1; //开始消费点
$consumer = new \Msg\Kafka\Consumer(['ip'=>'192.168.216.122']);
$consumer->setConsumerGroup('test-110-sxx1')
     ->setBrokerServer('192.168.216.122')
     ->setConsumerTopic()
     ->setTopic('qkl01', 0, $offset)
     ->subscribe(['qkl01'])
     ->consumer(function($msg){
         var_dump($msg);
     });

```

### 生产

[](#生产)

```
$config = [
    'ip'=>'127.0.0.1',
    'dr_msg_cb' => function($kafka, $message) {
        // var_dump((array)$message);
        //todo
        //do biz something, don't exit() or die()
    }
];
$producer = new \Msg\Kafka\Producer($config);
$topic = $producer->setBrokerServer()->setProducerTopic('test');
// $rst = $producer->setBrokerServer()
//                  ->setProducerTopic('test')
//                  ->producer('test', 90);
$start = microtime(true);
for($i = 0; $i < 10; ++$i){
    $topic->producer('test'.$i, 90);
}

while($producer->getOutQLength() > 0){
    $producer->poll(50);
}

```

更多配置
----

[](#更多配置)

```
$defaultConfig = [
    'ip'=>'127.0.0.1',  #默认服务器地址
    'log_path'=> sys_get_temp_dir(),  #日志默认地址
    'dr_msg_cb' => [$this, 'defaultDrMsg'],  #生产的dr回调
    'error_cb' => [$this, 'defaultErrorCb'],  #错误回调
    'rebalance_cb' => [$this, 'defaultRebalance']  #负载回调，你可以用匿名方法自定义
];

# broker相关配置，你可以参考Configuration.md
$brokerConfig = [
    'request.required.acks'=> -1,
    'auto.commit.enable'=> 1,
    'auto.commit.interval.ms'=> 100,
    'offset.store.method'=> 'broker',
    'offset.store.path'=> sys_get_temp_dir(),
    'auto.offset.reset'=> 'smallest',
];

```

### defaultDrMsg

[](#defaultdrmsg)

```
function defaultDrMsg($kafka, $message) {
    file_put_contents($this->config['log_path'] . "/dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
}

```

### defaultErrorCb

[](#defaulterrorcb)

```
function defaultErrorCb($kafka, $err, $reason) {
    file_put_contents($this->config['log_path'] . "/err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
}

```

### defaultRebalance

[](#defaultrebalance)

```
function defaultRebalance(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null)
{
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            if (is_null($this->getCurrentTopic())) {
                $kafka->assign();
            } else {
                $kafka->assign([
                    new \RdKafka\TopicPartition( $this->getCurrentTopic(), $this->getPartition($this->getCurrentTopic()), $this->getOffset($this->getCurrentTopic()) )
                ]);
            }
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    }
}

```

###  Health Score

27

—

LowBetter than 47% of packages

Maintenance20

Infrequent updates — may be unmaintained

Popularity8

Limited adoption so far

Community9

Small or concentrated contributor base

Maturity61

Established project with proven stability

 Bus Factor1

Top contributor holds 94.4% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~7 days

Total

5

Last Release

2788d ago

### Community

Maintainers

![](https://avatars.githubusercontent.com/u/9363587?v=4)[nanasen](/maintainers/nanasen)[@nanasen](https://github.com/nanasen)

---

Top Contributors

[![nanasen](https://avatars.githubusercontent.com/u/9363587?v=4)](https://github.com/nanasen "nanasen (17 commits)")[![qkl9527](https://avatars.githubusercontent.com/u/35894136?v=4)](https://github.com/qkl9527 "qkl9527 (1 commits)")

### Embed Badge

![Health badge](/badges/nanasen-php-rdkafka-class/health.svg)

```
[![Health](https://phpackages.com/badges/nanasen-php-rdkafka-class/health.svg)](https://phpackages.com/packages/nanasen-php-rdkafka-class)
```

###  Alternatives

[wireui/breadcrumbs

Breadcrumbs component for Laravel, Livewire, TallStack

3818.0k](/packages/wireui-breadcrumbs)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
