PHPackages                             coolpenguins/think-queue-rabbitmq - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. coolpenguins/think-queue-rabbitmq

ActiveThink-extend[Queues &amp; Workers](/categories/queues)

coolpenguins/think-queue-rabbitmq
=================================

为 ThinkPHP5.1 队列增加 RabbitMQ 驱动

v2.0.3(1y ago)014Apache-2.0PHPPHP ^7.1.3

Since Sep 6Pushed 1y ago1 watchersCompare

[ Source](https://github.com/coolpenguins/think-queue-rabbitmq)[ Packagist](https://packagist.org/packages/coolpenguins/think-queue-rabbitmq)[ RSS](/packages/coolpenguins-think-queue-rabbitmq/feed)WikiDiscussions main Synced 1mo ago

READMEChangelogDependencies (5)Versions (8)Used By (0)

think-queue-rabbitmq for ThinkPHP5.1
====================================

[](#think-queue-rabbitmq-for-thinkphp51)

TP5 队列没有 RabbitMQ 驱动十分不便，借鉴了 Laravel 的 RabbitMQ 驱动包 [vyuldashev/laravel-queue-rabbitmq](https://github.com/vyuldashev/laravel-queue-rabbitmq)，为 TP 增加 RabbitMQ 驱动

更新记录
----

[](#更新记录)

> 2023-09-06 修复$job-&gt;release($delay)后$job-&gt;attempts()始终为1的bug

安装
--

[](#安装)

> composer require coolpenguins/think-queue-rabbitmq

配置
--

[](#配置)

> 本驱动包基于 `think-queue`，配置文件位于 `config/queue.php`

### 驱动配置

[](#驱动配置)

```

return [
    'connector' => 'coolpenguins\\queue\\connector\\RabbitMQ',

    'dsn' => env('RABBITMQ_DSN', null),

    'host' => env('RABBITMQ_HOST', '127.0.0.1'),
    'port' => env('RABBITMQ_PORT', 5672),

    'vhost' => env('RABBITMQ_VHOST', '/'),
    'login' => env('RABBITMQ_LOGIN', 'guest'),
    'password' => env('RABBITMQ_PASSWORD', 'guest'),

    'queue' => env('RABBITMQ_QUEUE', 'default'),

    'options' => [

        'exchange' => [

            'name' => env('RABBITMQ_EXCHANGE_NAME'),

            /*
            * Determine if exchange should be created if it does not exist.
            */
            'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),

            /*
            * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
            */
            'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
            'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
            'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
            'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
            'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),
        ],

        'queue' => [

            /*
            * Determine if queue should be created if it does not exist.
            */
            'declare' => env('RABBITMQ_QUEUE_DECLARE', true),

            /*
            * Determine if queue should be binded to the exchange created.
            */
            'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),

            /*
            * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
            */
            'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
            'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
            'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
            'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
            'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),
        ],
    ],

    /*
     * Determine the number of seconds to sleep if there's an error communicating with rabbitmq
     * If set to false, it'll throw an exception rather than doing the sleep for X seconds.
     */
    'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5),

    /*
     * Optional SSL params if an SSL connection is used
     */
    'ssl_params' => [
        'ssl_on' => env('RABBITMQ_SSL', false),
        'cafile' => env('RABBITMQ_SSL_CAFILE', null),
        'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
        'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
        'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
        'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
    ],

];

```

创建任务类
-----

[](#创建任务类)

> 单模块项目推荐使用 `app\job` 作为任务类的命名空间 多模块项目可用使用 `app\module\job` 作为任务类的命名空间 也可以放在任意可以自动加载到的地方

任务类不需继承任何类，如果这个类只有一个任务，那么就只需要提供一个`fire`方法就可以了，如果有多个小任务，就写多个方法，下面发布任务的时候会有区别
每个方法会传入两个参数 `think\queue\Job $job`（当前的任务对象） 和 `$data`（发布任务时自定义的数据）

还有个可选的任务失败执行的方法 `failed` 传入的参数为`$data`（发布任务时自定义的数据）

### 下面写两个例子

[](#下面写两个例子)

```
namespace app\job;

use think\queue\Job;

class Job1{

    public function fire(Job $job, $data){

            //....这里执行具体的任务

             if ($job->attempts() > 3) {
                  //通过这个方法可以检查这个任务已经重试了几次了
             }

            //如果任务执行成功后 记得删除任务，不然这个任务会重复执行，直到达到最大重试次数后失败后，执行failed方法
            $job->delete();

            // 也可以重新发布这个任务
            $job->release($delay); //$delay为延迟时间

    }

    public function failed($data){

        // ...任务达到最大重试次数后，失败了
    }

}

```

```

namespace app\lib\job;

use think\queue\Job;

class Job2{

    public function task1(Job $job, $data){

    }

    public function task2(Job $job, $data){

    }

    public function failed($data){

    }

}

```

发布任务
----

[](#发布任务)

> `think\Queue::push($job, $data = '', $queue = null)` 和 `think\Queue::later($delay, $job, $data = '', $queue = null)` 两个方法，前者是立即执行，后者是在`$delay`秒后执行

`$job` 是任务名
单模块的，且命名空间是`app\job`的，比如上面的例子一,写`Job1`类名即可
多模块的，且命名空间是`app\module\job`的，写`model/Job1`即可
其他的需要些完整的类名，比如上面的例子二，需要写完整的类名`app\lib\job\Job2`
如果一个任务类里有多个小任务的话，如上面的例子二，需要用@+方法名`app\lib\job\Job2@task1`、`app\lib\job\Job2@task2`

`$data` 是你要传到任务里的参数

`$queue` 队列名，指定这个任务是在哪个队列上执行，同下面监控队列的时候指定的队列名,可不填

监听任务并执行
-------

[](#监听任务并执行)

> php think queue:listen

> php think queue:work --daemon（不加--daemon为执行单个任务）

两种，具体的可选参数可以输入命令加 --help 查看

> 可配合supervisor使用，保证进程常驻

###  Health Score

25

—

LowBetter than 37% of packages

Maintenance37

Infrequent updates — may be unmaintained

Popularity6

Limited adoption so far

Community7

Small or concentrated contributor base

Maturity42

Maturing project, gaining track record

 Bus Factor1

Top contributor holds 100% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~62 days

Recently: every ~93 days

Total

7

Last Release

604d ago

Major Versions

1.1.1 → v2.0.02024-09-12

### Community

Maintainers

![](https://www.gravatar.com/avatar/8ec6e94148c4e7f578091c63bb9fd906126aea8b3a7f65f6effdf1a4bc34c01c?d=identicon)[coolpenguins](/maintainers/coolpenguins)

---

Top Contributors

[![coolpenguins](https://avatars.githubusercontent.com/u/24853762?v=4)](https://github.com/coolpenguins "coolpenguins (10 commits)")

### Embed Badge

![Health badge](/badges/coolpenguins-think-queue-rabbitmq/health.svg)

```
[![Health](https://phpackages.com/badges/coolpenguins-think-queue-rabbitmq/health.svg)](https://phpackages.com/packages/coolpenguins-think-queue-rabbitmq)
```

###  Alternatives

[topthink/think-queue

The ThinkPHP6 Queue Package

640675.0k75](/packages/topthink-think-queue)[nuwber/rabbitevents

The Nuwber RabbitEvents package

120515.8k3](/packages/nuwber-rabbitevents)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
