PHPackages                             jenwachter/php-redis-queue - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. jenwachter/php-redis-queue

ActiveLibrary

jenwachter/php-redis-queue
==========================

1.7.1(6mo ago)43.6k5MITPHPPHP &gt;=8.0

Since Sep 12Pushed 6mo ago2 watchersCompare

[ Source](https://github.com/jenwachter/php-redis-queue)[ Packagist](https://packagist.org/packages/jenwachter/php-redis-queue)[ Docs](http://github.com/jenwachter/php-redis-queue)[ RSS](/packages/jenwachter-php-redis-queue/feed)WikiDiscussions main Synced 1mo ago

READMEChangelog (5)Dependencies (4)Versions (17)Used By (0)

PHP Redis Queue
===============

[](#php-redis-queue)

A simple background server queue utilizing Redis and PHP.

- [Requirements](#requirements)
- [Installation](#installation)
- [How it works](#how-it-works)
- [Quick example](#quick-example)
- [Documentation](#documentation)
    - [Worker](#worker)
    - [Client](#client)
    - [CLI](#cli)

Requirements
------------

[](#requirements)

- PHP &gt;= 8.0
- Redis &gt;= 6.0

Installation
------------

[](#installation)

To install the library, you will need to use [Composer](https://getcomposer.org/download/) in your project.

```
composer require jenwachter/php-redis-queue
```

How it works
------------

[](#how-it-works)

A *worker* defines a queue, listens for jobs to get pushed into the queue, and performs work to complete each job in the queue. A single queue can handle multiple types of jobs, specified by different callbacks. For example, you can create a queue to handle file uploads that can both upload and delete files using `upload` and `delete` callbacks. Ideally, workers are run as a process on a server.

A *client* pushes jobs into a queue, optionally with data needed by the worker to complete the job. For example, for a job that uploads files, passing the path of the file would be helpful.

When a client pushes a job into a queue, it waits in the queue until it reaches the top. Once it does, the worker:

1. Removes the job from the queue and adds it to a processing queue.
2. Determines if there is an available callback for this job. If there isn't, the job is considered failed.
3. Calls a *before* callback for the job type, if defined.
4. Calls the main callback for the job type.
    1. If the callback does not throw an exception, it is considered successful. If the callback throws an exception, it is considered failed.
    2. The job is removed from the processing queue and added to either the failed or success list.
5. Calls an *after* callback for the job type, if defined.
6. If the job is part of a [job group](#job-groups) and all jobs within the group have completed, the worker calls the `group_after` callback, if defined.
7. The queue moves on to the next job or waits until another is added.

Quick example
-------------

[](#quick-example)

In this quick example, we'll set up a worker that handles processing that needs to be done when uploading and deleting files. We'll then create a client to send jobs to the worker.

### Create a worker

[](#create-a-worker)

If using the default settings of the `work()` method (blocking), only one worker (queue) can be run in a single file; however, a single worker can handle multiple types of jobs. For example, here we create the `files` worker that can handle processing that needs to be done when both a file is uploaded and deleted:

*files.php*

```
$predis = new Predis\Client();
$worker = new PhpRedisQueue\QueueWorker($predis, 'files');

$worker->addCallback('upload', function (array $data) {
  // create various crop sizes
  // optimize
  // ...
});

$worker->addCallback('delete', function (array $data) {
  // delete all created crop sizes
  // ...
});

$worker->work();
```

### Run the worker

[](#run-the-worker)

To run the worker, you can run it manually on a server:

```
$ php files.php
```

But once the script exits or the connection closes, the worker will also stop running. This is helpful for testing during development, but not so much in a production environment.

To ensure your worker is always running, run the worker as a process using a system such as [Supervisor](http://supervisord.org/).

### Create a client

[](#create-a-client)

Continuing with our files example, the following client code would be executed after a file is uploaded or deleted in the system.

```
// trigger: user uploads a file...

$predis = new Predis\Client();
$client = new PhpRedisQueue\Client($predis);

// gather data required by worker upload callback
$data = [
  'path' => '/path/to/uploaded/file',
];

// push the job to the files queue
$client->push('files', 'upload', $data);
```

### Job groups

[](#job-groups)

You can also group jobs into a Job Group, which enables you to use the `group_after` callback when all jobs in the group have completed. Jobs added to a job group can be assigned to any queue. Refer to the [Job Group documentation](#job-group) for more details.

**client.php**

```
$predis = new Predis\Client();
$client = new PhpRedisQueue\Client($predis);

// create a job group
$group = $client->createJobGroup();

// add jobs to the group
$group->push('queuename', 'jobname');
$group->push('another-queuename', 'jobname');
$group->push('queuename', 'jobname');
$group->push('queuename', 'jobname');

// add jobs in the group to the queue
$group->queue();
```

**worker.php**

```
$predis = new Predis\Client();
$worker = new PhpRedisQueue\QueueWorker($predis, 'queuename');

$worker->addCallback('jobname', fn (array $data) true);

$worker->addCallback('group_after', function ($group, $success) {
  // respond to group completion
});

$worker->work();
```

Documentation
-------------

[](#documentation)

### Worker

[](#worker)

#### Initialization

[](#initialization)

```
$predis = new Predis\Client();
$worker = new PhpRedisQueue\QueueWorker($predis, 'queuename');
```

*Note: queuename cannot be an integer.*

#### Configuration

[](#configuration)

Some parts of the worker can be customized by sending an array of options as the third argument.

```
$predis = new Predis\Client();
$worker = new PhpRedisQueue\QueueWorker($predis, 'queuename', [
  'logger' => new Logger(),
]);
```

##### Available options:

[](#available-options)

- **default\_socket\_timeout**: timeout (in seconds) for the worker, if using the default blocking functionality. Default: -1 (no timeout)
- **logger**: a logger that implements `Psr\Log\LoggerInterface`. Default: null
- **wait**: number of seconds to wait in between job processing. Default: 1

### Methods

[](#methods)

#### **`addCallback(string $name, callable $callable)`**

[](#addcallbackstring-name-callable-callable)

Attaches a callback to the worker. Available callbacks:

- ``: Runs the job. Example: `upload`
- `_before`: Runs before the job begins. Example: `upload_before`
- `_after`: Runs after the job is complete. Example: `upload_after`
- `group_after`: Runs after a group of jobs have completed.

Returns: Null.

Arguments:

- `$name`: Name of a hook that corresponds to one of three stages of the job's processing. See above for the format.
- `$callable`: Function to attach to the given hook. Arguments are as follows:
    - `(array $data)`
        - `$data`: Array of data passed to the job by the client
    - `_before(array $data)`
        - `$data`: Array of data passed to the job by the client
    - `_after(array $data, bool $success)`
        - `$data`: Array of data passed to the job by the client. Exception data from failed jobs is available in `$data['context']`
        - `$success`: Job status; success (`true`) or failure (`false`)
    - `group_after(JobGroup $group, bool $success)`
        - `$group`: Job group model
        - `$success`: Group status; all jobs in the group completed successfully (`true`) or one or more jobs in the group failed (`false`)

#### **`work(bool $block = true)`**

[](#workbool-block--true)

Instructs the worker to begin listening to the queue.

Returns: Null.

Arguments:

- `$block`: Should the work method be blocking?

### Client

[](#client)

#### Initialization

[](#initialization-1)

```
$predis = new Predis\Client();
$client = new \PhpRedisQueue\Client($predis);
```

#### Configuration

[](#configuration-1)

Some parts of the worker can be customized by sending an array of options as the second argument.

```
$predis = new Predis\Client();
$worker = new PhpRedisQueue\QueueWorker($predis, 'queuename', [
  'logger' => new Logger()
]);
```

##### Available options:

[](#available-options-1)

- **logger**: a logger that implements `Psr\Log\LoggerInterface`. Default: null

### Methods

[](#methods-1)

#### **`push(string $queue, string $jobName = 'default', array $jobData = [])`**

[](#pushstring-queue-string-jobname--default-array-jobdata--)

Pushes a job to the end of a queue.

Returns: Integer. ID of job.

Arguments:

- `$queue`: Name of the queue.
- `$jobName`: Name of the job to handle the work.
- `$data`: Data to pass to the worker.

#### **`pushToFront(string $queue, string string $jobName = 'default', array $jobData = [])`**

[](#pushtofrontstring-queue-string-string-jobname--default-array-jobdata--)

Pushes a job to the front of a queue.

Returns: Integer. ID of job.

Arguments:

- `$queue`: Name of the queue.
- `$jobName`: Name of the job to handle the work.
- `$data`: Data to pass to the worker.

#### **`pull(int $id)`**

[](#pullint-id)

Pull a job from a queue.

Returns: Boolean. `true` if the job was successfully removed; otherwise, `false`.

Arguments:

- `$id`: ID of job to pull.

#### **`rerun(int $id)`**

[](#rerunint-id)

Reruns a previously failed job. Throws an exception if the job was successful already or cannot be found.

Returns: Boolean. TRUE if the job was successfully readded to the queue.

Arguments:

- `$id`: ID of failed job.

#### **`createJobGroup(int total = null, $data = [])`**

[](#createjobgroupint-total--null-data--)

Creates a job group, which allows you to link jobs together. Use the [`group_after`](#addcallbackstring-name-callable-callable) callback to perform work when all jobs in the group have completed.

Returns: PhpRedisQueue\\models\\JobGroup object

Arguments:

- `$total`: Total number of jobs.
- `$data`: array of data to store with the job group.

### Job Group

[](#job-group)

#### Initialization

[](#initialization-2)

A job group are created via the `Client::createJobGroup`, which then returns the JobGroup model.

```
$predis = new Predis\Client();
$client = new \PhpRedisQueue\Client($predis);

$group = $group->createJobGroup($total = null, $data = []);
```

Returns: JobGroup model

Arguments:

- `$total`: Total number of jobs, if known at initialization.
- `$data`: Array of data to attach to the group, which can be of use in the `group_after` callback.

### JobGroup Model Methods

[](#jobgroup-model-methods)

#### **`push(string $queue, string $jobName = 'default', array $jobData = [])`**

[](#pushstring-queue-string-jobname--default-array-jobdata---1)

Pushes a job to the job group. Note: jobs cannot be added to a Job Group if it has already been queued.

Returns: Integer. ID of job.

Arguments:

- `$queue`: Name of the queue.
- `$jobName`: Name of the job to handle the work.
- `$data`: Data to pass to the worker.

#### **`setTotal(int total)`**

[](#settotalint-total)

Tells the group how many jobs to expect. Enables the Job Group to automatically add the jobs to the queue once the total is reached. Alternatively, use [`JobGroup::queue()`](#queue) to manually queue.

Returns: Boolean. TRUE if the total was successfully set.

#### **`queue()`**

[](#queue)

Add the group's jobs to the queue. Only use this method if the Job Group is unaware of the total number of jos to expect via initialization of [`JobGroup::setTotal()`](#settotalint-total).

Returns: Boolean

#### **`removeFromQueue()`**

[](#removefromqueue)

Removes any remaining jobs in the group from their queues.

Returns: Boolean

#### **`getJobs()`**

[](#getjobs)

Get an array of jobs associated with the group.

Returns: array of Job models.

#### **`getUserData()`**

[](#getuserdata)

Get the data assigned to the group on initialization.

Returns: array

### CLI

[](#cli)

The command line interface allows you to interact with the queue. To access the tool, run:

```
./vendor/bin/prq
```

To list all available commands, run:

```
./vendor/bin/prq list
```

To get help with a command, run:

### Get help with a command

[](#get-help-with-a-command)

```
./vendor/bin/prq queues:list --help
```

### Connecting to Redis in the CLI

[](#connecting-to-redis-in-the-cli)

By default, the CLI connects to Redis at `tcp://127.0.0.1:6379`. If you need to customize this connection, you have a couple options:

#### Environment variables

[](#environment-variables)

You can configure the connection to Redis using environment variables:

- To replace parts of the default connection string, set any or all of the following: `REDIS_SCHEME`, `REDIS_HOST`, `REDIS_PORT`
- To override the entire connection string, set `REDIS_URI`

The CLI will automatically use these variables to create the connection string.

##### Examples

[](#examples)

- Setting `REDIS_HOST=my-redis-host` will result in a connection string of `tcp://my-redis-host:6379`
- Setting `REDIS_URI=unix:/path/to/redis.sock` will result in a connection string of `unix:/path/to/redis.sock`

#### Connection file

[](#connection-file)

To create a more complex connection, create a PHP file that returns an instance of `Predis\Client` and pass the file to the `prq` command via the `--connection` (or `-c`) option.

```
