robiningelbrecht/drupal-amqp-rabbitmq
=====================================

Drupal AMQP example
===================

 [![RabbitMQ](https://github.com/robiningelbrecht/drupal-amqp-rabbitmq/raw/master/readme/rabbitmq.png)](https://github.com/robiningelbrecht/drupal-amqp-rabbitmq/raw/master/readme/rabbitmq.png)

[![CI/CD](https://github.com/robiningelbrecht/drupal-amqp-rabbitmq/actions/workflows/ci.yml/badge.svg)](https://github.com/robiningelbrecht/drupal-amqp-rabbitmq/actions/workflows/ci.yml)[![codecov.io](https://camo.githubusercontent.com/2c1908b5c2e0af849763db000ac31282b8f60f47412d308733307aaff5fa070d/68747470733a2f2f636f6465636f762e696f2f67682f726f62696e696e67656c6272656368742f64727570616c2d616d71702d7261626269746d712f6272616e63682f6d61737465722f67726170682f62616467652e7376673f746f6b656e3d51555a78755a34395634)](https://codecov.io/gh/robiningelbrecht/drupal-amqp-rabbitmq)[![License](https://camo.githubusercontent.com/285ffcea550e2364602c746d509025ffc402a3535c583433383485e90fb51b40/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f6c6963656e73652f726f62696e696e67656c6272656368742f636f6e74696e756f75732d696e746567726174696f6e2d6578616d706c653f636f6c6f723d343238663765266c6f676f3d6f70656e253230736f75726365253230696e6974696174697665266c6f676f436f6c6f723d7768697465)](https://github.com/robiningelbrecht/drupal-amqp-rabbitmq/blob/master/LICENSE)[![PHP](https://camo.githubusercontent.com/dcbc3879866fef2a272f364a5a30c01d961488deb5a195505091e469c042afb8/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f7068702d762f726f62696e696e67656c6272656368742f64727570616c2d616d71702d7261626269746d712f6465762d6d61737465723f636f6c6f723d373737626233266c6f676f3d706870266c6f676f436f6c6f723d7768697465)](https://php.net/)

---

This repository aims to illustrate how to set up AMQP within Drupal. It contains a base structure with some working examples that use CommandHandlers to handle AMQP messages.

Installation
------------

- Start by installing [Docker](https://docs.docker.com/get-docker/) and [Lando](https://docs.lando.dev/getting-started/)
- Clone this repository: `git clone git@github.com:robiningelbrecht/drupal-amqp-rabbitmq.git`
- Run `lando start` to build the necessary Docker containers
- Run `lando composer install` to download the vendor dependencies
- Make sure the following config is added to `settings.php`:

```
$databases['default']['default'] = [
  'database' => 'drupal9',
  'username' => 'drupal9',
  'password' => 'drupal9',
  'prefix' => '',
  'host' => 'database',
  'port' => '',
  'namespace' => 'Drupal\\Core\\Database\\Driver\\mysql',
  'driver' => 'mysql',
];
$settings['config_sync_directory'] = '../config/sync';

$settings['amqp_credentials'] = [
  'host' => '172.21.0.3', // The AMQP host IP address is outputted in your CLI while running `lando start`
  'port' => '5672',
  'username' => 'guest',
  'password' => 'guest',
  'vhost' => '/',
  'api' => 'http://rabbit.lndo.site/',
];
```

- Import the database dump by running `lando drush sql-cli < init.sql`

The basic idea and setup
------------------------

There are three important terms to keep in mind:

- **Worker**: A class that processes a message and handles the failure when a message cannot be processed
- **Queue**: A class that represents a RabbitMQ queue and allows messages to be pushed to that queue. Each queue is linked to a worker
- **Consumer**: A process that consumes a specific queue and its messages. Each queue can have zero or more consumers

The `amqp` module provides a basic framework that allows you to:

- Define queues and workers
- Push messages to queues
- Consume queues with a *drush* command

[![RabbitMQ](https://github.com/robiningelbrecht/drupal-amqp-rabbitmq/raw/master/readme/rmq-drupal.svg)](https://github.com/robiningelbrecht/drupal-amqp-rabbitmq/raw/master/readme/rmq-drupal.svg)

Pushing messages and consuming them
-----------------------------------

The `amqp` module contains a `SimpleQueue` and a `SimpleQueueWorker`. Let's take a look at an example of pushing and consuming messages:

[![Consume - Push example](https://github.com/robiningelbrecht/drupal-amqp-rabbitmq/raw/master/readme/consume-push-example.gif)](https://github.com/robiningelbrecht/drupal-amqp-rabbitmq/raw/master/readme/consume-push-example.gif)

Adding a new queue
------------------

It's recommended to add a queue for each type of task, for example:

- Sending out notifications: `send-notification-queue`
- Migrating articles: `migrate-article-queue`
- Calculating product prices: `calculate-product-price-queue`
- ...

This approach ensures that tasks of one type cannot block other ones. It also has the advantage that you can log failed messages on the corresponding failed queues of each queue:

- `send-notification-queue-failed`
- `migrate-article-queue-failed`
- `calculate-product-price-queue-failed`

To declare a new queue, just add a new entry to your `services.yml` and tag it with `amqp_queue`:

```
  Drupal\your_module\Queue\NewQueue:
    autowire: true
    tags:
      - { name: amqp_queue }
```

Make sure this class extends `BaseQueue`, so you don't have to bother queueing messages yourself.

@TODO: Explain how to push message to Q

### Push a message to its corresponding failed Q

If, for some reason, a message could not be processed, you might want to log it somewhere. A "failed queue" could be a solution here.
To push a message to its corresponding failed queue, you can use the `FailedQueueFactory`:

```
  $this->failedQueueFactory->buildFor($queue)->queue($message);
```

This factory can for example be used in the `processFailure` callback of your worker:

```
  public function processFailure(Envelope $envelope, AMQPMessage $message, \Throwable $exception, Queue $queue): void
  {
    /** @var Command $command */
    $command = $envelope;
    $command->setMetaData([
      'exceptionMessage' => $exception->getMessage(),
      'traceAsString' => $exception->getTraceAsString(),
    ]);

    // Push the enriched command to the failed queue that belongs to $queue.
    $this->failedQueueFactory->buildFor($queue)->queue($command);
  }
```

**Note**: a failed queue has no worker attached to it and thus cannot be consumed. This means that the messages will stay on the queue until they are manually deleted.

### Use a delayed Q to postpone consuming a message

In some more advanced use cases you might want to delay the consumption of messages, for example:

- a digest mail that summarizes all content changes that occurred in the last 30 minutes
- requeue a failed message automatically after 15 seconds
- ...

You can achieve this by pushing the message to its corresponding delayed queue:

```
  $this->delayedQueueFactory->buildWithDelayForQueue(15, $queue)->queue($message);
```

For a delayed queue to work properly you'll have to do two things:

- Add a new exchange with the name `dlx`
- Bind the target queue to the `dlx` exchange, using the name of the command queue the message must be routed to as the routing key of the binding.

 [![Dlx binding example](https://github.com/robiningelbrecht/drupal-amqp-rabbitmq/raw/master/readme/dlx-binding-example.png)](https://github.com/robiningelbrecht/drupal-amqp-rabbitmq/raw/master/readme/dlx-binding-example.png)
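
The README does not show how the delayed queue is declared. A common RabbitMQ way to get this behaviour is a queue with a message TTL and a dead-letter exchange, so the sketch below assumes that mechanism. The helper name `delayedQueueArguments()` and the queue name are hypothetical and not part of the `amqp` module; the three `x-` keys are standard RabbitMQ queue arguments.

```php
<?php

declare(strict_types=1);

/**
 * Builds the queue arguments for a "delay" queue: messages wait for
 * $delaySeconds and are then dead-lettered to $exchange, with the target
 * queue name as routing key.
 *
 * Hypothetical helper for illustration; not part of the amqp module.
 */
function delayedQueueArguments(int $delaySeconds, string $targetQueue, string $exchange = 'dlx'): array
{
    if ($delaySeconds < 0) {
        throw new InvalidArgumentException('Delay must not be negative.');
    }

    return [
        // RabbitMQ expects the TTL in milliseconds.
        'x-message-ttl' => $delaySeconds * 1000,
        // Expired messages are republished to this exchange ...
        'x-dead-letter-exchange' => $exchange,
        // ... with this routing key, which the binding maps to the target queue.
        'x-dead-letter-routing-key' => $targetQueue,
    ];
}

// Arguments for a queue that holds messages for 15 seconds.
$arguments = delayedQueueArguments(15, 'send-notification-queue');
```

With php-amqplib, such an array would typically be wrapped in an `AMQPTable` and passed as the `arguments` parameter of `queue_declare()`. Because the routing key equals the target queue name, the `dlx` binding described above is what delivers the expired message back to the original queue.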

Define a new CommandHandler
---------------------------

I like to use Commands and CommandHandlers to persist changes to the database. That is basically what the `cqrs` module is for. It provides a simple framework that

- Allows you to define new commands and their corresponding command handlers
- Allows you to push messages to command queues
- Provides a command worker and dispatcher to process the commands coming in from the different queues

To add a new command (and command handler), just add a new entry to your `services.yml` and tag it with `cqrs_command_handler`:

```
  Drupal\your_module\DoSomething\DoSomethingCommandHandler:
    autowire: true
    tags:
      - { name: cqrs_command_handler }
```
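
The classes behind this service entry are not shown in the README. As a rough, standalone sketch of the command / command handler / dispatcher idea, the following plain PHP may help. All class and method names here (`DoSomething`, `handle()`, `CommandDispatcher`, `register()`, `dispatch()`) are assumptions for illustration and do not reflect the actual base classes or interfaces of the `cqrs` module.

```php
<?php

declare(strict_types=1);

// Hypothetical command: a plain value object describing the requested change.
final class DoSomething
{
    private string $subject;

    public function __construct(string $subject)
    {
        $this->subject = $subject;
    }

    public function getSubject(): string
    {
        return $this->subject;
    }
}

// Hypothetical handler: the single place that acts on a DoSomething command.
final class DoSomethingCommandHandler
{
    /** @var string[] */
    private array $handled = [];

    public function handle(DoSomething $command): void
    {
        // A real handler would persist the change to the database here.
        $this->handled[] = $command->getSubject();
    }

    /** @return string[] */
    public function getHandled(): array
    {
        return $this->handled;
    }
}

// Hypothetical dispatcher: routes a command to the handler registered for its class.
final class CommandDispatcher
{
    /** @var array<string, object> */
    private array $handlers = [];

    public function register(string $commandClass, object $handler): void
    {
        $this->handlers[$commandClass] = $handler;
    }

    public function dispatch(object $command): void
    {
        $commandClass = get_class($command);
        if (!isset($this->handlers[$commandClass])) {
            throw new RuntimeException("No handler registered for {$commandClass}.");
        }
        $this->handlers[$commandClass]->handle($command);
    }
}

$handler = new DoSomethingCommandHandler();
$dispatcher = new CommandDispatcher();
$dispatcher->register(DoSomething::class, $handler);
$dispatcher->dispatch(new DoSomething('article-42'));
```

In the module, the registration step is presumably what the `cqrs_command_handler` tag takes care of, so you would not wire handlers by hand as this sketch does.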

Real-time migration example
---------------------------

The example module contains... an example (deuh) that shows how to implement a "real-time" migration for the content type "Breaking news".

Navigate to `admin/content/generate-migration-message`. This form allows you to push a migration message to a queue. It simulates how a third party could push a message to a Drupal migration queue where it will get picked up by a consumer. The migration framework will then do the heavy lifting.

[![Real time migration](https://github.com/robiningelbrecht/drupal-amqp-rabbitmq/raw/master/readme/real-time-migration.gif)](https://github.com/robiningelbrecht/drupal-amqp-rabbitmq/raw/master/readme/real-time-migration.gif)

Run consumers as background processes
-------------------------------------

Generally you want to run consumers as background processes and keep them "alive" for as long as your server is up. This can be done using `systemd`, but I chose to use [supervisord](http://supervisord.org/).

> Supervisor is a client/server system that allows its users to monitor and control a number of processes on UNIX-like operating systems.

To register all consumers as processes, just run `lando consumers-start`. This will spin up supervisord and automatically create the necessary consumers for all of your queues.

When adding or removing queues, or when updating queue config, you need to run `lando consumers-restart` for your new settings to be picked up.

**Important**: Whenever you make changes to your code, make sure to run the restart command as well, as you don't want your consumers to be running with old code.

### Check the status of your consumers

Run `lando consumers-status`; it should output something like this:

```
ampq-consume-queue-one:ampq-consume-queue-one-00   RUNNING   pid 1219, uptime 0:00:06
ampq-consume-queue-one:ampq-consume-queue-one-01   RUNNING   pid 1215, uptime 0:00:07
ampq-consume-queue-one:ampq-consume-queue-two-01   RUNNING   pid 1216, uptime 0:00:07

```
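
The supervisord configuration that `lando consumers-start` generates is not shown in this README. As an assumption-laden sketch, a program section that would yield process names in the `group:name-NN` shape seen above could look like this. The drush command name is a placeholder, and the real generated file may differ.

```
; Hypothetical program section, for illustration only.
[program:ampq-consume-queue-one]
; Placeholder command: replace with the module's actual drush consume command.
command=drush your:consume-command queue-one
; Two consumers for this queue, numbered -00 and -01.
numprocs=2
process_name=%(program_name)s-%(process_num)02d
autostart=true
autorestart=true
```

When `numprocs` is greater than 1, supervisord requires `process_name` to include `%(process_num)s`, which is where the `-00` and `-01` suffixes come from.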
