Menumbing Async Queue
=====================

An enhanced async queue component for the [Hyperf](https://hyperf.io) framework. Supports multiple drivers (Redis, AMQP, Sync) with failed message tracking, retry strategies, and monitoring capabilities.

Installation
------------

```
composer require menumbing/async-queue
```

### Optional Dependencies

| Package | Purpose |
| --- | --- |
| `hyperf/amqp` | Required to use the AMQP driver |
| `hyperf/database` | Required to use the database failed recorder |
| `hyperf/di` | Required to use annotations |

Publish the configuration file:

```
php bin/hyperf.php vendor:publish menumbing/async-queue
```

This publishes `config/autoload/async_queue.php` and the failed messages migration.

Configuration
-------------

### Basic Configuration

```
// config/autoload/async_queue.php
return [
    'pools' => [
        'default' => [
            'driver' => \Hyperf\AsyncQueue\Driver\RedisDriver::class,
            'auto_register_process' => true,
            'redis' => [
                'pool' => 'default',
            ],
            'channel' => '{queue}',
            'timeout' => 2,
            'retry_seconds' => 5,
            'handle_timeout' => 10,
            'processes' => 1,
            'concurrent' => [
                'limit' => 10,
            ],
            'max_messages' => 0,
        ],
    ],

    'failed' => [
        'recorder' => \Hyperf\AsyncQueue\Failed\RedisFailedQueueRecorder::class,
        'options' => [
            'pool' => 'default',
            'group' => env('APP_NAME', 'hyperf'),
        ],
    ],

    'debug' => [
        'before' => true,
        'after' => true,
        'failed' => true,
        'retry' => true,
    ],
];
```
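
`retry_seconds` accepts either a single integer or an array of progressive delays such as `[1, 5, 10]`. The sketch below illustrates one plausible way such a value could be turned into a delay for a given attempt. The function name `resolveRetryDelay` is hypothetical, and reusing the last entry once the attempts outnumber the array is an assumption rather than confirmed behavior of this package.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of menumbing/async-queue.
 *
 * @param int|int[] $retrySeconds Value of the `retry_seconds` option
 * @param int       $attempt      1-based number of the retry being scheduled
 */
function resolveRetryDelay(int|array $retrySeconds, int $attempt): int
{
    // A plain integer means the same delay for every retry.
    if (is_int($retrySeconds)) {
        return $retrySeconds;
    }

    if ($retrySeconds === []) {
        throw new InvalidArgumentException('retry_seconds must not be an empty array');
    }

    // Assumption: attempts beyond the end of the list reuse the last delay.
    $delays = array_values($retrySeconds);
    $index = min(max($attempt, 1), count($delays)) - 1;

    return (int) $delays[$index];
}
```

Under these assumptions, `resolveRetryDelay([1, 5, 10], 2)` yields the second delay, and any attempt past the third keeps using the final entry.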

### Common Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `driver` | `string` | `RedisDriver::class` | The queue driver class |
| `auto_register_process` | `bool` | `true` | Auto-register the consumer process |
| `channel` | `string` | `'queue'` | Channel name used as prefix for queue keys/exchanges |
| `retry_seconds` | `int\|array` | `5` | Delay in seconds before retrying a failed message. Use an array for progressive delays (e.g., `[1, 5, 10]`) |
| `handle_timeout` | `int` | `10` | Maximum seconds to handle a single message |
| `processes` | `int` | `1` | Number of consumer processes |
| `concurrent.limit` | `int\|null` | `null` | Max concurrent message processing. `null` means no limit |
| `max_messages` | `int` | `0` | Max messages to consume before process exits. `0` means unlimited |
| `pool` | `string` | (required) | The pool name, set automatically by the driver factory |

### Redis Driver Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `redis.pool` | `string` | `'default'` | The Redis connection pool name |
| `timeout` | `int` | `2` | Blocking pop timeout in seconds |

### AMQP Driver Configuration

To use the AMQP driver, install the required package:

```
composer require hyperf/amqp
```

Then configure a pool with the AMQP driver:

```
'pools' => [
    'default' => [
        'driver' => \Hyperf\AsyncQueue\Driver\Amqp\AmqpDriverAdapter::class,
        'auto_register_process' => true,
        'channel' => 'queue', // Exchange name
        'retry_seconds' => [1, 5, 10, 30],
        'handle_timeout' => 10,
        'processes' => 1,
        'concurrent' => [
            'limit' => 10,
        ],
        'max_messages' => 0,
        'amqp' => [
            // AMQP connection pool name (matches hyperf/amqp config)
            'pool' => 'default',

            // Exchange type: Type::DIRECT, Type::TOPIC, Type::FANOUT
            'exchange_type' => \Hyperf\Amqp\Message\Type::DIRECT,

            // Route failed messages to a separate exchange/queue
            'reroute_failed' => false,

            // Use the rabbitmq-delayed-message-exchange plugin for delays
            // Set to false to use TTL + Dead Letter Queue approach instead
            'use_delayed_exchange' => true,

            // QoS prefetch count — automatically set to concurrent.limit (default: 1)
            // This ensures RabbitMQ only delivers as many messages as the consumer can handle concurrently
            // 'prefetch_count' is not configurable; set concurrent.limit instead

            // Queue declaration options
            'queue_durable' => true,

            // Queue auto deletion options
            'queue_auto_delete' => false,

            // Exchange auto deletion options
            'exchange_auto_delete' => false,

            // Additional queue arguments (e.g., x-max-length, x-message-ttl)
            'queue_arguments' => [],

            // Queue name
            'queue' => 'payment.completed.notification-service',

            // Routing key for message routing
            'routing_key' => 'payment.completed',

            // --- Optional overrides (see "Exchange and Queue Naming" below) ---
            // 'delay_exchange' => 'payment.delayed',
            // 'delay_queue' => 'payment.completed.notification-service.delay',
            // 'failed_exchange' => 'payment.failed',
            // 'failed_queue' => 'payment.completed.notification-service.failed',
            // 'failed_routing_key' => 'default',
        ],
    ],
],
```
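
The optional overrides commented out above fall back to derived names when left unset, following the defaults listed in the AMQP options reference. As a rough illustration of those rules, the sketch below computes the effective names for one pool. `resolveAmqpNames` is a hypothetical helper and not the package's own code; how the application name is obtained (passed in here as `$appName`) is an assumption.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of menumbing/async-queue: fills unset AMQP
 * names with the defaults described in the options reference.
 *
 * @param array<string, mixed> $pool One entry of the `pools` config
 * @return array<string, string>
 */
function resolveAmqpNames(array $pool, string $poolName, string $appName): array
{
    $amqp = $pool['amqp'] ?? [];

    // The pool's channel doubles as the exchange name.
    $exchange = $pool['channel'] ?? 'queue';

    // `??` also falls through on explicit null values.
    $queue = $amqp['queue'] ?? "{$exchange}.{$appName}";

    return [
        'exchange' => $exchange,
        'queue' => $queue,
        'routing_key' => $amqp['routing_key'] ?? $exchange,
        'delay_exchange' => $amqp['delay_exchange'] ?? "{$exchange}.delayed",
        'delay_queue' => $amqp['delay_queue'] ?? "{$queue}.delay",
        'failed_exchange' => $amqp['failed_exchange'] ?? "{$exchange}.failed",
        'failed_queue' => $amqp['failed_queue'] ?? "{$queue}.failed",
        'failed_routing_key' => $amqp['failed_routing_key'] ?? $poolName,
    ];
}
```

With the example configuration above, the explicit `queue` value would produce a delay queue named `payment.completed.notification-service.delay`, matching the commented override.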

#### AMQP Options Reference

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `amqp.pool` | `string` | `'default'` | AMQP connection pool name |
| `amqp.queue` | `string\|null` | `null` | Queue name. When `null`, uses `{exchange}.{app_name}` |
| `amqp.routing_key` | `string\|null` | `null` | Routing key for message routing. When `null`, uses `{exchange}` |
| `amqp.exchange_type` | `Type` | `Type::DIRECT` | Exchange type: `DIRECT`, `TOPIC`, or `FANOUT` |
| `amqp.reroute_failed` | `bool` | `false` | Route failed messages to a dedicated failed exchange/queue |
| `amqp.use_delayed_exchange` | `bool` | `true` | Use `rabbitmq-delayed-message-exchange` plugin. Set `false` for TTL+DLQ approach |
| `amqp.queue_durable` | `bool` | `true` | Whether the queue survives broker restart |
| `amqp.queue_auto_delete` | `bool` | `false` | Whether the queue is deleted when the last consumer disconnects |
| `amqp.exchange_auto_delete` | `bool` | `false` | Whether exchanges are deleted when all bound queues are removed |
| `amqp.queue_arguments` | `array` | `[]` | Additional AMQP queue arguments |
| `amqp.delay_exchange` | `string\|null` | `null` | Delay exchange name. When `null`, uses `{channel}.delayed` |
| `amqp.delay_queue` | `string\|null` | `null` | Delay queue name. When `null`, uses `{queue}.delay` |
| `amqp.failed_exchange` | `string\|null` | `null` | Failed exchange name. When `null`, uses `{channel}.failed` |
| `amqp.failed_queue` | `string\|null` | `null` | Failed queue name. When `null`, uses `{queue}.failed` |
| `amqp.failed_routing_key` | `string\|null` | `null` | Failed routing key. When `null`, uses the pool name |

### Failed Message Recorders

Two built-in recorders are available:

**Redis Recorder** (default):

```
'failed' => [
    'recorder' => \Hyperf\AsyncQueue\Failed\RedisFailedQueueRecorder::class,
    'options' => [
        'pool' => 'default',
        'group' => env('APP_NAME', 'hyperf'),
    ],
],
```

**Database Recorder** (requires `hyperf/database`):

```
'failed' => [
    'recorder' => \Hyperf\AsyncQueue\Failed\DatabaseFailedQueueRecorder::class,
],
```

Make sure to run the migration:

```
php bin/hyperf.php migrate
```

### Debug Logging

Enable debug logging per event type:

```
'debug' => [
    'before' => true,   // Log before message handling
    'after' => true,    // Log after successful handling
    'failed' => true,   // Log failed messages
    'retry' => true,    // Log message retries
],
```

Creating Jobs
-------------

Extend the `Job` class and implement the `handle()` method:

```
