PHPackages                             printed/rabbitmq-queue-bundle - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. printed/rabbitmq-queue-bundle

ActiveLibrary[Queues &amp; Workers](/categories/queues)

printed/rabbitmq-queue-bundle
=============================

A helper wrapper for RabbitMQ producers and consumers

5.3.2(1y ago)234.3k↑62.5%3[1 PRs](https://github.com/printedcom/rabbitmq-queue-bundle/pulls)MITPHPPHP &gt;=7.0

Since Sep 2Pushed 1y ago3 watchersCompare

[ Source](https://github.com/printedcom/rabbitmq-queue-bundle)[ Packagist](https://packagist.org/packages/printed/rabbitmq-queue-bundle)[ Docs](https://printed.com)[ RSS](/packages/printed-rabbitmq-queue-bundle/feed)WikiDiscussions develop Synced today

READMEChangelog (3)Dependencies (17)Versions (39)Used By (0)

printed/rabbitmq-queue-bundle
=============================

[](#printedrabbitmq-queue-bundle)

A RabbitMQ wrapper bundle for Symfony that aims to improve your experience with consumers and producers. The bundle piggybacks off of the `php-amqplib/rabbitmq-bundle` bundle.

Notable features
----------------

[](#notable-features)

- Cancellable queue tasks
- Queue tasks completeness progress tracking
- An opinionated, zero-downtime deployment procedure, focused on supervisord
- (via `php-amqplib/rabbitmq-bundle`) Queue consumer's graceful max execution time
- Queue tasks reattempts

Setup &amp; Dependencies
------------------------

[](#setup--dependencies)

- PHP `>=7.0`
-  `^3.4|^4.0`
-  `~2.5`
-  `~1.11`
-  `~3.4`
-  `~1.6`

We assume that you are familiar with the `php-amqplib/rabbitmq-bundle` configuration and setup.

### Special note for memcached users

[](#special-note-for-memcached-users)

Please open `vendor/doctrine/cache/lib/Doctrine/Common/Cache/MemcachedCache.php` file in your project and see whether you can find the following piece of code:

```
protected function doContains($id)
{
    return false !== $this->memcached->get($id)
        || $this->memcached->getResultCode() !== Memcached::RES_NOTFOUND;
}
```

If you do, then consider upgrading `doctrine/cache` version to at least `1.7.0`, otherwise `CacheQueueMaintenanceStrategy` might be saying that the maintenance mode is up when there was any connection issues to your memcached server.

### Bundle configuration

[](#bundle-configuration)

```
printedcom_rabbitmq_queue_bundle:
  options:
    # Name of the service that acts as a default producer in RabbitMQ. See below this code snippet for details.
    default_rabbitmq_producer_name: 'default_rabbitmq_producer'

    # Name of the service that implements the queue maintenance mode. Use one of the following:
    #
    # 1. 'printed.bundle.queue.service.queue_maintenance.cache_queue_maintenance_strategy' (recommended)
    # 2. 'printed.bundle.queue.service.queue_maintenance.filesystem_queue_maintenance_strategy'
    #
    # To understand the difference, see the comments in the source code.
    queue_maintenance_strategy__service_name: 'printed.bundle.queue.service.queue_maintenance.cache_queue_maintenance_strategy'

    # Name of a cache service that implements the requirements outlined in the CacheQueueMaintenanceStrategy
    cache_queue_maintenance_strategy__cache_service_name: 'rabbitmq_queue_bundle_cache'

    # Name of the service that implements the New Deployments Detection feature. Choose one of the following:
    #
    # 1. 'printed.bundle.queue.service.new_deployments_detector.noop_strategy' - disables this functionality
    # 2. 'printed.bundle.queue.service.new_deployments_detector.cache_strategy'
    new_deployments_detector_strategy__service_name: 'printed.bundle.queue.service.new_deployments_detector.cache_strategy'

    # Name of a cache service that implements the requirements outlined in the CacheQueueMaintenanceStrategy
    new_deployments_detector_strategy__cache_service_name: 'rabbitmq_queue_bundle_cache'

    # Exit code used to exit a worker when it's detected that it's running old code
    consumer_exit_code__running_using_old_code: 15

    # With tools like supervisord, it's important to have consumers running without exiting for a specified amount of time
    # in order to prove that the script started successfully. This is a problem if a consumer manages to start, connect to rabbitmq
    # and fail due to exception being thrown during execution of the task faster than the specified amount of time. The following
    # option makes the consumer not fail too fast. You essentially want to put your supervisord's "startsecs" value here (in seconds).
    # Bear in mind that the underlying code will always add 1 second to whatever value you put here in order to compensate for
    # fraction of seconds that aren't taken into account during evaluating of how long the script has been running for. This means that
    # you just need to put the value from supervisord's config "as is" without worrying about race conditions.
    #
    # You can disable this feature by either setting this option to null or not mentioning this option at all. I.e. by default this option
    # is disabled.
    minimal_runtime_in_seconds_on_consumer_exception: 1

    rabbitmq_user: '%rabbitmq_user%'
    rabbitmq_password: '%rabbitmq_pass%'

    # Pass '/' or don't set this option if you don't know what rabbtimq vhost is.
    rabbitmq_vhost: '%rabbitmq_vhost%'

    # This is used only by commands that call the rabbit management api. You don't need to do
    # anything with this key if you don't use those commands.
    rabbitmq_api_base_url: 'http://%rabbitmq_host%:15672'
```

- `rabbitmq-queue-bundle.default_rabbitmq_producer_name` You are expected to have at least one RabbitMQ producer with the following config:

```
producers:
    default:
        connection:       default
        service_alias:    default_rabbitmq_producer
```

The value of the `service_alias` should be provided in the `rabbitmq-queue-bundle.default_rabbitmq_producer_name` parameter. That config creates a RabbitMQ producer which dispatches tasks to appropriate queues by queue names. This is also known as the "default" producer in RabbitMQ.

### Important notice: Use dedicated EntityManager for your consumers.

[](#important-notice-use-dedicated-entitymanager-for-your-consumers)

AbstractQueueConsumer should ideally use a dedicated and separate EntityManager to process QueueTasks. This is to prevent problems with flushing changes to the QueueTasks when the application's default EntityManager stopped being operational (e.g. due to the "Is already closed" error, or due to the entity manager's unit of work having been cleared).

To achieve this, use a multi entity manager configuration for the DoctrineBundle, and then supply the dedicated entity manager to the constructor of your consumers.

Example configuration:

```
# DoctrineBundle configuration
doctrine:
  # (...)
  orm:
    # (...)
    default_entity_manager: default
    entity_managers:
      default:
        auto_mapping: true

      queue_consumer:
        mappings:
          Printed\Bundle\Queue:
            type: annotation
            prefix: Printed\Bundle\Queue\Entity
            dir: '%kernel.project_dir%/vendor/printed/rabbitmq-queue-bundle/src/Printed/Bundle/Queue/Entity'

# Registering your queue consumer e.g. in services.yml
acme.queue.consumer.my_queue_task1:
  class: 'Acme\Bundle\Queue\Consumer\MyQueueTask1QueueConsumer'
  arguments:
    - '@doctrine.orm.default_entity_manager'
    - '@validator'
    - '@monolog.logger'
    - '@Psr\Container\ContainerInterface'
    - '@printed.bundle.queue.service.service_container_parameters'
    - '@doctrine.orm.queue_consumer_entity_manager'
  tags:
    - container.service_subscriber
```

Usage
-----

[](#usage)

Assuming you have the configuration done for the previous bundles and running a basic Symfony demo application the steps below should be fairly easy to follow. Small changes can be made where needed but the examples are to help get the point across.

### Example rabbitmq.yml config

[](#example-rabbitmqyml-config)

```
old_sound_rabbit_mq:
    connections:
        default:
            host: '%rabbitmq_host%'
            port: '%rabbitmq_port%'
            user: '%rabbitmq_user%'
            password: '%rabbitmq_pass%'
            vhost: '/'
            lazy: true
            connection_timeout: 3

            # this needs to be 2x of the heartbeat option
            read_write_timeout: 7200

            # don't forget that you should enable tcp keepalive in rabbitmq as well: https://www.rabbitmq.com/networking.html#socket-gen-tcp-options
            keepalive: true

            # keep this value high because https://github.com/php-amqplib/RabbitMqBundle/issues/301
            heartbeat: 3600

    producers:
        default:
            connection:       default
            service_alias:    default_rabbitmq_producer

    consumers:
        upload_picture: &consumer_template
            connection:       default
            queue_options:    { name: 'upload-picture' }
            callback:         upload_picture_service
            qos_options:      { prefetch_size: 0, prefetch_count: 1, global: false }
            graceful_max_execution:
                timeout: 1800
                exit_code: 10

        render_treasure_map:
