ybrenlib/rocketmq
=================

Version 1.0.0, MIT license, PHP. [Source](https://github.com/zhongwenyu/rocketMQ) | [Packagist](https://packagist.org/packages/ybrenlib/rocketmq) | [Issues](https://github.com/zhongwenyu/rocketMQ/issues)

rocketmq-php-client
===================

RocketMQ client for PHP.

Producer

Normal messages
===============

    $defaultMQProducer = new DefaultMQProducer("producerGroupUser");
    $defaultMQProducer->setNamesrvAddr("192.168.8.240:9876");
    $defaultMQProducer->start();
    $message = new Message("topic_test", "测试消息");
    // Set the unique key that consumers use to identify this message
    $message->setKeys("FD816678378ABA7080BD3D7B39D4D29D");
    try {
        // Send a normal message
        $sendResult = $defaultMQProducer->send($message);
    } finally {
        $defaultMQProducer->shutdown();
    }

Delayed messages
================

    $defaultMQProducer = new DefaultMQProducer("producerGroupUser");
    $defaultMQProducer->setNamesrvAddr("192.168.8.240:9876");
    $defaultMQProducer->start();
    $message = new Message("topic_test", "测试消息");
    // Set the unique key that consumers use to identify this message
    $message->setKeys("FD816678378ABA7080BD3D7B39D4D29D");
    // Set the delay level
    $message->setDelayTimeLevel(DelayLevel::Level_1m);
    try {
        // Send a delayed message
        $sendResult = $defaultMQProducer->send($message);
    } finally {
        $defaultMQProducer->shutdown();
    }
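RocketMQ brokers deliver delayed messages according to a fixed list of delay levels rather than an arbitrary duration. The sketch below shows how a 1-based level maps to seconds, assuming the broker keeps RocketMQ's default `messageDelayLevel` setting. The helper `delayLevelToSeconds` is hypothetical and not part of this library, and how the library's `DelayLevel` constants map to level numbers is not stated in this README.

```php
<?php
// Assumption: the broker uses RocketMQ's default messageDelayLevel list.
const DEFAULT_DELAY_LEVELS = '1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h';

/**
 * Hypothetical helper: convert a 1-based delay level to a delay in seconds.
 */
function delayLevelToSeconds(int $level, string $levels = DEFAULT_DELAY_LEVELS): int
{
    $unitSeconds = ['s' => 1, 'm' => 60, 'h' => 3600, 'd' => 86400];
    $parts = explode(' ', $levels);
    if ($level < 1 || $level > count($parts)) {
        throw new InvalidArgumentException("Unknown delay level: $level");
    }
    $spec = $parts[$level - 1];            // e.g. "1m"
    $unit = substr($spec, -1);             // trailing unit letter, e.g. "m"
    $amount = (int) substr($spec, 0, -1);  // leading number, e.g. 1
    return $amount * $unitSeconds[$unit];
}
```

Under that default list, `delayLevelToSeconds(5)` corresponds to the "1m" entry. A broker configured with a custom level list would need that list passed as the second argument.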

Ordered messages
================

    $defaultMQProducer = new DefaultMQProducer("producerGroupUser");
    $defaultMQProducer->setNamesrvAddr("192.168.8.240:9876");
    $defaultMQProducer->start();
    $message = new Message("topic_test", "测试消息");
    // Set the unique key that consumers use to identify this message
    $message->setKeys("FD816678378ABA7080BD3D7B39D4D29D");
    try {
        // Send an ordered message
        $userId = "12910737";
        // The callback chooses the queue: the same user ID always maps to the same queue index
        $sendResult = $defaultMQProducer->send($message, function ($messageQueueList, $message) use ($userId) {
            return $userId % count($messageQueueList);
        });
    } finally {
        $defaultMQProducer->shutdown();
    }
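The selector callback above keeps messages in order by always routing the same user ID to the same queue. The following standalone sketch illustrates that mapping and extends it to non-numeric keys by hashing them first. The function `selectQueueIndex` is a hypothetical helper for illustration, not something the library provides.

```php
<?php
/**
 * Hypothetical helper: pick a stable queue index for a sharding key.
 * Numeric keys are used directly; any other key is hashed with crc32 first.
 */
function selectQueueIndex(string $shardingKey, int $queueCount): int
{
    if ($queueCount < 1) {
        throw new InvalidArgumentException('queueCount must be at least 1');
    }
    $number = ctype_digit($shardingKey)
        ? (int) $shardingKey
        : (crc32($shardingKey) & 0x7FFFFFFF); // mask keeps the value non-negative
    return $number % $queueCount;
}
```

Because the index depends on the queue count, ordering for a given key holds only while the number of queues stays the same.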

Transactional messages
======================

    $transactionMQProducer = new TransactionMQProducer("producerGroupUser");
    $transactionMQProducer->setNamesrvAddr("192.168.8.240:9876");
    $transactionMQProducer->start();
    $message = new Message("topic_test", "测试消息");
    // Set the unique key that consumers use to identify this message
    $message->setKeys("FD816678378ABA7080BD3D7B39D4D29D");
    // Register the listener that answers transaction status checks
    $transactionMQProducer->setTransactionListener(new TransactionCheckRollback());
    try {
        // Send the transactional message
        $sendResult = $transactionMQProducer->sendMessageInTransaction($message);
        // Finish the transaction (this example rolls it back)
        $rp = $transactionMQProducer->rollback($sendResult);
    } finally {
        $transactionMQProducer->shutdown();
    }

Consumer

1. Define the message handler. Handlers for ordered messages implement the MessageListenerOrderly interface; handlers for all other message types implement MessageListenerConcurrently.

Notes:

(1) The topic in $msg is not necessarily the topic being consumed; it may be a retry topic. If the handler needs the topic, it is best to pass it in when constructing the MessageListener.

(2) The msgId in $msg must not be used as the unique message identifier; use getKeys() instead.

    class MessageListener implements MessageListenerConcurrently
    {
        /**
         * @param MessageExt[] $msgs
         * @param ConsumeConcurrentlyContext $context
         * @return string
         */
        function consumeMessage(array $msgs, ConsumeConcurrentlyContext $context)
        {
            foreach ($msgs as $msg) {
                var_dump(date("H:i:s.") . substr(intval(microtime(true) * 1000), 10) . " handle message : " . $msg->getKeys() . " " . json_encode($msg->getProperties()) . " " . $msg->getBody());
            }
            // RECONSUME_LATER asks the broker to deliver these messages again later
            return ConsumeConcurrentlyStatus::RECONSUME_LATER;
        }
    }
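Since messages can be redelivered and the notes above recommend getKeys() as the unique identifier, a handler usually needs to skip keys it has already processed. The sketch below is one minimal way to do that, assuming a single worker process and an in-memory store. The class `ProcessedKeys` is hypothetical and not part of this library.

```php
<?php
/**
 * Hypothetical in-memory de-duplicator keyed on the message key.
 * State lives only in the current process and is lost on restart.
 */
final class ProcessedKeys
{
    /** @var array<string, true> keys that have already been handled */
    private $seen = [];

    /** Returns true the first time a key is offered and false afterwards. */
    public function markIfNew(string $key): bool
    {
        if (isset($this->seen[$key])) {
            return false;
        }
        $this->seen[$key] = true;
        return true;
    }
}
```

Inside a handler, the value returned by `$msg->getKeys()` would be passed to `markIfNew()`, and the business logic would run only when it returns true. With several workers, the set would have to live in shared storage instead.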

2. Start the consumer when the Swoole worker starts.

    $defaultMQConsumer = new DefaultMQConsumer("test_test");
    $defaultMQConsumer->setNamesrvAddr("192.168.0.106:9876");
    $defaultMQConsumer->subscribe("topic_test", "*");
    $defaultMQConsumer->setMessageListener(new MessageListener());
    $defaultMQConsumer->setConsumeFromWhere(ConsumeFromWhere::CONSUME_FROM_FIRST_OFFSET);
    $defaultMQConsumer->start();
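Note (1) in the consumer section warns that a message may arrive on a retry topic instead of the subscribed one. RocketMQ names a consumer group's retry topic with the prefix `%RETRY%` followed by the group name, so a handler can detect that case and fall back to the topic it was constructed with. The functions below are hypothetical helpers sketching this idea; they assume the handler subscribes to a single topic.

```php
<?php
// RocketMQ retry topics are named "%RETRY%" followed by the consumer group.
const RETRY_TOPIC_PREFIX = '%RETRY%';

/** Hypothetical helper: true when the topic name is a retry topic. */
function isRetryTopic(string $topic): bool
{
    return strncmp($topic, RETRY_TOPIC_PREFIX, strlen(RETRY_TOPIC_PREFIX)) === 0;
}

/**
 * Hypothetical helper: resolve the topic a handler should act on.
 * $subscribedTopic is the value passed to the listener's constructor.
 */
function effectiveTopic(string $reportedTopic, string $subscribedTopic): string
{
    return isRetryTopic($reportedTopic) ? $subscribedTopic : $reportedTopic;
}
```

A listener that subscribes to more than one topic cannot recover the original topic this way and would need another source for it.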
