PHPackages                             volcengine/ve-rocketmq-php-sdk - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. volcengine/ve-rocketmq-php-sdk

AbandonedArchivedLibrary

volcengine/ve-rocketmq-php-sdk
==============================

PHP SDK for volcengine RocketMQ

0.1.1(2y ago)272MITPHPPHP &gt;=5.6

Since Mar 31Pushed 2y ago1 watchersCompare

[ Source](https://github.com/volcengine/ve-rocketmq-php-sdk)[ Packagist](https://packagist.org/packages/volcengine/ve-rocketmq-php-sdk)[ RSS](/packages/volcengine-ve-rocketmq-php-sdk/feed)WikiDiscussions master Synced 1mo ago

READMEChangelog (2)Dependencies (1)Versions (5)Used By (0)

RocketMQ PHP SDK
================

[](#rocketmq-php-sdk)

火山引擎消息队列 RocketMQ版PHP SDK 是基于 HTTP-Proxy 的RocketMQ客户端。 该SDK可通过RocketMQ实例的Http Proxy 接入点连接实例，实现消息的生产与消费。

安装
==

[](#安装)

```
composer require volcengine/ve-rocketmq-php-sdk
```

快速开始
====

[](#快速开始)

创建客户端。
------

[](#创建客户端)

初始化一个RocketMQ客户端需要准备好火山引擎RocketMQ实例的HTTP Proxy接入点、accessKey和secretKey。

```
use RMQ\Client;

//  HTTP Proxy 接入点
$endpoint = "";
// 密钥
$accessKey = "";
$secretKey = "";

// 实例化客户端
$client = new Client($endpoint, $accessKey, $secretKey);
```

生产
--

[](#生产)

### 创建生产者

[](#创建生产者)

调用client实例的 `createProducer()` 方法即可创建一个生产者实例。

```
// 创建一个生产者
$producer = $client->createProducer();
```

### 创建消息

[](#创建消息)

一条消息拥有很多属性topic、body、tag、key等，可以使用 `Message` 实例化一个消息的信息对象并可在这个对象上设置这些属性。

```
use RMQ\Message;

// 目标topic
$topic ="topic_name";
// 消息的内容
$messageContent = "content."

// 实例化一个消息
$msg = new Message($topic, $messageContent);
// 设置消息的tag值
$msg->setTag("tag_a");
// 设置ShardingKey
$msg->setShardingKey("my_key");
// 设置自定义属性
$msg->putProperty("property_name", "test");
```

### 生产消息

[](#生产消息)

调用producer实例的 `publishMessage()` 方法就能发布一条消息。在发布消息前还需要调用 `open()` 方法在服务端开启一个生产者实例， 在不需要发送消息时可以调用 `close()` 方法销毁。

```
$producer->open();

$msg = new Message("topic_name", "hello!");
$messageInfo = $producer->publishMessage($msg);
$producer->close();

var_dump(messageInfo);
```

消费
--

[](#消费)

### 创建消费者

[](#创建消费者)

调用client实例的 `createConsumer()` 方法即可创建一个消费者实例, 创建消费者时必须指定消费者的GroupID。

```
$groupID = ""; // 消费组ID

$consumer = $client->createConsumer($groupID, [
  // 每次调用consumeMessage最多拉取12条消息
  "max_message_number" => 12,
  // 在消息达到max_message_number之前，请求在服务端挂起的最大等待时长（单位ms）
  "max_wait_time"      => 3000
]);
```

### 消费消息

[](#消费消息)

调用消费者的 `consumeMessage()` 能拉取一批消息。在拉取消息并被使用后，需要调用 `ackMessages()` 对消息的消费状态进行确认，未被确认或确认消费失败的消息都会被重复消费。

```
use RMQ\Model\MessageInfo;

$consumer = $client->createConsumer($groupID);
// 订阅topic_a 全部消息1
$consumer->subscribe("topic_a");
// 订阅topic_b tag为A的消息
$consumer->subscribe("topic_b", "A");

$consumer->open();

// 拉取消息
$messages = $consumer->consumeMessage();

$acksHandles = [];
foreach ($messages as $msg) {
    $body = $msg->body;
    echo "message bode: $body \n";
    array_push($acksHandles, $msg->msgHandle);
}
// 确认消息的消费情况
// ackMessages第一个参数是确认消费成功的消息的msgHandle
// ackMessages第二个参数是确认消费失败的消息的msgHandle
$consumer->ackMessages($acksHandles, []);

$consumer->close();
```

进阶指引
====

[](#进阶指引)

持续生产消息
------

[](#持续生产消息)

服务端会对每一个客户端创建一个生产者实例，在客户端生产频率较低时，可能会出现服务端生产者实例被释放导致生产消息失败的情况。

```
$producer->open();
// 在open后等待60秒
sleep(60);
// 下面的方法调用会失败，因为服务端的生产者实例已超时被销毁掉
$producer->publishMessage($msg);
```

所以在持续生产消息时需要捕获这类异常并重新调用 `open()` 方法重新在服务端开启一个生产者实例，SDK 提供了一个专门用来捕获该类错误的Exception在`RMQ\Exception\MQTokenTimeoutException` 。如下demo，对部分消息等待一个很长的时间，这些消息发送时就会捕获到超时错误。

```
use RMQ\Exception\MQTokenTimeoutException;

for ($i = 0; $i < 10; $i++) {
  if ($i % 2 == 0) {
    sleep(60 * 10); // 偶数消息等待10分钟
  }

  $message = new Message("topic_name", "hello!");
  try {
    // 发送消息
    $producer->publishMessage($message);
  } catch (MQTokenTimeoutException $e) {
    // token失效的情况需要重连
    $producer->open();
    // 对消息重发
    $producer->publishMessage($message);
  } catch (RuntimeException $e) {
    // 其他错误情况
    echo $e . "\n";
  }
}
```

持续消费消息
------

[](#持续消费消息)

持续消息实际上就是一个轮询不断拉取消息，如果每次拉取消息的间隔过长也可能出现超时的情况，所以也需要捕获超时错误并重新调用 `open()` 方法。

```
$consumer->open();

while (true) {
  try {
    // 拉取消息
    $messages    = $consumer->consumeMessage();
    $acksHandles = [];
    foreach ($messages as $msg) {
        $body = $msg->body;
        echo "message bode: $body \n";
        array_push($acksHandles, $msg->msgHandle);
    }
    // 确认消费状态
    $consumer->ackMessages($acksHandles, []);
  } catch (MQTokenTimeoutException $e) {
    // token失效的情况需要重连
    $consumer->open();
  } catch (RuntimeException $e) {
    // 其他错误
    echo $e;
  }
}
```

延时消息
----

[](#延时消息)

延时是消息的一个属性，可通过`Message`类的`putProperty()`方法来设置消息的延时属性。

### 定时投递消息

[](#定时投递消息)

通过指定的具体的毫秒时间戳定时投递消息。

```
use RMQ\Message;

$msg = new Message("topic_name", "content.");
// 消息投递的具体毫秒时间戳（当前时间延迟30秒）
$postTime = time() * 1000 + 30000;
// 将延时属性设置到Property中
$msg->putProperty("__STARTDELIVERTIME", "$currentTimeStamp");
```

### 延时等级消息

[](#延时等级消息)

`Message` 类有 `setDelayLevel()` 方法可设置消息的延时属性。可设置1-18等级.

```
$msg2 = new Message("topic_name", "content");

$msg2->setDelayLevel(5)
```

`setDelayLevel()` 方法背后实际上使用的还是`putProperty()`方法。使用`setDelayLevel()` 实际效果和下面一致。

```
$level = 5

$msg2 = new Message("topic_name", "content");

$msg2->putProperty("__DelayTimeLevel", "$level");
```

###  Health Score

19

—

LowBetter than 10% of packages

Maintenance20

Infrequent updates — may be unmaintained

Popularity11

Limited adoption so far

Community7

Small or concentrated contributor base

Maturity33

Early-stage or recently created project

 Bus Factor1

Top contributor holds 100% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~123 days

Total

2

Last Release

1012d ago

### Community

Maintainers

![](https://www.gravatar.com/avatar/bb1183944f5749bff39f331748f61121d9ba69133037622a9b18d9979f126167?d=identicon)[volcengine](/maintainers/volcengine)

---

Top Contributors

[![songshuangfei](https://avatars.githubusercontent.com/u/26037833?v=4)](https://github.com/songshuangfei "songshuangfei (22 commits)")

### Embed Badge

![Health badge](/badges/volcengine-ve-rocketmq-php-sdk/health.svg)

```
[![Health](https://phpackages.com/badges/volcengine-ve-rocketmq-php-sdk/health.svg)](https://phpackages.com/packages/volcengine-ve-rocketmq-php-sdk)
```

###  Alternatives

[neuron-core/neuron-ai

The PHP Agentic Framework.

1.8k245.3k20](/packages/neuron-core-neuron-ai)[tencentcloud/tencentcloud-sdk-php

TencentCloudApi php sdk

3731.2M42](/packages/tencentcloud-tencentcloud-sdk-php)[aedart/athenaeum

Athenaeum is a mono repository; a collection of various PHP packages

255.2k](/packages/aedart-athenaeum)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
