
gabrielextso/rabbitmq-bundle
============================

Integrates php-amqplib with Symfony & RabbitMq. Formerly oldsound/rabbitmq-bundle.


RabbitMqBundle
==============

[![Join the chat at https://gitter.im/php-amqplib/RabbitMqBundle](https://camo.githubusercontent.com/668d677b3ba3fcf5cff4adae9aced799c122faaf2a5d01ca6b3b3d4894d15282/68747470733a2f2f6261646765732e6769747465722e696d2f7068702d616d71706c69622f5261626269744d7142756e646c652e737667)](https://gitter.im/php-amqplib/RabbitMqBundle?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)

About
-----

The RabbitMqBundle incorporates messaging in your application via [RabbitMQ](http://www.rabbitmq.com/) using the [php-amqplib](http://github.com/php-amqplib/php-amqplib) library.

The bundle implements several messaging patterns as seen in the [Thumper](https://github.com/php-amqplib/Thumper) library. Therefore, publishing messages to RabbitMQ from a Symfony controller is as easy as:

```
$msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
$this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));
```

Later, when you want to consume 50 messages out of the `upload-picture` queue, just run on the CLI:

```
$ ./app/console rabbitmq:consumer -m 50 upload_picture
```

All the examples expect a running RabbitMQ server.

This bundle was presented at the [Symfony Live Paris 2011](http://www.symfony-live.com/paris/schedule#session-av1) conference. See the slides [here](http://www.slideshare.net/old_sound/theres-a-rabbit-on-my-symfony).

[![Build Status](https://camo.githubusercontent.com/593e39356620ff54627beee6a511354482f3bc10985294f651b83989c6f3b133/68747470733a2f2f7365637572652e7472617669732d63692e6f72672f7068702d616d71706c69622f5261626269744d7142756e646c652e706e673f6272616e63683d6d6173746572)](http://travis-ci.org/php-amqplib/RabbitMqBundle)

Installation
------------

### For Symfony Framework >= 2.3

Require the bundle and its dependencies with composer:

```
$ composer require php-amqplib/rabbitmq-bundle
```

Register the bundle:

```
// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        // ...
        new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
    );

    return $bundles;
}
```

Enjoy!

### For a console application that uses Symfony Console, Dependency Injection and Config components

If you have a console application used to run RabbitMQ consumers, you do not need Symfony HttpKernel and FrameworkBundle. From version 1.6, you can use the Dependency Injection component to load this bundle configuration and services, and then use the consumer command.

Require the bundle in your composer.json file:

```
{
    "require": {
        "php-amqplib/rabbitmq-bundle": "~1.6"
    }
}

```

Register the extension and the compiler pass:

```
use OldSound\RabbitMqBundle\DependencyInjection\OldSoundRabbitMqExtension;
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass;

// ...

$containerBuilder->registerExtension(new OldSoundRabbitMqExtension());
$containerBuilder->addCompilerPass(new RegisterPartsPass());
```

### Warning - BC Breaking Changes

- Since 2012-06-04, some default options for exchanges declared in the "producers" config section have changed to match the defaults of exchanges declared in the "consumers" section. The affected settings are:

    - `durable` was changed from `false` to `true`,
    - `auto_delete` was changed from `true` to `false`.

    Your configuration must be updated if you were relying on the previous default values.
- Since 2012-04-24, the `ConsumerInterface::execute` method signature has changed.
- Since 2012-01-03, the consumer's `execute` method gets the whole AMQP message object and not just the body. See the CHANGELOG file for more details.

Usage
-----

Add the `old_sound_rabbit_mq` section in your configuration file:

```
old_sound_rabbit_mq:
    connections:
        default:
            host:     'localhost'
            port:     5672
            user:     'guest'
            password: 'guest'
            vhost:    '/'
            lazy:     false
            connection_timeout: 3
            read_write_timeout: 3

            # requires php-amqplib v2.4.1+ and PHP5.4+
            keepalive: false

            # requires php-amqplib v2.4.1+
            heartbeat: 0

            # requires php_sockets.dll
            use_socket: true # default false
        another:
            # A different (unused) connection defined by a URL. All parts can be omitted
            # except the scheme (amqp:). If a setting is given both in the URL and as a key
            # (see above), the value from the URL takes precedence.
            # See https://www.rabbitmq.com/uri-spec.html on how to encode values.
            url: 'amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6'
    producers:
        upload_picture:
            connection:       default
            exchange_options: {name: 'upload-picture', type: direct}
            service_alias:    my_app_service # no alias by default
    consumers:
        upload_picture:
            connection:       default
            exchange_options: {name: 'upload-picture', type: direct}
            queue_options:    {name: 'upload-picture'}
            callback:         upload_picture_service
```

Here we configure the connection service and the message endpoints that our application will have. In this example your service container will contain the services `old_sound_rabbit_mq.upload_picture_producer` and `old_sound_rabbit_mq.upload_picture_consumer`. The latter expects that there's a service called `upload_picture_service`.

If you don't specify a connection for the client, the client will look for a connection with the same alias. So for our `upload_picture` the service container will look for an `upload_picture` connection.

If you need to add optional queue arguments, then your queue options can be something like this:

```
queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
```

Another example, with a message TTL of 20 seconds:

```
queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
```

Each argument value must be a two-element list: the datatype, then the value. Valid datatypes are:

- `S` - String
- `I` - Integer
- `D` - Decimal
- `T` - Timestamps
- `F` - Table
- `A` - Array

Adapt the `arguments` according to your needs.

If you want to bind a queue with specific routing keys, you can declare them in the producer or consumer config:

```
queue_options:
    name: "upload-picture"
    routing_keys:
      - 'android.#.upload'
      - 'iphone.upload'
```
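The `#` in `android.#.upload` is a wildcard only when the exchange is of type `topic`; on a `direct` exchange, binding keys are compared literally. The following is a minimal sketch of how topic patterns are interpreted (`*` stands for exactly one word, `#` for zero or more words). The function names `topicMatches` and `matchWords` are hypothetical and exist only for illustration: the broker performs this matching, not your application or this bundle.

```php
<?php
// Illustration only: RabbitMQ does this matching on the broker side.

/**
 * Returns true if $routingKey matches the AMQP topic $pattern.
 * '*' matches exactly one dot-separated word, '#' matches zero or more.
 */
function topicMatches(string $pattern, string $routingKey): bool
{
    $keyWords = $routingKey === '' ? [] : explode('.', $routingKey);

    return matchWords(explode('.', $pattern), $keyWords);
}

function matchWords(array $pattern, array $key): bool
{
    if ($pattern === []) {
        // Pattern exhausted: match only if the key is exhausted too.
        return $key === [];
    }

    $head = $pattern[0];
    $rest = array_slice($pattern, 1);

    if ($head === '#') {
        // Let '#' absorb 0, 1, 2, ... words and try the rest of the pattern.
        for ($skip = 0; $skip <= count($key); $skip++) {
            if (matchWords($rest, array_slice($key, $skip))) {
                return true;
            }
        }

        return false;
    }

    if ($key === []) {
        return false;
    }

    if ($head === '*' || $head === $key[0]) {
        return matchWords($rest, array_slice($key, 1));
    }

    return false;
}

var_dump(topicMatches('android.#.upload', 'android.tablet.v2.upload'));
var_dump(topicMatches('iphone.upload', 'iphone.tablet.upload'));
```

With the bindings above, a message published with the routing key `android.upload` would also reach the queue on a topic exchange, because `#` may match zero words.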

### Important notice - Lazy Connections

In a Symfony environment, all services are fully bootstrapped for each request. Since Symfony 2.3 you can declare a service as lazy ([Lazy Services](http://symfony.com/doc/master/components/dependency_injection/lazy_services.html)). This bundle does not yet support the Lazy Services feature, but you can set `lazy: true` in your connection configuration to avoid unnecessary connections to your message broker on every request. Using lazy connections is strongly recommended for performance reasons; nevertheless, the `lazy` option is disabled by default to avoid breaking applications that already use this bundle.

### Important notice - Heartbeats

It's a good idea to set `read_write_timeout` to 2x the heartbeat so that your socket stays open. If you don't do this, or use a different multiplier, there's a risk that the **consumer** socket will time out.
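As a minimal sketch of that rule, a configuration check could look like the following. The function name `timeoutMatchesHeartbeat` is hypothetical and not part of the bundle; the sketch also assumes that a heartbeat of `0` means heartbeats are disabled, in which case the rule does not apply.

```php
<?php
/**
 * Checks the "read_write_timeout = 2 x heartbeat" recommendation.
 * Both values are in seconds. A heartbeat of 0 is treated as "disabled".
 */
function timeoutMatchesHeartbeat(int $heartbeat, int $readWriteTimeout): bool
{
    if ($heartbeat === 0) {
        // No heartbeats are sent, so there is nothing to align with.
        return true;
    }

    return $readWriteTimeout === 2 * $heartbeat;
}

var_dump(timeoutMatchesHeartbeat(30, 60));
var_dump(timeoutMatchesHeartbeat(30, 3));
```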

### Dynamic Connection Parameters

Sometimes your connection information may need to be dynamic. Dynamic connection parameters allow you to supply or override parameters programmatically through a service.

For example, the `vhost` parameter of the connection may depend on the current tenant of your white-labeled application, and you do not want to (or can't) change its configuration every time.

Define a service under `connection_parameters_provider` that implements the `ConnectionParametersProviderInterface`, and add it to the appropriate `connections` configuration.

```
connections:
    default:
        host:     'localhost'
        port:     5672
        user:     'guest'
        password: 'guest'
        vhost:    'foo' # to be dynamically overridden by `connection_parameters_provider`
        connection_parameters_provider: connection_parameters_provider_service
```

Example Implementation:

```
class ConnectionParametersProviderService implements ConnectionParametersProviderInterface {
    // ...
    public function getConnectionParameters() {
        return array('vhost' => $this->getVhost());
    }
    // ...
}
```

In this case, the `vhost` parameter will be overridden by the output of `getVhost()`.
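A slightly fuller sketch of such a provider follows, assuming the vhost is derived from a tenant identifier. The class name `TenantConnectionParametersProvider`, the `tenant_` naming scheme, and the constructor argument are all hypothetical. The interface is declared locally only so that the sketch runs on its own; a real application would import the bundle's `ConnectionParametersProviderInterface` instead.

```php
<?php
// Local stand-in so the sketch is self-contained (assumption: the bundle's
// interface requires only getConnectionParameters(), as in the example above).
interface ConnectionParametersProviderInterface
{
    public function getConnectionParameters();
}

final class TenantConnectionParametersProvider implements ConnectionParametersProviderInterface
{
    /** @var string */
    private $tenant;

    public function __construct(string $tenant)
    {
        $this->tenant = $tenant;
    }

    /**
     * Only the keys returned here override the static configuration.
     */
    public function getConnectionParameters()
    {
        return array('vhost' => $this->getVhost());
    }

    private function getVhost(): string
    {
        // Hypothetical naming scheme: one vhost per tenant.
        return 'tenant_' . $this->tenant;
    }
}

$provider = new TenantConnectionParametersProvider('acme');
var_dump($provider->getConnectionParameters());
```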

Producers, Consumers, What?
---------------------------

In a messaging application, the process sending messages to the broker is called a **producer**, while the process receiving those messages is called a **consumer**. Your application will have several of each, which you can list under their respective entries in the configuration.

### Producer

A producer is used to send messages to the server. In the AMQP model, messages are sent to an **exchange**. This means that in the configuration for a producer you have to specify the connection options along with the exchange options, which usually are the name of the exchange and its type.

Now let's say that you want to process picture uploads in the background. After you move the picture to its final location, you will publish a message to the server with the following information:

```
public function indexAction($name)
{
    $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
    $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));
}
```

As you can see, if in your configuration you have a producer called **upload\_picture**, then in the service container you will have a service called **old\_sound\_rabbit\_mq.upload\_picture\_producer**.

Besides the message itself, the `OldSound\RabbitMqBundle\RabbitMq\Producer#publish()` method also accepts an optional routing key parameter and an optional array of additional properties. The array of additional properties allows you to alter the properties with which a `PhpAmqpLib\Message\AMQPMessage` object gets constructed by default. This way, for example, you can change the application headers.

You can use **setContentType** and **setDeliveryMode** methods in order to set the message content type and the message delivery mode respectively. Default values are **text/plain** for content type and **2** for delivery mode.

```
$this->get('old_sound_rabbit_mq.upload_picture_producer')->setContentType('application/json');
```
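If you set the content type to `application/json`, the message body should actually be JSON rather than the output of `serialize()`. Below is a minimal sketch of building such a body; the helper name `buildUploadMessage` is hypothetical, and the producer calls appear only as comments because they need a running Symfony container. `JSON_THROW_ON_ERROR` assumes PHP 7.3 or later.

```php
<?php
/**
 * Builds the JSON body for an "upload picture" message.
 * Throws JsonException if the data cannot be encoded.
 */
function buildUploadMessage(int $userId, string $imagePath): string
{
    return json_encode(
        array('user_id' => $userId, 'image_path' => $imagePath),
        JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES
    );
}

$body = buildUploadMessage(1235, '/path/to/new/pic.png');
echo $body, PHP_EOL;

// Inside a controller, the calls shown in the text would then be:
// $producer = $this->get('old_sound_rabbit_mq.upload_picture_producer');
// $producer->setContentType('application/json');
// $producer->publish($body);
```

The consumer callback would then decode the body with `json_decode()` instead of `unserialize()`.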

If you need to use a custom class for a producer (which should inherit from `OldSound\RabbitMqBundle\RabbitMq\Producer`), you can use the `class` option:

```
    ...
    producers:
        upload_picture:
            class: My\Custom\Producer
            connection: default
            exchange_options: {name: 'upload-picture', type: direct}
    ...
```

The next piece of the puzzle is to have a consumer that will take the message out of the queue and process it accordingly.

### Consumers

A consumer will connect to the server and start a **loop** waiting for incoming messages to process. Its behavior depends on the **callback** specified for that consumer. Let's review the consumer configuration from above:

```
consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service
```

As we see there, the **callback** option has a reference to an **upload\_picture\_service**. When the consumer gets a message from the server, it will execute that callback. If for testing or debugging purposes you need to specify a different callback, then you can change it there.

Apart from the callback we also specify the connection to use, the same way as we do with a **producer**. The remaining options are the **exchange\_options** and the **queue\_options**. The **exchange\_options** should be the same ones as those used for the **producer**. In the **queue\_options** we will provide a **queue name**. Why?

As we said, messages in AMQP are published to an **exchange**. This doesn't mean the message has reached a **queue**. For this to happen, first we need to create such a **queue** and then bind it to the **exchange**. The cool thing about this is that you can bind several **queues** to one **exchange**, so that one message can arrive at several destinations. The advantage of this approach is the **decoupling** of the producer and the consumer. The producer does not care how many consumers will process its messages. All it needs is for its message to arrive at the server. In this way we can expand the actions we perform every time a picture is uploaded without the need to change code in our controller.

Now, how to run a consumer? There's a command for it that can be executed like this:

```
$ ./app/console rabbitmq:consumer -m 50 upload_picture
```

What does this mean? We are executing the **upload\_picture** consumer telling it to consume only 50 messages. Every time the consumer receives a message from the server, it will execute the configured callback passing the AMQP message as an instance of the `PhpAmqpLib\Message\AMQPMessage` class. The message body can be obtained by calling `$msg->body`. By default the consumer will process messages in an **endless loop** for some definition of *endless*.
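Since the producer example publishes `serialize($msg)`, the callback has to reverse that step on `$msg->body`. The following is a minimal sketch of that decoding, written as a plain function so it runs without the bundle; the name `decodeUploadMessage` and the validation rules are assumptions. Passing `allowed_classes => false` keeps `unserialize()` from instantiating objects from untrusted message bodies.

```php
<?php
/**
 * Decodes a body produced by publish(serialize($msg)) and checks that the
 * expected keys are present. In a callback you would pass $msg->body here.
 */
function decodeUploadMessage(string $body): array
{
    // Refuse to instantiate any classes from the payload.
    $data = unserialize($body, array('allowed_classes' => false));

    if (!is_array($data) || !isset($data['user_id'], $data['image_path'])) {
        throw new InvalidArgumentException('Unexpected message payload');
    }

    return $data;
}

// Simulate what the producer from the earlier example sends.
$body = serialize(array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png'));
$data = decodeUploadMessage($body);

echo $data['user_id'], ' ', $data['image_path'], PHP_EOL;
```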

If you want the consumer to stop immediately when it receives a Unix signal, run the command with the `-w` flag.

```
$ ./app/console rabbitmq:consumer -w upload_picture
```

To use this flag, PHP must be installed with the [PCNTL extension](http://www.php.net/manual/en/book.pcntl.php).

To set a memory limit for the consumer, use the `-l` flag. The following example sets a 256 MB memory limit. The consumer is stopped 5 MB before reaching 256 MB in order to avoid a PHP "Allowed memory size" error.

```
$ ./app/console rabbitmq:consumer -l 256 upload_picture
```
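The 5 MB safety margin can be sketched as follows. This illustrates the arithmetic only and is not the bundle's actual implementation; the function name `isNearMemoryLimit` and its parameters are hypothetical.

```php
<?php
/**
 * Returns true once memory usage is within $marginMb of $limitMb.
 * With a 256 MB limit and a 5 MB margin, the threshold is 251 MB.
 */
function isNearMemoryLimit(int $usedBytes, int $limitMb, int $marginMb = 5): bool
{
    $thresholdBytes = ($limitMb - $marginMb) * 1024 * 1024;

    return $usedBytes >= $thresholdBytes;
}

// A consumer loop could run a check like this between messages.
var_dump(isNearMemoryLimit(memory_get_usage(true), 256));
```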

If you want to remove all the messages waiting in a queue, you can execute this command to purge the queue:

```
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
```

For deleting the consumer's queue, use this command:

```
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
```

#### Consumer Events

Consumer events can be useful in many scenarios. The consumer raises the following AMQPEvents:

##### ON CONSUME

```
class OnConsumeEvent extends AMQPEvent
{
    const NAME = AMQPEvent::ON_CONSUME;

    /**
     * OnConsumeEvent constructor.
     *
     * @param Consumer $consumer
     */
    public function __construct(Consumer $consumer)
    {
        $this->setConsumer($consumer);
    }
}
```

Let's say you need to sleep or stop consumers on a new application deploy. You can listen for `OnConsumeEvent` (`\OldSound\RabbitMqBundle\Event\OnConsumeEvent`) and check for a new application deploy.
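One possible way to detect a new deploy is to compare a release marker file against the value seen when the consumer started. The sketch below shows only that check; the `DeployWatcher` class and the marker-file convention are hypothetical, and how a listener for `OnConsumeEvent` reacts (sleeping, stopping the consumer) is left to your application.

```php
<?php
/**
 * Remembers the release marker seen at startup and reports when it changes.
 */
final class DeployWatcher
{
    /** @var string */
    private $markerFile;

    /** @var string */
    private $startedWith;

    public function __construct(string $markerFile)
    {
        $this->markerFile = $markerFile;
        $this->startedWith = $this->readMarker();
    }

    public function hasNewDeploy(): bool
    {
        return $this->readMarker() !== $this->startedWith;
    }

    private function readMarker(): string
    {
        // Long-running processes should not trust PHP's cached stat results.
        clearstatcache(true, $this->markerFile);

        return is_file($this->markerFile)
            ? trim((string) file_get_contents($this->markerFile))
            : '';
    }
}

// Demo with a temporary marker file standing in for a real release file.
$marker = tempnam(sys_get_temp_dir(), 'deploy');
file_put_contents($marker, 'release-1');

$watcher = new DeployWatcher($marker);
var_dump($watcher->hasNewDeploy());

file_put_contents($marker, 'release-2');
var_dump($watcher->hasNewDeploy());

unlink($marker);
```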

##### BEFORE PROCESSING MESSAGE

```
class BeforeProcessingMessageEvent extends AMQPEvent
{
    const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE;

    /**
     * BeforeProcessingMessageEvent constructor.
     *
     * @param Consumer    $consumer
     * @param AMQPMessage $AMQPMessage
     */
    public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
    {
        $this->setConsumer($consumer);
        $this->setAMQPMessage($AMQPMessage);
    }
}
```

Event raised before processing an AMQPMessage.

##### AFTER PROCESSING MESSAGE

```
class AfterProcessingMessageEvent extends AMQPEvent
{
    const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE;

    /**
     * AfterProcessingMessageEvent constructor.
     *
     * @param Consumer    $consumer
     * @param AMQPMessage $AMQPMessage
     */
    public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
    {
        $this->setConsumer($consumer);
        $this->setAMQPMessage($AMQPMessage);
    }
}
```

Event raised after processing an AMQPMessage. If processing the message throws an exception, the event is not raised.

##### IDLE MESSAGE

```
