Abandoned → oro/platform (archived library)

oro/message-queue
=================

ORO Message Queue Component

[Source](https://github.com/oroinc/OroMessageQueueComponent) · [Packagist](https://packagist.org/packages/oro/message-queue)

OroMessageQueue Component
=========================

*Note:* This article is published in the Oro documentation library.

The component incorporates a message queue in your application via different transports. It contains several layers.

The lowest layer is called Transport and provides an abstraction of the transport protocol. The Consumption layer works on top of the Transport layer and provides tools to consume messages, such as a CLI command, signal handling, logging, and extensions.

The Client layer lets you start producing and consuming messages with as little configuration as possible.

- [External Links](#external-links)
- [What is a Message Queue](#what-is-a-message-queue)
- [Dictionary](#dictionary)
- [Message Processors](#message-processors)
- [Jobs](#jobs)
- [Flow](#flow)
- [Usage](#usage)

External Links
--------------

- [What is Message Queue](http://www.ibm.com/support/knowledgecenter/SSFKSJ_9.0.0/com.ibm.mq.pro.doc/q002620_.htm)
- [Message Queue Benefits](https://www.iron.io/top-10-uses-for-message-queue/) (most of them apply to the Oro Message Queue Component)
- [Rabbit MQ Introduction](https://www.rabbitmq.com/tutorials/tutorial-one-php.html)

What Is a Message Queue
-----------------------

Message queues provide an asynchronous communications protocol, meaning that the sender and the receiver of the message do not need to interact with the message queue at the same time. Messages placed onto the queue are stored until the recipient retrieves them. A message does not have information about previous and next messages.

### Therefore, Message Queues Should Be Used If:

- A process can be executed asynchronously.
- A process does not affect User Experience.
- Processes need to be executed in parallel for faster performance.
- You need a guarantee of processing.
- You need scalability.

### Publish/Subscribe Messaging

OroMessageQueue uses *publish/subscribe messaging*: the sending application publishes (sends) a message with a specific *topic*, and a *consumer* finds the subscriber(s) for that topic. Publish/subscribe messaging decouples the information provider from the consumers of that information. The sending application and the receiving application do not need to know anything about each other for the information to be sent and received.
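
The routing idea can be illustrated with a minimal in-memory sketch. It is not the component's implementation: the `subscribe` and `publish` functions and the `entity.updated` topic are hypothetical names used only for illustration, and everything runs synchronously in one process.

```php
<?php
// Hypothetical in-memory router: topic name => list of subscriber callbacks.
$subscribers = [];

function subscribe(array &$subscribers, string $topic, callable $processor): void
{
    $subscribers[$topic][] = $processor;
}

// Runs every processor subscribed to the topic and collects the results.
function publish(array $subscribers, string $topic, array $body): array
{
    $results = [];
    foreach ($subscribers[$topic] ?? [] as $processor) {
        $results[] = $processor($body);
    }

    return $results;
}

// Two independent subscribers for the same topic; the publisher knows neither.
subscribe($subscribers, 'entity.updated', fn (array $body) => 'indexed ' . $body['id']);
subscribe($subscribers, 'entity.updated', fn (array $body) => 'audited ' . $body['id']);

$results = publish($subscribers, 'entity.updated', ['id' => 42]);
$unrouted = publish($subscribers, 'entity.deleted', ['id' => 42]); // no subscribers
```

The publisher only names a topic. Adding or removing a subscriber does not require any change on the sending side.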

Dictionary
----------

- **Message** - A unit of information that contains a *message topic*, which indicates which *message processor(s)* will process it, and a *message body*, an array of parameters required for the processing (for example, an entity id or a channel name). Messages are created and sent by a *message producer* and put at the "tail" of the *message queue*. When a message reaches the head of the queue, it is processed by a *consumer* using a *message processor*. Messages also carry a number of additional settings (see [Message settings](#message-settings)).
- **Message Queue** - A FIFO queue that holds messages until they are processed. There can be one or more queues. A single queue is the simplest setup; several queues are harder to manage but can be more flexible.
- **Consumer** - A component that takes messages from the queue and processes them. It processes one message at a time: once one message has been processed, the next one follows. For each message, the consumer runs a *message processor* subscribed to the *message topic* (if one exists). If several processors are subscribed to the same topic, they can run in parallel (messages are sent via a broker, and if the broker sees that a message has several receivers, it clones the message to create an individual message for each receiver). There can be more than one consumer, and consumers can run on different servers to increase performance. When implementing a message processor, a developer should remember that *there can be several consumers working on different servers*.
- **Message Processor** - Processes queue messages (i.e. contains the code that runs when a consumer processes a message with the specified topic).
- **Message Topic** - An identifier that indicates which message processor should be executed for the message. One processor can subscribe to several topics, and several processors can subscribe to the same topic.
- **Job** - A message processor can process a message directly or create a job. Jobs are stored in the database and allow you to monitor process status and start and end times, and to interrupt processes. Also, if a process is split into a set of parallel processes, jobs allow monitoring and control of the whole set. See details in the [Jobs](#jobs) section.

### Message Settings

- **Topic** - Refers to the term 'Message Topic' above.
- **Body** - A string or JSON-encoded array with some data.
- **Priority** - One of `MessagePriority::VERY_LOW`, `MessagePriority::LOW`, `MessagePriority::NORMAL`, `MessagePriority::HIGH`, `MessagePriority::VERY_HIGH`. Priority handling is simple: there are five queues, one per priority. Consumers process messages from the `VERY_HIGH` queue first. If it is empty, they process messages from the `HIGH` queue, and so on. Consequently, a consumer processes messages from the `VERY_LOW` queue only when all other queues are empty.
- **Expire** - A number of seconds after which the message should be removed from the queue without processing.
- **Delay** - A number of seconds that the message should be delayed for before it is sent to a queue.
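
The priority rule above can be sketched as follows. This is a simplified in-memory model, not the component's code: the `PRIORITIES` list and the `nextMessage` function are hypothetical, and the priority names are plain strings standing in for the `MessagePriority` constants.

```php
<?php
// Priority names ordered from highest to lowest. These are placeholders that
// mirror the MessagePriority constant names, not the library's real values.
const PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW'];

/**
 * Returns the next message from the highest-priority non-empty queue.
 *
 * @param array<string, SplQueue> $queues one FIFO queue per priority
 */
function nextMessage(array $queues): ?string
{
    foreach (PRIORITIES as $priority) {
        if (!$queues[$priority]->isEmpty()) {
            return $queues[$priority]->dequeue();
        }
    }

    return null; // every queue is empty
}

$queues = [];
foreach (PRIORITIES as $priority) {
    $queues[$priority] = new SplQueue();
}

// Messages arrive in an order unrelated to their priority.
$queues['LOW']->enqueue('cleanup');
$queues['VERY_HIGH']->enqueue('payment');
$queues['NORMAL']->enqueue('email');

$order = [];
while (($message = nextMessage($queues)) !== null) {
    $order[] = $message;
}
```

In this sketch a lower-priority message is taken only after every higher-priority queue has been drained.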

Message Processors
------------------

**Message Processors** are classes that process queue messages. They implement `MessageProcessorInterface`. In addition, they usually subscribe to specific topics by implementing `TopicSubscriberInterface`.

The `process(MessageInterface $message, SessionInterface $session)` method describes the actions to perform when a message is received. It can perform the actions directly or create a job. It can also produce a new message to run another processor asynchronously.

### Processing Status

The received message can be processed, rejected, and re-queued. An exception can also be thrown.

**Message Processor will return `self::ACK` in the following cases:**

- If a message was processed successfully.
- If the created job returned `true`.

It means that the message was processed successfully and is removed from the queue.

**Message Processor will return `self::REJECT` in the following cases:**

- If a message is broken.
- If the created job returned `false`.

It means that the message was not processed and is removed from the queue because it is unprocessable and will never become processable (e.g. a required parameter is missing or another permanent error appears).

**A message can become unprocessable in two ways:**

- The message became unprocessable as a result of normal work. For example, the message referred to an entity that existed at the moment of sending, but somebody deleted it afterwards. The entity will not appear again, so we can reject the message. This is a normal workflow, and user intervention is not required.
- The message became unprocessable due to a failure. For example, the entity id was invalid or missing. This is abnormal behavior. The message should also be rejected, but the processor should draw user attention to it (e.g. log a critical error or even throw an exception).

**If a message cannot be processed temporarily**, for example, because of a connection timeout due to server overload, the `process` method should return `self::REQUEUE`. The message will be returned to the queue and processed later. **This also happens if an exception is thrown during processing or job running**.

**The workflow of re-queuing messages (processor returns `self::REQUEUE`) is the following:**

1. A consumer processes a message (runs the `process` method of the message processor).
2. The `process` method returns `self::REQUEUE`.
3. The consumer puts the message (i.e. a copy of the message) to the end of the queue setting the `redelivery` flag to true.
4. The consumer continues message processing (the requeued message is at the end of the queue).
5. When the turn of the requeued message comes, the `DelayRedeliveredMessageExtension` works and sets a delay for the requeued message.
6. The time set in the delay passes and the message is processed again.
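
The steps above can be traced with a small simulation. It is a sketch under stated assumptions, not the component's consumer: messages are plain arrays, the `ACK` and `REQUEUE` constants are local stand-ins for the processor constants, the `redelivered` key is a hypothetical stand-in for the redelivery flag, and the delay is only recorded in a log instead of being waited for.

```php
<?php
// Local stand-ins for the processor result constants.
const ACK = 'ack';
const REQUEUE = 'requeue';

// Hypothetical processor: message "A" hits a temporary failure on its first try.
$firstTry = true;
$process = function (array $message) use (&$firstTry): string {
    if ($message['body'] === 'A' && $firstTry) {
        $firstTry = false;

        return REQUEUE; // e.g. a connection timeout
    }

    return ACK;
};

$queue = new SplQueue();
$queue->enqueue(['body' => 'A', 'redelivered' => false]);
$queue->enqueue(['body' => 'B', 'redelivered' => false]);

$log = [];
while (!$queue->isEmpty()) {
    $message = $queue->dequeue();

    if ($message['redelivered']) {
        // Stand-in for the delay step; a real consumer would wait here.
        $log[] = "delay {$message['body']}";
    }

    $status = $process($message);
    if ($status === REQUEUE) {
        $message['redelivered'] = true;
        $queue->enqueue($message); // a copy goes to the end of the queue
    }

    $log[] = "{$status} {$message['body']}";
}
```

Message `B` is processed before the second attempt on `A`, because the requeued copy is placed at the end of the queue.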

**The workflow of re-queuing messages when an exception is thrown inside a message processor is slightly different:**

1. A consumer processes a message (runs `process` method of the message processor).
2. An exception is thrown inside the `process` method.
3. The consumer logs the exception and puts the message (i.e. a copy of the message) to the end of the queue setting the `redelivery` flag to true. Then the consumer fails with the exception.
4. The consumer should be re-run at this stage. This can be done manually or automatically with a utility like [supervisord](http://supervisord.org/). Manual re-run is preferred during development, as developers should review the exceptions thrown during message processing. Automatic re-run is preferred for regression testing or production.
5. The consumer continues message processing (the failing message is at the end of the queue).
6. When the turn of the failing message comes, the `DelayRedeliveredMessageExtension` works and sets a delay for the failing message.
7. After the delay time passes, the message is processed again and the consumer can fail again.
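
The exception workflow can be simulated in the same spirit. This is a simplified sketch, not the component's consumer: `runConsumer` is a hypothetical function, the outer loop is a stand-in for a supervisor that restarts the consumer, the `redelivered` key is a stand-in for the redelivery flag, and the delay step is omitted.

```php
<?php
// Hypothetical consumer: processes messages until the queue is empty
// or until the processor throws.
function runConsumer(SplQueue $queue, callable $process, array &$log): void
{
    while (!$queue->isEmpty()) {
        $message = $queue->dequeue();
        try {
            $process($message);
            $log[] = "ack {$message['body']}";
        } catch (RuntimeException $e) {
            $log[] = "error {$message['body']}";
            $message['redelivered'] = true;
            $queue->enqueue($message); // a copy goes to the end of the queue

            throw $e; // the consumer fails with the exception
        }
    }
}

// Hypothetical processor: message "A" throws on its first attempt only.
$calls = 0;
$process = function (array $message) use (&$calls): void {
    if ($message['body'] === 'A' && ++$calls === 1) {
        throw new RuntimeException('temporary failure');
    }
};

$queue = new SplQueue();
$queue->enqueue(['body' => 'A', 'redelivered' => false]);
$queue->enqueue(['body' => 'B', 'redelivered' => false]);

$log = [];
$restarts = 0;
// Stand-in for a supervisor: re-run the consumer whenever it dies.
while (!$queue->isEmpty()) {
    try {
        runConsumer($queue, $process, $log);
    } catch (RuntimeException $e) {
        $restarts++;
    }
}
```

If the processor kept throwing for the same message, this loop would restart the consumer indefinitely, which matches the note above that the consumer can fail again.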

### Example

A processor receives a message with the entity id. It finds the entity and changes its status without creating any job.

```
    /**
     * {@inheritdoc}
     */
    public function process(MessageInterface $message, SessionInterface $session)
    {
        $body = JSON::decode($message->getBody());

        if (! isset($body['id'])) {
            $this->logger->critical(
                sprintf('Got invalid message, id is empty: "%s"', $message->getBody()),
                ['message' => $message]
            );

            return self::REJECT;
        }

        $em = $this->getEntityManager();
        $repository = $em->getRepository(SomeEntity::class);

        $entity = $repository->find($body['id']);

        if (! $entity) {
            $this->logger->error(
                sprintf('Cannot find an entity with id: "%s"', $body['id']),
                ['message' => $message]
            );

            return self::REJECT;
        }

        $entity->setStatus('success');
        $em->persist($entity);
        $em->flush();

        return self::ACK;
    }
```

Overall, there can be three cases:

- The processor received a message with an entity id. The entity was found. The `process` method of the processor changed the entity status and returned `self::ACK`.
- The processor received a message with an entity id. The entity was not found. This is possible if the entity was deleted while the message was in the queue (i.e. after it was sent but before it was processed). This is expected behavior, but the processor rejects the message because the entity does not exist and will not appear later. Note that this case is logged at the error level.
- The processor received a message with an empty entity id. This is unexpected behavior and points to a bug in the code that sent the message. The message is also rejected, but it is logged at the critical level to signal that user intervention is required.

Jobs
----

A message processor can be implemented with or without creating jobs.

There are no ideal criteria for deciding whether a job should be created. A developer should decide which approach is better in each case.

Here are a few recommendations:

### We Can Skip a Job Creation If:

- We have a simple, fast-executing action, such as a status change.
- Our action looks like an event listener.

### We Should Always Create Jobs If:

- The action is complicated and may take a long time to execute.
- We need to monitor execution status.
- We need to run a unique job, i.e. do not allow running a job with the same name until the previous job has finished.
- We need to run a step-by-step action, i.e. the message flow has several steps (tasks) which run one after another.
- We need to split a job into a set of sub-jobs that run in parallel and monitor the status of the whole task.

Jobs are usually run with JobRunner.

### JobRunner

JobRunner creates and runs a job. Usually one of the following methods is used:

#### runUnique

`public function runUnique($ownerId, $name, \Closure $runCallback)`

Runs the `$runCallback`. It does not allow another job with the same name to be run at the same time.

#### createDelayed

`public function createDelayed($name, \Closure $startCallback)`

Creates a sub-job that runs asynchronously (by sending its own message). It can only be called inside another job.

#### runDelayed

`public function runDelayed($jobId, \Closure $runCallback)`

This method is used inside a processor for a message that was sent with `createDelayed`.

The `$runCallback` closure usually returns `true` or `false`, and the job status depends on the returned value. See the [Job statuses](#job-statuses) section for details.

To reuse existing processor logic within a job, the processor can be decorated with `DelayedJobRunnerDecoratingProcessor`, which executes `runDelayed`, passes control to the given processor, and then converts the result into the format expected by `runDelayed`.

### A Dependent Job

Use a dependent job when your job flow has several steps and you want to send a new message once all steps are finished.

In the example below, a root job is created. As soon as its work is completed, it sends two messages with the 'topic1' and 'topic2' topics to the queue.

```
class MessageProcessor implements MessageProcessorInterface
{
    /**
     * @var JobRunner
     */
    private $jobRunner;

    /**
     * @var DependentJobService
     */
    private $dependentJob;

    public function process(MessageInterface $message, SessionInterface $session)
    {
        $data = JSON::decode($message->getBody());

        $result = $this->jobRunner->runUnique(
            $message->getMessageId(),
            'oro:index:reindex',
            function (JobRunner $runner, Job $job) use ($data) {
                // register two dependent jobs
                // next messages will be sent to queue when that job and all children are finished
                $context = $this->dependentJob->createDependentJobContext($job->getRootJob());
                $context->addDependentJob('topic1', 'message1');
                $context->addDependentJob('topic2', 'message2', MessagePriority::VERY_HIGH);

                $this->dependentJob->saveDependentJob($context);

                // some work to do

                return true; // if you want to ACK message or false to REJECT
            }
        );

        return $result ? self::ACK : self::REJECT;
    }
}
```

Dependent jobs can be added only to root jobs (i.e. to jobs created with `runUnique`, not `runDelayed`).

### Jobs Structure

A two-level job hierarchy is created for the process where:

- A root job can have a few child jobs.
- A child job can have one root job.
- A child job cannot have child jobs of its own.
- A root job cannot have a root job of its own.
- If we use just `runUnique`, then a parent job and a child job with the same name are created.
- If we use `runUnique` and `createDelayed` inside it, then a parent job and a child job for `runUnique` are created. Then each call to `createDelayed` adds another child to the `runUnique` parent.
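
The hierarchy rules above can be modelled with a small data structure. This is only an illustrative sketch: `SketchJob` is a hypothetical class, not the component's `Job` entity, and the child job name `oro:index:reindex:entity-1` is made up. It assumes PHP 8.0 or later for constructor property promotion.

```php
<?php
// Hypothetical in-memory model of the two-level job hierarchy.
final class SketchJob
{
    /** @var SketchJob[] */
    public array $children = [];

    public function __construct(
        public string $name,
        public ?SketchJob $root = null // null means this job is a root job
    ) {
    }

    public function addChild(string $name): SketchJob
    {
        if ($this->root !== null) {
            throw new LogicException('A child job cannot have child jobs of its own.');
        }

        return $this->children[] = new SketchJob($name, $this);
    }
}

// runUnique alone: a root job plus one child job with the same name.
$root = new SketchJob('oro:index:reindex');
$root->addChild('oro:index:reindex');

// Each createDelayed call adds another child to the same root.
$delayed = $root->addChild('oro:index:reindex:entity-1');
```

Calling `addChild` on `$delayed` throws a `LogicException` in this model, which reflects the rule that the hierarchy never grows beyond two levels.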

### Job Statuses

- **Single job:** When a message is being processed by a consumer and a JobRunner method `runUnique` is called without creating any child jobs:
    - The root job is created and the closure passed in params runs. The job gets `Job::STATUS_RUNNING` status, the job `startedAt` field is set to the current time.
    - If the closure returns `true`, the job status is changed to `Job::STATUS_SUCCESS`, the job `stoppedAt` field is changed to the current time.
    - If the closure returns `false` or throws an exception, the job status is changed to `Job::STATUS_FAILED`, the job `stoppedAt` field is changed to the current time.
    - If someone interrupts the job, it stops working and gets `Job::STATUS_CANCELLED` status, the job `stoppedAt` field is changed to the current time.
    - If a new unique job is created while the previous job has not finished, the execution time of the previous job is checked. If it is longer than the configured `time_before_stale` (see [Stale jobs](#stale-jobs)), the previous job gets the `Job::STATUS_STALE` status.
- **Child jobs:** When a message is being processed by a consumer, a JobRunner method `runUnique` is called which creates child jobs with `createDelayed`:
    - The root job is created and the closure passed in params runs. The job gets `Job::STATUS_RUNNING` status, the job `startedAt` field is set to the current time.
    - When the JobRunner method `createDelayed` is called, the child jobs are created and get the `Job::STATUS_NEW` statuses. The messages for the jobs are sent to the message queue.
    - When a message for a child job is being processed by a consumer and a JobRunner method `runDelayed` is called, the closure runs and the child jobs get `Job::STATUS_RUNNING` status.
    - If the closure returns `true`, the child job status is changed to `Job::STATUS_SUCCESS`, the job `stoppedAt` field is changed to the current time.
    - If the closure returns `false` or throws an exception, the child job status is changed to `Job::STATUS_FAILED`, the job `stoppedAt` field is changed to the current time.
    - When all child jobs are stopped, the root job status is changed according to the child jobs statuses.
    - If someone interrupts a child job, it stops working and gets `Job::STATUS_CANCELLED` status, the job `stoppedAt` field is changed to the current time.
    - If someone interrupts the root job, the child jobs that are already running finish their work and get the statuses according to the work result (see the description above). The child jobs that are not run yet are cancelled and get `Job::STATUS_CANCELLED` statuses.
    - If the root job status changes to `Job::STATUS_STALE`, its children automatically get the same status (see [Stale jobs](#stale-jobs)).
- **Also:** If a job closure returns `true`, the `process` method which runs this job should return `self::ACK`. If a job closure returns `false`, the `process` method which runs this job should return `self::REJECT`.
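
The relation between the closure result, the job status, and the processor result can be summarized in a short sketch. The constants and the `runJob` function below are local stand-ins introduced for illustration; the real constants are defined on `Job` and on the message processor interface, and this sketch covers only the `true`/`false` cases, not exceptions, cancellation, or stale jobs.

```php
<?php
// Local stand-ins for Job::STATUS_SUCCESS, Job::STATUS_FAILED,
// self::ACK and self::REJECT.
const STATUS_SUCCESS = 'success';
const STATUS_FAILED = 'failed';
const ACK = 'ack';
const REJECT = 'reject';

/**
 * Hypothetical helper: runs a job closure and returns
 * [job status, value the process method should return].
 */
function runJob(Closure $closure): array
{
    $succeeded = $closure() === true;

    return [
        $succeeded ? STATUS_SUCCESS : STATUS_FAILED,
        $succeeded ? ACK : REJECT,
    ];
}

$done = runJob(fn () => true);
$failed = runJob(fn () => false);
```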

### Stale Jobs

It is not possible to create two unique jobs with the same name. Therefore, if one unique job is unable to finish its work, it can block another job. To handle this situation, use the Stale Jobs functionality.

By default, JobProcessor uses `NullJobConfigurationProvider`, so a unique job never becomes "stale". To change this behavior, create your own provider that implements `JobConfigurationProviderInterface`.

The `JobConfigurationProvider::getTimeBeforeStaleForJobName($jobName)` method should return the number of seconds after which the job is considered "stale". If you do not want the job to become stale, return `null` or `-1`.

In the example below, all jobs are treated as "stale" after one hour.

```
