ccinn/kafka-swoole
==================

Kafka client for php


🌈 The first PHP Kafka client to support *multiple compression* formats

Implements all Kafka protocol APIs, provides both 'HighLevel' and 'LowLevel' client APIs, and uses Swoole to coordinate and flexibly scale consumer clients.

> If you would like to contribute code and help speed up development, please contact me by email.

Core framework: [kafka-swoole-core](https://github.com/whiteCcinn/kafka-swoole-core)

Install
-------

### by composer

```
version=dev-master;composer create-project ccinn/kafka-swoole kafka-swoole ${version}
```

### by git

```
git clone https://github.com/whiteCcinn/kafka-swoole.git && cd kafka-swoole && composer install
```

### docker

```
docker build -t kafka-swoole:latest .
docker run -it --name kafka-swoole -v $(pwd):/data/www kafka-swoole:latest bash
```

Demo
----

### A single member of the consumer group

```
Topic:caiwenhui	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: caiwenhui	Partition: 0	Leader: 1004	Replicas: 1004	Isr: 1004
```

[![kafka-client](https://camo.githubusercontent.com/dfe89e696c5a4829dbb623cb68bdbbbc9413d053e8723644b8f7d4859f02a2b1/687474703a2f2f672e7265636f726469742e636f2f637951724d48415761652e676966)](https://camo.githubusercontent.com/dfe89e696c5a4829dbb623cb68bdbbbc9413d053e8723644b8f7d4859f02a2b1/687474703a2f2f672e7265636f726469742e636f2f637951724d48415761652e676966)

[![kafka-client-2](https://camo.githubusercontent.com/f945e04b58f5f5eae506e3cc02c75a6112f0fc882013ae6b4dd0e8bc84554499/687474703a2f2f672e7265636f726469742e636f2f365146536a6c3776536f2e676966)](https://camo.githubusercontent.com/f945e04b58f5f5eae506e3cc02c75a6112f0fc882013ae6b4dd0e8bc84554499/687474703a2f2f672e7265636f726469742e636f2f365146536a6c3776536f2e676966)

### Multiple members of the consumer group

```
Topic:kafka-swoole	PartitionCount:4	ReplicationFactor:2	Configs:
	Topic: kafka-swoole	Partition: 0	Leader: 1003	Replicas: 1003,1002	Isr: 1003,1002
	Topic: kafka-swoole	Partition: 1	Leader: 1004	Replicas: 1004,1003	Isr: 1004,1003
	Topic: kafka-swoole	Partition: 2	Leader: 1001	Replicas: 1001,1004	Isr: 1001,1004
	Topic: kafka-swoole	Partition: 3	Leader: 1002	Replicas: 1002,1001	Isr: 1001,1002
```

- `KAFKA_CLIENT_CONSUMER_NUM=2`
- `KAFKA_CLIENT_CONSUMER_NUM=4`

[![kafka-client-3](https://camo.githubusercontent.com/85c1cfb393434422acc4f3aec65186764e001c2650a6497bdf18f10d38d00a35/687474703a2f2f672e7265636f726469742e636f2f52655274517a62594b492e676966)](https://camo.githubusercontent.com/85c1cfb393434422acc4f3aec65186764e001c2650a6497bdf18f10d38d00a35/687474703a2f2f672e7265636f726469742e636f2f52655274517a62594b492e676966)

Command
-------

### Produce

`php bin/kafka-client kafka.produce [options] [--] <message>`

```
php bin/kafka-client kafka.produce --help

Description:
  Send a message

Usage:
  kafka.produce [options] [--] <message>

Arguments:
  message                      The message you wish to send.

Options:
  -t, --topic[=TOPIC]          Which is the topic you want to send?
  -p, --partition[=PARTITION]  Which is the topic you want to send to partition?
  -k, --key[=KEY]              Which is the topic you want to send to partition by key?
  -h, --help                   Display this help message
  -q, --quiet                  Do not output any message
  -V, --version                Display this application version
      --ansi                   Force ANSI output
      --no-ansi                Disable ANSI output
  -n, --no-interaction         Do not ask any interactive question
  -v|vv|vvv, --verbose         Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Help:
  This command will help you send separate messages to a topic..
```
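
The `--partition` and `--key` options both decide which partition receives the message. As a rough illustration of key-based routing, the sketch below hashes a key onto a partition index. The function name `partitionForKey` and the CRC32-modulo rule are assumptions made for this example; this README does not document the hashing that kafka-swoole actually applies.

```php
<?php
// Hypothetical helper, not part of kafka-swoole: maps a message key to a
// partition index using CRC32 modulo the partition count (an assumed rule).
function partitionForKey(string $key, int $partitionCount): int
{
    if ($partitionCount < 1) {
        throw new InvalidArgumentException('partitionCount must be at least 1');
    }
    // Mask the sign bit so the result is non-negative on every platform.
    return (crc32($key) & 0x7fffffff) % $partitionCount;
}

// The same key always lands on the same partition of a 4-partition topic.
$first  = partitionForKey('order-1001', 4);
$second = partitionForKey('order-1001', 4);
echo $first === $second ? "stable\n" : "unstable\n";
```

Routing by key keeps all messages that share a key in one partition, which preserves their relative order for a single consumer.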

### DescribeGroups

See consumer group details

```
Description:
  See consumer group details

Usage:
  kafka.describeGroups [options]

Options:
  -t, --topic=TOPIC     Which topic is subscribed by the consumer group?
  -g, --group=GROUP     Which consumer group?
  -h, --help            Display this help message
  -q, --quiet           Do not output any message
  -V, --version         Display this application version
      --ansi            Force ANSI output
      --no-ansi         Disable ANSI output
  -n, --no-interaction  Do not ask any interactive question
  -v|vv|vvv, --verbose  Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Help:
  See consumer group details...
```

```
php bin/kafka-client kafka.describeGroups -t mulog_clean_24 -g kafka-swoole

DescribeGruops-BaseInfo
=======================

 -------------- ------------ -------------- --------------
  groupId        groupState   protocolType   protocolData
 -------------- ------------ -------------- --------------
  kafka-swoole   Stable       consumer       Range
 -------------- ------------ -------------- --------------

 --------------------------------------------------- -------------- -------------- ---------------- -----------
  memberId                                            clientId       clientHost     topcic           paritions
 --------------------------------------------------- -------------- -------------- ---------------- -----------
  kafka-swoole-44857c49-b019-439b-90dd-d71112b2c01e   kafka-swoole   /192.167.8.2   mulog_clean_24   0,1
 --------------------------------------------------- -------------- -------------- ---------------- -----------

 --------------------------------------------------- -------------- -------------- ---------------- -----------
  memberId                                            clientId       clientHost     topcic           paritions
 --------------------------------------------------- -------------- -------------- ---------------- -----------
  kafka-swoole-5714cd77-a0dd-4d29-aa20-718f9d713908   kafka-swoole   /192.167.8.2   mulog_clean_24   2,3
 --------------------------------------------------- -------------- -------------- ---------------- -----------
```
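
The output above shows the `Range` strategy giving partitions 0,1 to one member and 2,3 to the other. A minimal sketch of range-style assignment for a single topic follows. `rangeAssign` is a hypothetical helper written for illustration (it follows the general idea of a range assignor, not kafka-swoole's code), and the short member ids are made up.

```php
<?php
// Hypothetical illustration of range-style assignment for one topic:
// sort the members, then hand each one a contiguous block of partitions.
function rangeAssign(array $memberIds, int $partitionCount): array
{
    sort($memberIds);
    $memberCount = count($memberIds);
    if ($memberCount === 0) {
        return [];
    }
    $base  = intdiv($partitionCount, $memberCount); // partitions every member gets
    $extra = $partitionCount % $memberCount;         // first $extra members get one more

    $assignment = [];
    $next = 0;
    foreach ($memberIds as $index => $memberId) {
        $size = $base + ($index < $extra ? 1 : 0);
        $assignment[$memberId] = $size > 0 ? range($next, $next + $size - 1) : [];
        $next += $size;
    }
    return $assignment;
}

foreach (rangeAssign(['member-a', 'member-b'], 4) as $memberId => $partitions) {
    echo $memberId, ': ', implode(',', $partitions), "\n";
}
// member-a: 0,1
// member-b: 2,3
```

With more members than partitions, the surplus members receive an empty list and stay idle.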

### Rpc

Supports querying runtime data in real time. The command talks to the running client over an RPC protocol carried on AF_UNIX inter-process communication.

```
php bin/kafka-client rpc -h
Description:
  Built-in runtime RPC command

Usage:
  rpc <type>

Arguments:
  type                  which you want to execute command?

Options:
  -h, --help            Display this help message
  -q, --quiet           Do not output any message
  -V, --version         Display this application version
      --ansi            Force ANSI output
      --no-ansi         Disable ANSI output
  -n, --no-interaction  Do not ask any interactive question
  -v|vv|vvv, --verbose  Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Help:
  The following are the built-in RPC command options：
  kafka_lag
  offset_checker
  block_size
  member_leader
  metadata_brokers
  metadata_topics

```
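
The `rpc` command reaches the running client over AF_UNIX inter-process communication. The sketch below only demonstrates that transport in plain PHP with a connected Unix socket pair. The JSON message shape and the hard-coded reply are assumptions for illustration and are not kafka-swoole's actual wire format.

```php
<?php
// Create two connected AF_UNIX stream sockets in the same process.
$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
if ($pair === false) {
    throw new RuntimeException('AF_UNIX socket pairs are not available on this platform');
}
[$client, $server] = $pair;

// Client side: send one newline-terminated request (hypothetical format).
fwrite($client, json_encode(['type' => 'kafka_lag']) . "\n");

// Server side: read the request and answer with a made-up value.
$request = json_decode(fgets($server), true);
$reply   = ['type' => $request['type'], 'result' => 1000];
fwrite($server, json_encode($reply) . "\n");

// Client side: read the reply.
$response = json_decode(fgets($client), true);
echo $response['type'], ' => ', $response['result'], "\n";

fclose($client);
fclose($server);
```

In a real deployment the two ends would live in different processes and connect through a socket file rather than a socket pair.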

[![rpc](https://camo.githubusercontent.com/a31edc90c4b1ac8e4518ef3fe828e38f6df2bfc2/687474703a2f2f672e7265636f726469742e636f2f6e79334e33456468786e2e676966)](https://camo.githubusercontent.com/a31edc90c4b1ac8e4518ef3fe828e38f6df2bfc2/687474703a2f2f672e7265636f726469742e636f2f6e79334e33456468786e2e676966)

- `kafka_lag`: shows the total difference between the current offset and the maximum offset in Kafka.

```
php bin/kafka-client rpc kafka_lag
1000
```

- `offset_checker`: shows, for each partition of the topic, the current offset, the maximum offset in the Kafka service, and the difference between them.

```
php bin/kafka-client rpc offset_checker
 -------------- ----------- ---------------- ------------------ -----------------
  topic          partition   current-offset   kafka-max-offset   remaining-count
 -------------- ----------- ---------------- ------------------ -----------------
  kafka-swoole   2           50223            50223              0
  kafka-swoole   3           70353            70353              0
  kafka-swoole   0           52395            52395              0
  kafka-swoole   1           50407            50407              0
 -------------- ----------- ---------------- ------------------ -----------------
```

- `block_size`: if you use a storage medium in indirect mode, shows the current size of that storage medium.

```
php bin/kafka-client rpc block_size
254
```

- `member_leader`: shows the leader of the current consumer group.

```
php bin/kafka-client rpc member_leader
 ---------------------------------------------------
  consumer-group-leaderId
 ---------------------------------------------------
  kafka-swoole-da43c9a0-b12d-46df-9941-ee80456ec9a2
 ---------------------------------------------------

 ---------------------------------------------------
  consumer-group-membersId
 ---------------------------------------------------
  kafka-swoole-6080eb8e-3bfb-4be0-a923-037bb99a2666
  kafka-swoole-da43c9a0-b12d-46df-9941-ee80456ec9a2
 ---------------------------------------------------
```

- `metadata_brokers`: shows the available brokers of the Kafka service.

```
php bin/kafka-client rpc metadata_brokers
 --------- --------- ------
  node-id   host      port
 --------- --------- ------
  1003      mkafka3   9092
  1004      mkafka4   9092
  1001      mkafka1   9092
  1002      mkafka2   9092
 --------- --------- ------
```

- `metadata_topics`: shows details of the subscribed topics.

```
php bin/kafka-client rpc metadata_topics
 -------------- ----------- ----------- --------------- -----------
  topic          partition   leader-id   replica-nodes   isr-nodes
 -------------- ----------- ----------- --------------- -----------
  kafka-swoole   2           1001        1001,1004       1001,1004
  kafka-swoole   1           1004        1004,1003       1004,1003
  kafka-swoole   3           1002        1002,1001       1002,1001
  kafka-swoole   0           1003        1003,1002       1002,1003
 -------------- ----------- ----------- --------------- -----------
```
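
The `kafka_lag` and `offset_checker` commands above report the same quantity at different granularity: the remaining count of a partition is its maximum offset minus the current offset, and the lag is the sum over all partitions. A minimal sketch using the sample `offset_checker` rows follows; the array layout and the function names are hypothetical.

```php
<?php
// Rows copied from the sample offset_checker output: [partition, current, max].
$rows = [
    [2, 50223, 50223],
    [3, 70353, 70353],
    [0, 52395, 52395],
    [1, 50407, 50407],
];

// Hypothetical helper: remaining messages for one partition.
function remainingCount(int $currentOffset, int $maxOffset): int
{
    return max(0, $maxOffset - $currentOffset);
}

// Hypothetical helper: total lag across all partitions.
function totalLag(array $rows): int
{
    $lag = 0;
    foreach ($rows as [$partition, $current, $max]) {
        $lag += remainingCount($current, $max);
    }
    return $lag;
}

echo totalLag($rows), "\n"; // 0, because every sample partition is fully consumed
```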

### Consumer

`php bin/kafka-client start`

Config
------

### Common

#### Options

FILE: `config/common.yaml`

```
# Your kafka version
kafka.version: 0.9.0.0
# This is for bootstrapping and the producer will only use it for getting metadata
# (topics, partitions and replicas). The socket connections for sending the actual data
# will be established based on the broker information returned in the metadata. The
# format is host1:port1,host2:port2, and the list can be a subset of brokers or
# a VIP pointing to a subset of brokers.
metadata.broker.list: "mkafka1:9092,mkafka2:9092,mkafka3:9092,mkafka4:9092"

# The producer generally refreshes the topic metadata from brokers when there is a failure
# (partition missing, leader not available...). It will also poll regularly (default: every 10min
# so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure.
# If you set this to zero, the metadata will get refreshed after each message sent (not recommended)
# Important note: the refresh happens only AFTER the message is sent, so if the producer never sends
# a message the metadata is never refreshed
topic.metadata.refresh.interval.ms: 60 * 10 * 1000

# a string that uniquely identifies a set of consumers within the same consumer group
group.id: "kafka-swoole"

# The topics you want to operate on
# Example: topic1,topic2,topic3,topic4,topic5
topic.names: "kafka-swoole"

# How often a heartbeat request is sent to tell kafka that the client still exists
heartbeat.interval.ms: 10 * 1000

# The maximum time a group member may go without sending a heartbeat.
# This value must therefore be greater than the heartbeat interval: heartbeat.interval.ms
# (default: 30s)
group.keep.session.max.ms: 30 * 1000

# Select a strategy for assigning partitions to consumer streams. See ProtocolPartitionAssignmentStrategyEnum::class
# Supported values: Range, RoundRobin, Sticky
partition.assignment.strategy: "Range"

# HighLevel API: the frequency in ms at which the consumer offsets are committed to zookeeper
auto.commit.interval.ms: 10 * 1000

# smallest : automatically reset the offset to the smallest offset
# largest : automatically reset the offset to the largest offset
auto.offset.reset: largest
```
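
Two details of `config/common.yaml` lend themselves to a quick check: durations are written as products such as `10 * 1000`, and `group.keep.session.max.ms` has to be greater than `heartbeat.interval.ms`. The sketch below evaluates such products and validates that relationship. `evalMs` and `checkHeartbeat` are hypothetical helpers, and how kafka-swoole itself parses these values is an assumption.

```php
<?php
// Hypothetical helper: turn "60 * 10 * 1000" into 600000 without using eval().
function evalMs(string $expression): int
{
    $product = 1;
    foreach (explode('*', $expression) as $factor) {
        $factor = trim($factor);
        if (preg_match('/^\d+$/', $factor) !== 1) {
            throw new InvalidArgumentException("Not a non-negative integer factor: '$factor'");
        }
        $product *= (int) $factor;
    }
    return $product;
}

// Hypothetical helper: the session timeout must exceed the heartbeat interval.
function checkHeartbeat(array $config): bool
{
    return evalMs($config['group.keep.session.max.ms']) > evalMs($config['heartbeat.interval.ms']);
}

$config = [
    'heartbeat.interval.ms'     => '10 * 1000',
    'group.keep.session.max.ms' => '30 * 1000',
];
echo evalMs('60 * 10 * 1000'), "\n";                                    // 600000
echo checkHeartbeat($config) ? "ok\n" : "session timeout too short\n"; // ok
```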

Supported compression protocols
-------------------------------

- normal (uncompressed)
- gzip
- snappy (If you need snappy compressed data, you will need to install an additional [php-ext-snappy](https://github.com/kjdev/php-ext-snappy) extension)
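
In PHP, gzip is typically provided by the zlib extension, while snappy needs the extra extension mentioned above. The sketch below reports which codecs the current PHP runtime could handle and round-trips a payload through gzip when it is available. `availableCodecs` is a hypothetical helper; this README does not show how kafka-swoole selects a codec internally.

```php
<?php
// Hypothetical helper: list the codecs this PHP runtime can handle.
function availableCodecs(): array
{
    $codecs = ['normal'];                 // uncompressed is always possible
    if (function_exists('gzencode')) {    // provided by the zlib extension
        $codecs[] = 'gzip';
    }
    if (extension_loaded('snappy')) {     // provided by php-ext-snappy
        $codecs[] = 'snappy';
    }
    return $codecs;
}

$codecs = availableCodecs();
echo implode(',', $codecs), "\n";

if (in_array('gzip', $codecs, true)) {
    // Compress a repetitive payload and restore it again.
    $message    = str_repeat('kafka-swoole ', 50);
    $compressed = gzencode($message);
    $restored   = gzdecode($compressed);
    echo strlen($message), ' -> ', strlen($compressed), " bytes\n";
    echo $restored === $message ? "round trip ok\n" : "round trip failed\n";
}
```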

Protocol
--------

- Produce
- Fetch
- ListOffsets
- Metadata
- LeaderAndIsr
- StopReplica
- UpdateMetadata
- ControlledShutdown
- OffsetCommit
- OffsetFetch
- FindCoordinator
- JoinGroup
- Heartbeat
- LeaveGroup
- SyncGroup
- DescribeGroups
- ListGroups
- SaslHandshake
- ApiVersions
- CreateTopics
- DeleteTopics
- DeleteRecords
- InitProducerId
- OffsetForLeaderEpoch
- AddPartitionsToTxn
- AddOffsetsToTxn
- EndTxn
- WriteTxnMarkers
- TxnOffsetCommit
- DescribeAcls
- CreateAcls
- DeleteAcls
- DescribeConfigs
- AlterConfigs
- AlterReplicaLogDirs
- DescribeLogDirs
- SaslAuthenticate
- CreatePartitions
- CreateDelegationToken
- RenewDelegationToken
- ExpireDelegationToken
- DescribeDelegationToken
- DeleteGroups
- ElectPreferredLeaders
- IncrementalAlterConfigs

### usage

The example here works at the protocol API level rather than through the client API. It builds and sends a ListOffsetsRequest.

```
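// Assumes the request, type, and enum classes used below are already imported,
// and that $socket is a client socket already connected to a Kafka broker.
$protocol = new ListOffsetsRequest();

// One entry per partition to query.
$partitions = [];
array_push($partitions,
    (new PartitionsListsOffsets())->setPartition(Int32::value(0))
                                  ->setMaxNumOffsets(Int32::value(10))
                                  ->setTimestamp(Int64::value(time()))
);

// One entry per topic, each carrying its partitions.
$topics = [];
array_push($topics,
    (new TopicsListsOffsets())->setTopic(String16::value('caiwenhui'))
                              ->setPartitions($partitions)
);

// Request header: API key and version, client id, and a correlation id
// that the broker echoes back in the response header.
$protocol->setRequestHeader(
    (new RequestHeader())->setApiVersion(Int16::value(ProtocolVersionEnum::API_VERSION_0))
                         ->setClientId(String16::value('kafka-swoole'))
                         ->setCorrelationId(Int32::value(ProtocolEnum::LIST_OFFSETS))
                         ->setApiKey(Int16::value(ProtocolEnum::LIST_OFFSETS))
);
// A replica id of -1 marks the request as coming from an ordinary consumer.
$protocol->setReplicaId(Int32::value(-1));
$protocol->setTopics($topics);

// Serialize the request and send it to the broker.
$payload = $protocol->pack();
$n = $socket->send($payload);

// Read the reply and decode it into the response object.
$data = $socket->recv();
$protocol->response->unpack($data);
var_dump($protocol->response,$protocol->response->toArray()); // Here you can see the response protocol of the kafka service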
/*
object(Kafka\Protocol\Response\ListOffsetsResponse)#46 (3) {
  ["responses":"Kafka\Protocol\Response\ListOffsetsResponse":private]=>
  array(1) {
    [0]=>
    object(Kafka\Protocol\Response\ListOffsets\ResponsesListOffsets)#68 (2) {
      ["topic":"Kafka\Protocol\Response\ListOffsets\ResponsesListOffsets":private]=>
      object(Kafka\Protocol\Type\String16)#72 (1) {
        ["value":protected]=>
        string(9) "caiwenhui"
      }
      ["partitionResponses":"Kafka\Protocol\Response\ListOffsets\ResponsesListOffsets":private]=>
      array(1) {
        [0]=>
        object(Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets)#71 (3) {
          ["partition":"Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets":private]=>
          object(Kafka\Protocol\Type\Int32)#79 (1) {
            ["value":protected]=>
            int(0)
          }
          ["errorCode":"Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets":private]=>
          object(Kafka\Protocol\Type\Int16)#78 (1) {
            ["value":protected]=>
            int(3)
          }
          ["offsets":"Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets":private]=>
          array(0) {
          }
        }
      }
    }
  }
  ["responseHeader":protected]=>
  object(Kafka\Protocol\Response\Common\ResponseHeader)#58 (1) {
    ["correlationId":"Kafka\Protocol\Response\Common\ResponseHeader":private]=>
    object(Kafka\Protocol\Type\Int32)#64 (1) {
      ["value":protected]=>
      int(2)
    }
  }
  ["size":protected]=>
  object(Kafka\Protocol\Type\Int32)#63 (1) {
    ["value":protected]=>
    int(33)
  }
}

array(3) {
  ["responses"]=>
  array(1) {
    [0]=>
    array(2) {
      ["topic"]=>
      string(9) "caiwenhui"
      ["partitionResponses"]=>
      array(1) {
        [0]=>
        array(3) {
          ["partition"]=>
          int(0)
          ["errorCode"]=>
          int(3)
          ["offsets"]=>
          array(0) {
          }
        }
      }
    }
  }
  ["responseHeader"]=>
  array(1) {
    ["correlationId"]=>
    int(2)
  }
  ["size"]=>
  int(33)
}
*/
```
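
The typed setters in the example above presumably map `Int16`, `Int32`, and `String16` to the big-endian fields described in the Kafka protocol guide. The sketch below packs a request header by hand with PHP's `pack()`. `packRequestHeader` is a hypothetical helper for illustration, not a kafka-swoole function, and a real request would append the request body before the size prefix is computed.

```php
<?php
// Hypothetical helper: encode a Kafka request header (api_key, api_version,
// correlation_id, client_id) and prefix it with its int32 size.
function packRequestHeader(int $apiKey, int $apiVersion, int $correlationId, string $clientId): string
{
    $header = pack('n', $apiKey)              // int16, big-endian
            . pack('n', $apiVersion)          // int16, big-endian
            . pack('N', $correlationId)       // int32, big-endian
            . pack('n', strlen($clientId))    // string length as int16
            . $clientId;                      // string bytes
    return pack('N', strlen($header)) . $header; // int32 size prefix
}

// ListOffsets has API key 2; the example above reuses it as the correlation id.
$bytes = packRequestHeader(2, 0, 2, 'kafka-swoole');
echo strlen($bytes), "\n";                 // 26
echo bin2hex(substr($bytes, 0, 12)), "\n"; // 000000160002000000000002
```

The correlation id of 2 is what comes back in the `responseHeader` of the dump above, which is how a client matches replies to requests.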

Env
---

```
# APP
APP_NAME=kafka-swoole
## zh_CN,en_US
APP_LANGUGE=en_US

# SERVER
# The server only receives RPC requests and reads data held in memory
SERVER_IP=127.0.0.1
SERVER_PORT=9501
SERVER_REACTOR_NUM=1
SERVER_WORKER_NUM=1
SERVER_MAX_REQUEST=50

# Adjust KAFKA_CLIENT_CONSUMER_NUM according to the partition count of the subscribed topic
KAFKA_CLIENT_CONSUMER_NUM=2

# KAFKA_CLIENT
# Client Process
# KAFKA_CLIENT_API_MODE: "HIGH_LEVEL" / "LOW_LEVEL"
# KAFKA_CLIENT_CONSUMER_NUM: Must not be greater than the number of partitions in the topic
KAFKA_CLIENT_API_MODE=LOW_LEVEL

# REDIS/FILE/DIRECTLY
# If you choose "Directly" mode, the number of processing logical processes is equal to the minimum number of kafka client processes.
# The KAFKA_CUSTOM_PROCESS_NUM parameter is ignored.
# Make sure your consumption logic consumes as much data as possible, otherwise the rate of consumption will be lower than the rate of production.

# The process generated by KAFKA_CUSTOM_PROCESS_NUM gets messages from the storage medium
KAFKA_MESSAGE_STORAGE=REDIS

# Number of message processing processes
KAFKA_SINKER_PROCESS_NUM=2

# Which is your storage redis config
KAFKA_STORAGE_REDIS=POOL_REDIS_0

# Redis stores the persistent key
KAFKA_STORAGE_REDIS_KEY=${APP_NAME}:storage:redis:messages

# Redis persists the maximum number of messages
KAFKA_STORAGE_REDIS_LIMIT=40000

# Redis Pool
# `POOL_REDIS_NUM` is the number of Redis pools; pool indexes start at 0
POOL_REDIS_NUM=1
POOL_REDIS_0_MAX_NUMBER=5
POOL_REDIS_0_MAX_IDLE=3
POOL_REDIS_0_MIN_IDLE=0
POOL_REDIS_0_HOST=mredis
POOL_REDIS_0_PORT=60379
POOL_REDIS_0_AUTH=uXUxGIyprkel1nYWhCyoCYAT4CNCUW2mXkVcDfhTqetnYSD7
POOL_REDIS_0_DB=0
# other redis config ...

```
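
`KAFKA_STORAGE_REDIS_KEY` refers to `${APP_NAME}`, so the file relies on variable expansion. A minimal sketch of that expansion over a few of the entries above follows. `parseEnv` is a hypothetical helper, and the real loader used by kafka-swoole may behave differently.

```php
<?php
// Hypothetical helper: parse KEY=VALUE lines, skip comments, expand ${NAME}.
function parseEnv(string $text): array
{
    $values = [];
    foreach (preg_split('/\R/', $text) as $line) {
        $line = trim($line);
        if ($line === '' || $line[0] === '#' || strpos($line, '=') === false) {
            continue;
        }
        [$key, $value] = explode('=', $line, 2);
        // Replace ${NAME} with a value defined earlier in the file.
        $value = preg_replace_callback('/\$\{(\w+)\}/', function (array $m) use ($values) {
            return $values[$m[1]] ?? '';
        }, $value);
        $values[trim($key)] = trim($value);
    }
    return $values;
}

$text = <<<'ENV'
# APP
APP_NAME=kafka-swoole
KAFKA_STORAGE_REDIS=POOL_REDIS_0
KAFKA_STORAGE_REDIS_KEY=${APP_NAME}:storage:redis:messages
ENV;

$env = parseEnv($text);
echo $env['KAFKA_STORAGE_REDIS_KEY'], "\n"; // kafka-swoole:storage:redis:messages
```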

Unit-test
---------

Run the tests from the project root directory.

```
php vendor/bin/phpunit tests/Protocol/

```

```
PHPUnit 7.5.16 by Sebastian Bergmann and contributors.

Runtime:       PHP 7.1.28
Configuration: /www5/kafka-swoole/phpunit.xml.dist

......................                                            22 / 22 (100%)

Time: 64 ms, Memory: 6.00 MB

OK (22 tests, 22 assertions)
```

References
----------

- [Apache.kafka.protocol](http://kafka.apache.org/protocol.html)
- [Kafka.ConsumerConfig](https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/consumer/ConsumerConfig.scala)
- [Kafka.ProducerConfig](https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/producer/ProducerConfig.scala)
