PHPackages                             imiphp/imi-kafka - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. imiphp/imi-kafka

ActiveLibrary[Queues &amp; Workers](/categories/queues)

imiphp/imi-kafka
================

支持在 imi 框架中使用 Kafka 客户端

v2.1.18(2y ago)3364MulanPSL-2.0PHP

Since May 4Pushed 2y ago1 watchersCompare

[ Source](https://github.com/imiphp/imi-kafka)[ Packagist](https://packagist.org/packages/imiphp/imi-kafka)[ RSS](/packages/imiphp-imi-kafka/feed)WikiDiscussions 2.0 Synced today

READMEChangelog (10)Dependencies (4)Versions (39)Used By (0)

imi-kafka
=========

[](#imi-kafka)

[![Latest Version](https://camo.githubusercontent.com/caa2483b1ccc1a243414967a10a1b701640c54d225bec0f087abd3e3113dcf0a/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f762f696d697068702f696d692d6b61666b612e737667)](https://packagist.org/packages/imiphp/imi-kafka)[![Php Version](https://camo.githubusercontent.com/4a5c2ab20974058a8bab53ecb30ac4c2e6bb961df6229b7386fdc097ab53dfa8/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f7068702d2533453d372e342d627269676874677265656e2e737667)](https://secure.php.net/)[![Swoole Version](https://camo.githubusercontent.com/f4210afc3f396a720a75802010218d847f3ba4f7ae5182c570b1fb2504ec9ec3/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f73776f6f6c652d2533453d342e372e302d627269676874677265656e2e737667)](https://github.com/swoole/swoole-src)[![IMI License](https://camo.githubusercontent.com/579cdb8e8856ddff0dfb07de2fff6a58bf4b5571a17eaf4d05a06e59be013fad/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f6c6963656e73652f696d697068702f696d692d6b61666b612e737667)](https://github.com/imiphp/imi-kafka/blob/master/LICENSE)

介绍
--

[](#介绍)

支持在 imi 框架中使用 Kafka 客户端

支持消息发布和消费

本组件基于 [龙之言](https://longlang.org/) 组织的 [longlang/phpkafka](https://github.com/longyan/phpkafka) 组件，该组件由宇润主导开发。

> 本仓库仅用于浏览，不接受 issue 和 Pull Requests，请前往：

Composer
--------

[](#composer)

本项目可以使用composer安装，遵循psr-4自动加载规则，在你的 `composer.json` 中加入下面的内容:

```
{
    "require": {
        "imiphp/imi-kafka": "~2.0.0"
    }
}
```

然后执行 `composer update` 安装。

使用说明
----

[](#使用说明)

可以参考 `example` 目录示例，包括完整的消息发布和消费功能。

在项目 `config/config.php` 中配置：

```
[
    'components'    =>  [
        // 引入组件
        'Kafka'   =>  'Imi\Kafka',
    ],
]
```

连接池配置：

```
[
    'pools'    =>    [
        'kafka'    => [
            'sync'    => [
                'pool'    => [
                    'class'        => \Imi\Kafka\Pool\KafkaSyncPool::class,
                    'config'       => [
                        'maxResources'    => 10,
                        'minResources'    => 0,
                    ],
                ],
                'resource'    => [
                    'bootstrapServers' => KAFKA_BOOTSTRAP_SERVERS,
                    'groupId'          => 'test',
                    // 其它配置请参考：https://github.com/longyan/phpkafka/blob/master/doc/consumer.md#%E9%85%8D%E7%BD%AE%E5%8F%82%E6%95%B0
                ],
            ],
            'async'    => [
                'pool'    => [
                    'class'        => \Imi\Kafka\Pool\KafkaCoroutinePool::class,
                    'config'       => [
                        'maxResources'    => 10,
                        'minResources'    => 1,
                    ],
                ],
                'resource'    => [
                    'bootstrapServers' => KAFKA_BOOTSTRAP_SERVERS,
                    'groupId'          => 'test',
                ],
            ],
        ],
    ]
]
```

默认连接池：

```
[
    'beans' =>  [
        'Kafka'  =>  [
            'defaultPoolName'   =>  'kafka',
        ],
    ],
]
```

### 生产者

[](#生产者)

```
use Imi\Kafka\Pool\KafkaPool;
use longlang\phpkafka\Producer\ProduceMessage;

// 获取生产者对象
$producer = KafkaPool::getInstance();

// 发送
$producer->send('主题 Topic', '消息内容');
// send 方法定义
// public function send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null, ?int $brokerId = null): void

// 批量发送
$producer->sendBatch([
    new ProduceMessage($topic, 'v1', 'k1'),
    new ProduceMessage($topic, 'v2', 'k2'),
]);
// sendBatch 方法定义
// public function sendBatch(ProduceMessage[] $messages, ?int $brokerId = null): void
```

### 消费者

[](#消费者)

**消费者类：**

```
