Abandoned: use [flowpack/jobqueue-common](/?search=flowpack%2Fjobqueue-common) instead.

typo3/jobqueue-common
=====================

Common functionality for a JobQueue in Flow applications.

Version 3.4.1, MIT license, PHP ^7.4 || ^8.0.

[Source](https://github.com/Flowpack/jobqueue-common) | [Packagist](https://packagist.org/packages/typo3/jobqueue-common)

Flowpack.JobQueue.Common
========================

Neos Flow package that allows for asynchronous and distributed execution of tasks.

### Table of contents

- [Quickstart](#quickstart-tldr)
- [Introduction](#introduction)
- [Message Queue](#message-queue)
- [Job Queue](#job-queue)
- [Command Line Interface](#command-line-interface)
- [Signals & Slots](#signal--slots)
- [License](#license)
- [Contributions](#contributions)

Quickstart (TL;DR)
------------------

1. **Install this package using composer:**

```
composer require flowpack/jobqueue-common

```

(or by adding the dependency to the composer manifest of an installed package)

2. **Configure a basic queue by adding the following to your `Settings.yaml`:**

```
Flowpack:
  JobQueue:
    Common:
      queues:
        'some-queue':
          className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'
```

3. **Initialize the queue (if required)**

With

```
./flow queue:setup some-queue

```

you can set up the queue and/or verify its configuration. In the case of the `FakeQueue`, that step is not required.

*Note:* The `queue:setup` command won't remove any existing messages, so there is no harm in calling it multiple times.

4. **Annotate any *public* method you want to be executed asynchronously:**

```
use Flowpack\JobQueue\Common\Annotations as Job;

class SomeClass {

    /**
     * @Job\Defer(queueName="some-queue")
     */
    public function sendEmail($emailAddress)
    {
        // send some email to $emailAddress
    }
}
```

or use attributes instead of annotations (PHP 8.0 and later):

```
use Flowpack\JobQueue\Common\Annotations as Job;

class SomeClass {

    #[Job\Defer(queueName: "some-queue")]
    public function sendEmail($emailAddress)
    {
        // send some email to $emailAddress
    }
}
```

*Note:* The method needs to be *public* and it must not return anything.

5. **Start the worker (if required)**

With the above code in place, whenever the method `SomeClass::sendEmail()` is about to be called, that method call is converted into a job that is executed asynchronously\[1\].

Unless you use the `FakeQueue` as in the example, a so-called `worker` has to be started to listen for new jobs and execute them:

```
./flow flowpack.jobqueue.common:job:work some-queue --verbose

```
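Conceptually, deferring a call as in step 4 means capturing the class, the method name, and the arguments as data now, and invoking the method later in a worker. The following is a minimal sketch of that idea in plain PHP. `Mailer`, `captureCall()`, and `executeCall()` are hypothetical names used only for illustration; this is not how the package itself intercepts or serializes calls.

```php
<?php
declare(strict_types=1);

// Hypothetical illustration only: none of these names belong to Flowpack.JobQueue.Common.
final class Mailer
{
    /** @var string[] addresses "sent" so far, kept in memory for the demo */
    public array $sent = [];

    public function sendEmail(string $emailAddress): void
    {
        $this->sent[] = $emailAddress;
    }
}

/**
 * Capture a method call as plain data so that it could be stored in a queue.
 */
function captureCall(string $className, string $methodName, array $arguments): string
{
    return json_encode(
        ['class' => $className, 'method' => $methodName, 'arguments' => $arguments],
        JSON_THROW_ON_ERROR
    );
}

/**
 * What a worker would do later: decode the payload and invoke the method.
 */
function executeCall(string $payload, object $target): void
{
    $call = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
    if (!is_a($target, $call['class'])) {
        throw new InvalidArgumentException('Payload targets a different class');
    }
    $target->{$call['method']}(...$call['arguments']);
}

// The caller only produces a payload; nothing is sent yet.
$payload = captureCall(Mailer::class, 'sendEmail', ['jane@example.com']);

// Later (in the real package: inside a worker process) the call is replayed.
$mailer = new Mailer();
executeCall($payload, $mailer);
```

Because only data crosses the queue, the arguments have to be serializable, and a return value has nowhere to go, which matches the requirement above that a deferred method must not return anything.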

Introduction
------------

To get started let's first define some terms:

- **Message**: A piece of information passed between programs or systems, sometimes also referred to as "Event". In the JobQueue packages we use messages to transmit `Jobs`.
- **Message Queue**: According to [Wikipedia](https://en.wikipedia.org/wiki/Message_queue) "message queues \[...\] are software-engineering components used for inter-process communication (IPC), or for inter-thread communication within the same process". In the context of the JobQueue packages we refer to "Message Queue" as a [FIFO](https://en.wikipedia.org/wiki/FIFO_(computing_and_electronics)) buffer that distributes messages to one or more consumers, so that every message is only processed once.
- **Job**: A unit of work to be executed (asynchronously). In the JobQueue packages we use the Message Queue to store serialized jobs, so it acts as a "Job stream".
- **Job Manager**: Central authority allowing adding and fetching jobs to/from the Message Queue.
- **Worker**: The worker watches a queue and triggers the job execution. This package comes with a `job:work` command that does this (see below).
- **submit**: New messages are *submitted* to a queue to be processed by a worker.
- **reserve**: Before a message can be processed it has to be *reserved*. The queue guarantees that a single message can never be reserved by two workers (unless it has been released again).
- **release**: A reserved message can be *released* to the queue to be processed at a later time. The *JobManager* does this if Job execution failed and the `maximumNumberOfReleases` setting for the queue is greater than zero.
- **abort**: If a message could not be processed successfully it is *aborted*, marking it *failed* in the respective queue so that it can't be reserved again. The *JobManager* aborts a message if Job execution failed and the message can't be released (again).
- **finish**: If a message was processed successfully it is marked *finished*. The *JobManager* finishes a message if Job execution succeeded.

Message Queue
-------------

The `Flowpack.JobQueue.Common` package comes with a *very basic* Message Queue implementation `Flowpack\JobQueue\Common\Queue\FakeQueue` that allows for execution of Jobs using sub requests. It doesn't need any 3rd party tools or server loops and works for basic scenarios. But it has a couple of limitations to be aware of:

1. It is not actually a queue, but dispatches jobs immediately as they are queued. So it's not possible to distribute the work to multiple workers.
2. The `JobManager` is not involved in processing of jobs, so the jobs need to take care of error handling themselves.
3. For the same reason [Signals](#signal--slots) are *not* emitted for the `FakeQueue`.
4. With Flow 3.3+ the `FakeQueue` supports an `async` flag. Without that flag set, executing jobs *blocks* the main thread!

For advanced usage it is recommended to use one of the implementing packages, such as:

- [Flowpack.JobQueue.Doctrine](https://github.com/Flowpack/jobqueue-doctrine)
- [Flowpack.JobQueue.Beanstalkd](https://github.com/Flowpack/jobqueue-beanstalkd)
- [Flowpack.JobQueue.Redis](https://github.com/Flowpack/jobqueue-redis)
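
Whichever backend is used, it has to support the message lifecycle defined in the introduction (submit, reserve, release, abort, finish). As a rough mental model, that lifecycle can be sketched with a hypothetical in-memory class. `InMemoryQueue` and its method signatures are assumptions made for this example and are not the package's actual queue interface.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory model of the message lifecycle; not the package's queue interface.
final class InMemoryQueue
{
    /** @var array<int, string> payloads waiting to be reserved, in FIFO order */
    private array $ready = [];
    /** @var array<int, string> payloads currently reserved by a worker */
    private array $reserved = [];
    /** @var array<int, string> payloads that were aborted and must not be reserved again */
    private array $failed = [];
    private int $nextId = 1;

    public function submit(string $payload): int
    {
        $id = $this->nextId++;
        $this->ready[$id] = $payload;
        return $id;
    }

    /** @return array{0: int, 1: string}|null the oldest ready message, or null if none is waiting */
    public function reserve(): ?array
    {
        if ($this->ready === []) {
            return null;
        }
        $id = array_key_first($this->ready);
        $payload = $this->ready[$id];
        // Moving the message out of $ready ensures no second worker can reserve it.
        unset($this->ready[$id]);
        $this->reserved[$id] = $payload;
        return [$id, $payload];
    }

    public function release(int $id): void
    {
        // Re-adding the key appends it, so the message goes to the end of the line.
        $payload = $this->reserved[$id];
        unset($this->reserved[$id]);
        $this->ready[$id] = $payload;
    }

    public function abort(int $id): void
    {
        $this->failed[$id] = $this->reserved[$id];
        unset($this->reserved[$id]);
    }

    public function finish(int $id): void
    {
        unset($this->reserved[$id]);
    }

    public function countReady(): int
    {
        return count($this->ready);
    }

    public function countFailed(): int
    {
        return count($this->failed);
    }
}

$queue = new InMemoryQueue();
$first = $queue->submit('job A');
$queue->submit('job B');

[$id, $payload] = $queue->reserve(); // 'job A'
$queue->release($id);                // execution failed once, try again later
[$idB] = $queue->reserve();          // 'job B'
$queue->finish($idB);                // execution succeeded
[$idA] = $queue->reserve();          // 'job A' again
$queue->abort($idA);                 // failed for good, marked as failed
```

A real backend additionally has to make `reserve()` safe across processes and machines, which is what the packages listed above delegate to a database, Beanstalkd, or Redis.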

### Configuration

This is the simplest configuration for a queue:

```
Flowpack:
  JobQueue:
    Common:
      queues:
        'test':
          className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'
```

With this a queue named `test` will be available.

*Note:* For reusable packages you should consider adding a vendor-specific prefix to avoid collisions. We recommend using a class name or the package name together with the function name (e.g. `Flowpack.ElasticSearch.ContentRepositoryQueueIndexer`).

### Queue parameters

The following parameters are supported by all queues:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| className | string | - | FQN of the class implementing the queue |
| maximumNumberOfReleases | integer | 3 | Max. number of times a message is re-released to the queue if a job failed |
| executeIsolated | boolean | FALSE | If TRUE jobs for this queue are executed in a separate Thread. This makes sense in order to avoid memory leaks and side-effects |
| outputResults | boolean | FALSE | If TRUE the full output (stdout + stderr) of the respective job is forwarded to the stdout of its "parent" (only applicable if `executeIsolated` is `true`) |
| queueNamePrefix | string | - | Optional prefix for the internal queue name, allowing to re-use the same backend over multiple installations |
| options | array | - | Options for the queue. Implementation specific (see corresponding package) |
| releaseOptions | array | - | Options that will be passed to `release()` when a job failed. Implementation specific (see corresponding package) |

A more complex example could look something like:

```
Flowpack:
  JobQueue:
    Common:
      queues:
        'email':
          className: 'Flowpack\JobQueue\Beanstalkd\Queue\BeanstalkdQueue'
          maximumNumberOfReleases: 5
          executeIsolated: true
          outputResults: true
          queueNamePrefix: 'staging-'
          options:
            client:
              host: 127.0.0.11
              port: 11301
            defaultTimeout: 50
          releaseOptions:
            priority: 512
            delay: 120
        'log':
          className: 'Flowpack\JobQueue\Redis\Queue\RedisQueue'
          options:
            defaultTimeout: 10
```

As you can see, you can have multiple queues in one installation. That allows you to use different backends/options for queues depending on the requirements.
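
To make the effect of `maximumNumberOfReleases` concrete: after a failed job, a message is either released for another attempt or aborted, as described in the introduction. The function below is a minimal sketch of that decision under the assumption that the number of previous releases is compared against the configured maximum. `actionAfterFailure()` is a hypothetical helper, not part of the package.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: decide what happens to a message whose job just failed.
 *
 * @param int $numberOfReleases        how often the message has been released so far
 * @param int $maximumNumberOfReleases the queue setting (3 is the documented default)
 */
function actionAfterFailure(int $numberOfReleases, int $maximumNumberOfReleases = 3): string
{
    return $numberOfReleases < $maximumNumberOfReleases ? 'release' : 'abort';
}

// Simulate a job that keeps failing on a queue with the default setting.
$actions = [];
for ($releases = 0; $releases <= 3; $releases++) {
    $actions[] = actionAfterFailure($releases);
}
// $actions is now ['release', 'release', 'release', 'abort']
```

Under this assumption a setting of `0` means a failed message is aborted immediately, and the `'email'` queue above (`maximumNumberOfReleases: 5`) would give a failing job up to five further attempts before it is marked as failed.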

### Presets

If multiple queues share common configuration, **presets** can be used to ease readability and maintainability:

```
Flowpack:
  JobQueue:
    Common:
      presets:
        'staging-default':
          className: 'Flowpack\JobQueue\Doctrine\Queue\DoctrineQueue'
          queueNamePrefix: 'staging-'
          options:
            pollInterval: 2
      queues:
        'email':
          preset: 'staging-default'
          options:
            tableName: 'queue_email' # default table name would be "flowpack_jobqueue_messages_email"
        'log':
          preset: 'staging-default'
          options:
            pollInterval: 1 # overrides "pollInterval" of the preset
```

This will configure two `DoctrineQueue`s "email" and "log" with some common options but different table names and poll intervals.
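
The merge behaviour can be pictured as a recursive overlay of the queue's own settings on top of its preset. The following sketch reproduces the example above with plain PHP arrays and `array_replace_recursive()`. `resolveQueueSettings()` is a hypothetical helper written for illustration; the package's own merging code may differ in detail.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: overlay a queue's own settings on the preset it refers to.
 */
function resolveQueueSettings(array $presets, array $queueSettings): array
{
    $presetName = $queueSettings['preset'] ?? null;
    unset($queueSettings['preset']);
    if ($presetName === null) {
        return $queueSettings;
    }
    if (!isset($presets[$presetName])) {
        throw new InvalidArgumentException(sprintf('Unknown preset "%s"', $presetName));
    }
    // Values from the queue win; nested arrays such as "options" are merged key by key.
    return array_replace_recursive($presets[$presetName], $queueSettings);
}

$presets = [
    'staging-default' => [
        'className' => 'Flowpack\JobQueue\Doctrine\Queue\DoctrineQueue',
        'queueNamePrefix' => 'staging-',
        'options' => ['pollInterval' => 2],
    ],
];

$email = resolveQueueSettings($presets, [
    'preset' => 'staging-default',
    'options' => ['tableName' => 'queue_email'],
]);
$log = resolveQueueSettings($presets, [
    'preset' => 'staging-default',
    'options' => ['pollInterval' => 1],
]);
```

Here `$email['options']` ends up with both `pollInterval` (from the preset) and `tableName` (from the queue), while `$log['options']['pollInterval']` is overridden to `1`.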

Job Queue
---------

A job is an arbitrary class implementing `Flowpack\JobQueue\Common\Job\JobInterface`. This package comes with one implementation, `StaticMethodCallJob`, that allows for invoking a public method (see [Quickstart](#quickstart-tldr)), but often it makes sense to create a custom Job:

```
