PHPackages                             drealecs/thread-worker - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. drealecs/thread-worker

ActiveLibrary[Queues &amp; Workers](/categories/queues)

drealecs/thread-worker
======================

PHP multi-thread worker message based library

v1.1.0-beta(12y ago)627[2 issues](https://github.com/drealecs/thread-worker/issues)MITPHPPHP &gt;=5.3.0

Since Jan 4Pushed 12y ago2 watchersCompare

[ Source](https://github.com/drealecs/thread-worker)[ Packagist](https://packagist.org/packages/drealecs/thread-worker)[ RSS](/packages/drealecs-thread-worker/feed)WikiDiscussions master Synced 3d ago

READMEChangelogDependencies (2)Versions (6)Used By (0)

drealecs/thread-worker
======================

[](#drealecsthread-worker)

PHP multi-thread-worker message/event library [![Build Status](https://camo.githubusercontent.com/5c2207598ca077fbefe96501e13792cd8251179ec4aba415540d0081491ab306/68747470733a2f2f7365637572652e7472617669732d63692e6f72672f647265616c6563732f7468726561642d776f726b65722e706e673f6272616e63683d6d6173746572)](http://travis-ci.org/drealecs/thread-worker)

Introduction
------------

[](#introduction)

Thread-worker is a library that allows execution of tasks in parallel by multiple PHP processes on the same computer or on different computers.

The library has a lot of concepts borrowed from other languages.

Concepts
--------

[](#concepts)

- **Task** represents a function that should be executed asynchronously.
- **Queue** is a queue of tasks. Tasks can be put into queue by a process and taken out for execution by another process.
- **Executor** is an wrapper over a queue. Tasks will be passed to it in one PHP script and in another php script **RemoteExecutor** will be used to work on those tasks.
- **TaskResult** or **TaskException** are the result of a **Task**.

**Task**, **TaskResult** and **TaskException** are the entities that are being serialized as "messages" and write to/read from the queue.

API
---

[](#api)

### Task

[](#task)

A code that will be executed remotely cannot share variables or context with the calling code. When defining a function that can be executed asynchronously it must be created as a Task by extending \\ThreadWorker\\Task and implementing method `run()`:

```
class AddTask extends ThreadWorker\Task
{
    public function run($a, $b)
    {
        returns $a + $b;
    }
}
```

and after that, the task can be used in this way:

```
$task = new AddTask(3, 5);
$result = $task();
```

and this will make `$result` equals 8.

Just like a function, a task can return a value or not.

Of course, the example above execute a task locally, synchronously. To execute it asynchronously we will need a Queue and an Executor.

### Queue

[](#queue)

To synchronize working tasks a queue concept is being used.

There is a queue, someone puts a task in the queue and there are workers that take it and run it.

Interface of a task queue:

- public function queue($task, $captureResult); - called to queue a task for execution. There are Tasks that don't return a result and Tasks that returns a result. A task that does not returns a result is usually preferable because the calling code can do other things and get out of scope or even finish execution. To accomplish this `$captureResult` must be `false` in which case the methods does not returns anything. If we need a execute multiple task remotely and join their result we might need to pass second parameter as `true` and `queue()` method will return a task identifier that can be used later to query and retrieve the task result.
- public function start() - called by the script that can execute a task. This is a blocking method and it blocks until there is a task in the queue. It returns a RemoteTask which is a container for the Task and it's TaskResult.
- public function end($remoteTask) - called by the script that executed the task. It marks the task a being finished and it task is one that returns a response, it stores the TaskResult.
- public function getResult($taskId) - usually called by the script that queued the task for execution. It can be called only one time and is blocking until the task finished to execute.
- public function isQueued|isRunning|isFinished($taskId) - methods that can query the state of a task.
- public function getQueueSize() and getRunningSize() - methods that can query the queue for it's current workflow capacity.

Currently there is only one implementation of Queue: \\ThreadWorker\\RedisQueue and there are plans for: AMQPQueue, MySQLQueue

### Executor

[](#executor)

Executor wraps a queue and provide a simpler interface to queue task and work on tasks and get task results.

There is a QueueExecutor that has 2 methods:

- void execute(Task $task) - adds the task to the queue
- QueuedTask submit(Task $task) - adds the task to the queue and returns a QueuedTask instance that can be used to query task status and retrieve the task result.

Let's look at an example:

```
$queue = new ThreadWorker\RedisQueue('example');
$executor = new ThreadWorker\QueueExecutor($queue);

$task = new AddTask(3, 5);
$queuedTask = $executor->submit($task);
$result = $queuedTask->getResult()->getValue();
```

QueueExecutor is extended into RemoteExecutor that does the asynchronous running of the tasks. Worker's code would look like this:

```
$queue = new ThreadWorker\RedisQueue('example');
$worker = new ThreadWorker\RemoteExecutor($queue);

$worker->work();
```

An instance of RemoteExecutor is passed as an extra parameter to the `run()` method of the task and can be use to queue more tasks.

###  Health Score

22

—

LowBetter than 22% of packages

Maintenance13

Infrequent updates — may be unmaintained

Popularity12

Limited adoption so far

Community8

Small or concentrated contributor base

Maturity48

Maturing project, gaining track record

 Bus Factor1

Top contributor holds 100% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~2 days

Total

5

Last Release

4503d ago

### Community

Maintainers

![](https://www.gravatar.com/avatar/8714fa223d32c91d96ba5aededaea9dba9b395eb7574732e5964c280d2b5efaf?d=identicon)[drealecs](/maintainers/drealecs)

---

Top Contributors

[![drealecs](https://avatars.githubusercontent.com/u/209984?v=4)](https://github.com/drealecs "drealecs (50 commits)")

---

Tags

asynchronousworkerthreadsThreadmultithread

###  Code Quality

TestsPHPUnit

### Embed Badge

![Health badge](/badges/drealecs-thread-worker/health.svg)

```
[![Health](https://phpackages.com/badges/drealecs-thread-worker/health.svg)](https://phpackages.com/packages/drealecs-thread-worker)
```

###  Alternatives

[amphp/amp

A non-blocking concurrency framework for PHP applications.

4.4k123.4M323](/packages/amphp-amp)[revolt/event-loop

Rock-solid event loop for concurrent PHP applications.

91943.6M138](/packages/revolt-event-loop)[amphp/parallel

Parallel processing component for Amp.

84746.2M74](/packages/amphp-parallel)[recoil/recoil

Asynchronous coroutines for PHP 7.

78961.5k7](/packages/recoil-recoil)[amphp/sync

Non-blocking synchronization primitives for PHP based on Amp and Revolt.

18852.8M39](/packages/amphp-sync)[amphp/serialization

Serialization tools for IPC and data storage in PHP.

13451.1M18](/packages/amphp-serialization)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
