
bingo-soft/concurrent
=====================

Concurrent programming constructs for PHP

Version 1.5.0 · MIT · PHP ^7.4 || ^8.0

[Source](https://github.com/bingo-soft/concurrent) · [Packagist](https://packagist.org/packages/bingo-soft/concurrent)

[![Latest Stable Version](https://camo.githubusercontent.com/79c77b859418c2dceebb3573ad1b23fb45b98f70f57e4c47f907910cdecf7d8c/68747470733a2f2f706f7365722e707567782e6f72672f62696e676f2d736f66742f636f6e63757272656e742f762f737461626c652e706e67)](https://packagist.org/packages/bingo-soft/concurrent)[![Minimum PHP Version](https://camo.githubusercontent.com/0e9ac047546796cfdbe1423d1f4d91c8f37d2fbb11614a7900bb7686aaa5401f/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f7068702d253345253344253230372e342d3838393242462e737667)](https://php.net/)[![License: MIT](https://camo.githubusercontent.com/784362b26e4b3546254f1893e778ba64616e362bd6ac791991d2c9e880a3a64e/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f4c6963656e73652d4d49542d677265656e2e737667)](https://opensource.org/licenses/MIT)[![Scrutinizer Code Quality](https://camo.githubusercontent.com/45aec450b5e00761943f564c07a122e875aa42c1764a1a43798264545d4d2ae9/68747470733a2f2f7363727574696e697a65722d63692e636f6d2f672f62696e676f2d736f66742f636f6e63757272656e742f6261646765732f7175616c6974792d73636f72652e706e673f623d6d61696e)](https://scrutinizer-ci.com/g/bingo-soft/concurrent/?branch=main)


Installation
============

Install the library using Composer:

```
composer require bingo-soft/concurrent

```

Example 1 (simple pool)
=======================

```
$pool = new DefaultPoolExecutor(3); //only three active processes in the pool
$task1 = new TestTask("task 1");
$task2 = new TestTask("task 2");
$task3 = new TestTask("task 3");
$task4 = new TestTask("task 4");
$task5 = new TestTask("task 5");

$pool->execute($task1);
$pool->execute($task2);
$pool->execute($task3);
$pool->execute($task4); //task 4 waits for a free slot in the pool
$pool->execute($task5); //task 5 waits for a free slot in the pool
$pool->shutdown(); //shut down the pool with all attached processes
```

Example 2 (CountDownLatch)
==========================

```
    //IPC is implemented via a server socket on the default port 1081, which can be changed

    $initializer = new ComponentInitializer(4);

    // Simulate component initialization
    $initializer->initializeComponent(function () {
        fwrite(STDERR, getmypid() . ": Process is sleeping for 1 second\n");
        sleep(1);
    });
    $initializer->initializeComponent(function () {
        fwrite(STDERR, getmypid() . ": Process is sleeping for 2 seconds\n");
        sleep(2);
    });
    $initializer->initializeComponent(function () {
        fwrite(STDERR, getmypid() . ": Process is sleeping for 3 seconds\n");
        sleep(3);
    });
    $initializer->initializeComponent(function () {
        fwrite(STDERR, getmypid() . ": Process is sleeping for 4 seconds\n");
        sleep(4);
    });

    $start = hrtime(true);
    $initializer->awaitInitialization();
    $end = hrtime(true);

    //Less than 5 seconds total
    assert(floor(($end - $start) / 1000000) < 5000);
```
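The timing assertion above relies on the four initializers running in parallel, so the wait is bounded by the slowest one rather than by the sum of all sleeps. A minimal sketch of that arithmetic in plain PHP follows. It makes no library calls, and the helper name `nsToMs` is made up for illustration.

```php
<?php
// Sleep durations (seconds) used by the four initializers above.
$sleeps = [1, 2, 3, 4];

// Run in parallel, the wait is bounded below by the slowest task;
// run one after another, it would be the sum of all of them.
$parallelSeconds = max($sleeps);
$sequentialSeconds = array_sum($sleeps);

// hrtime(true) returns nanoseconds; this hypothetical helper converts a
// nanosecond interval to whole milliseconds, as the assertion above does.
function nsToMs(int $ns): int
{
    return intdiv($ns, 1000000);
}

$thresholdMs = 5000;
assert($parallelSeconds * 1000 < $thresholdMs);   // a parallel run can pass
assert($sequentialSeconds * 1000 > $thresholdMs); // a sequential run cannot
```

Process start-up and IPC add some overhead on top of the slowest sleep, which is presumably why the example leaves a one-second margin.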

Example 3 (ForkJoinPool with recursive task)
============================================

```
    //SumTask.php with recursive task

    use Concurrent\ThreadInterface;
    use Concurrent\Task\RecursiveTask;

    class SumTask extends RecursiveTask
    {
        public $start;
        public $end;
        private const THRESHOLD = 10000; // Arbitrary threshold to split tasks
        public $timestamp;

        public function __construct(int $start, int $end)
        {
            parent::__construct();
            $this->start = $start;
            $this->end = $end;
            $this->timestamp = hrtime(true);
        }

        public function castResult($result)
        {
            return intval($result);
        }

        public function __serialize(): array
        {
            return [
                'xid' => $this->xid,
                'start' => $this->start,
                'end' => $this->end,
                'timestamp' => $this->timestamp,
                'result' => self::$result->get($this->xid, 'result')
            ];
        }

        public function __unserialize(array $data): void
        {
            $this->xid = $data['xid'];
            $this->start = $data['start'];
            $this->end = $data['end'];
            $this->timestamp = $data['timestamp'];
        }

        public function compute(ThreadInterface $worker, ...$args)
        {
            $length = $this->end - $this->start;
            if ($length <= self::THRESHOLD) { // Base case
                $sum = 0;
                for ($i = $this->start; $i <= $this->end; $i++) {
                    $sum += $i;
                }
                return $sum;
            } else { // Recursive case
                $mid = $this->start + intdiv($this->end - $this->start, 2); // Integer midpoint
                $leftTask = new SumTask($this->start, $mid);
                $rightTask = new SumTask($mid + 1, $this->end);
                $leftTask->fork($worker); // Fork the left half
                $rightResult = $rightTask->compute($worker, ...$args); // Compute the right half in the current worker
                $leftResult = $leftTask->join($worker, ...$args); // Join the forked left half
                return $leftResult + $rightResult;
            }
        }
    }

    // ===== Usage

    //Initialize the pool; the default port for inter-process communication can be overridden
    $pool = ForkJoinPool::commonPool(/*1081*/);

    //Invoke recursive task on pool
    $result = $pool->invoke(new SumTask(1, 300000));
    assert($result == 45000150000);
```
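The expected value can be checked without the pool. The sketch below is plain sequential PHP, not the library's API. `sumRange` is a hypothetical helper that follows the same divide-and-conquer split as `SumTask`, and the closed form n(n + 1) / 2 gives the same total. It assumes a 64-bit PHP build, since the result exceeds the 32-bit integer range.

```php
<?php
// Hypothetical sequential counterpart of SumTask: same split, no forking.
function sumRange(int $start, int $end, int $threshold = 10000): int
{
    if ($end - $start <= $threshold) {
        // Base case: add the range up directly (both bounds inclusive).
        $sum = 0;
        for ($i = $start; $i <= $end; $i++) {
            $sum += $i;
        }
        return $sum;
    }
    // Recursive case: integer midpoint, then [start, mid] and [mid + 1, end].
    $mid = $start + intdiv($end - $start, 2);
    return sumRange($start, $mid, $threshold) + sumRange($mid + 1, $end, $threshold);
}

$n = 300000;
$closedForm = intdiv($n * ($n + 1), 2); // sum of 1..n

assert(sumRange(1, $n) === $closedForm);
```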

Example 4 (CompletableFuture based on multiprocessing)
======================================================

```
    //4.1
    $inputValue = 4;

    //issue non-blocking calls executed on a process pool ($executor is a pool executor created beforehand)
    $future = CompletableFuture::supplyAsync(function () use ($inputValue) {
        $res = $inputValue * $inputValue;
        return $res;
    }, $executor)->thenApplyAsync(function ($result) {
        usleep(100000);
        $res = $result * 2;
        return $res;
    }, $executor)->thenApplyAsync(function ($result) {
        $res = $result * 2;
        return $res;
    }, $executor);

    //blocking call to get resulting value
    assert($future->get() == 64);

    //4.2

    $inputValue = 4;

    $future = CompletableFuture::supplyAsync(function () use ($inputValue) {
        $res = $inputValue * $inputValue;
        return $res;
    })->thenApplyAsync(function ($result) {
        usleep(100000);
        $res = $result * 2;
        return $res;
    });

    //non-blocking call that runs after first two non-blocking calls - can be used for logging etc.
    $future->thenRunAsync(function () {
        fwrite(STDERR, "Running a follow-up background task...\n");
    });

    assert($future->get() == 32);

    //4.3

    $future1 = CompletableFuture::supplyAsync(function () {
        return 2;
    });
    $future2 = CompletableFuture::supplyAsync(function () {
        return 3;
    });

    //combine results of two futures and make non-blocking computation
    $resultFuture = $future1->thenCombineAsync($future2, function ($result1, $result2) {
        return $result1 + $result2;
    });

    assert($resultFuture->join() == 5);

    //4.4

    $future1 = CompletableFuture::runAsync(function () {
        // Simulate a task
        usleep(100000);
    });
    $future2 = CompletableFuture::runAsync(function () {
        // Simulate a task
        usleep(200000);
    });

    //non-blocking task running after both futures complete
    $combinedFuture = $future1->runAfterBothAsync($future2, function () {
        fwrite(STDERR, "Running a follow-up background task...\n");
    });

    $combinedFuture->join();

    //4.5

    $future1 = CompletableFuture::supplyAsync(function () {
        usleep(100000);
        return 2;
    });
    $future2 = CompletableFuture::supplyAsync(function () {
        return 1;
    });

    //non-blocking task running after either of two futures
    $resultFuture = $future1->applyToEitherAsync($future2, function ($result) {
        return $result * 2;
    });

    //4.6

    $future1 = CompletableFuture::runAsync(function () {
        // Simulate a task that takes longer
        usleep(200000);
    });
    $future2 = CompletableFuture::runAsync(function () {
        // Simulate a quicker start
        usleep(100000);
    });

    //non-blocking background task running after either of two futures
    $combinedFuture = $future1->runAfterEitherAsync($future2, function () {
        fwrite(STDERR, "Running a follow-up background task...\n");
    });

    //4.7

    //chain two non-blocking tasks and inspect the result (or exception) on completion
    $future = CompletableFuture::supplyAsync(function () {
        return "Hello";
    })->thenApplyAsync(function ($result) {
        return $result . " World";
    })->whenCompleteAsync(function ($result, $exception) {
        if ($exception === null) {
            assert(strpos($result, "World") !== false);
        }
    });

    //4.8

    $future1 = CompletableFuture::supplyAsync(function () {
        return "Hello";
    });
    $future2 = CompletableFuture::supplyAsync(function () {
        return "World";
    });

    //future that waits for all provided futures to complete
    $combinedFuture = CompletableFuture::allOf($future1, $future2);
    $combinedFuture->get();

    //4.9

    $future1 = CompletableFuture::supplyAsync(function () {
        //usleep(200000);
        return "Hello";
    });
    $future2 = CompletableFuture::supplyAsync(function () {
        return "World";
    });

    //future that waits for any of the provided futures to complete
    $anyOfFuture = CompletableFuture::anyOf($future1, $future2);
```
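To see where the asserted values in 4.1 and 4.2 come from, the same stages can be applied one after another in plain PHP. This sketch does not use `CompletableFuture`. `applyStages` is a hypothetical helper that only reproduces the arithmetic synchronously.

```php
<?php
// Hypothetical helper: feed a value through a list of callables in order,
// the way each thenApplyAsync stage receives the previous stage's result.
function applyStages($input, callable ...$stages)
{
    return array_reduce(
        $stages,
        fn ($carry, callable $stage) => $stage($carry),
        $input
    );
}

$square = fn ($x) => $x * $x;
$double = fn ($x) => $x * 2;

$chain41 = applyStages(4, $square, $double, $double); // stages of 4.1
$chain42 = applyStages(4, $square, $double);          // stages of 4.2

assert($chain41 === 64);
assert($chain42 === 32);
```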

Example 5 (ScheduledPoolExecutor, cycling jobs)
===============================================

```
use Concurrent\Executor\ScheduledPoolExecutor;

//create pool with 4 workers (processes)
$executor = new ScheduledPoolExecutor(4);

$futures = [];
//15 parallel cycling jobs
for ($i = 1; $i <= 15; $i += 1) {

    //period in milliseconds
    $period = rand(3, 15);
    $future = $executor->scheduleAtFixedRate(function () use ($period) {
        fwrite(STDERR, getmypid() . ": task executed, period = $period (ms), current time = " . hrtime(true) . " (ns)\n");
    }, 0, $period, TimeUnit::MILLISECONDS);
    $futures[] = $future;
}

//sleep for 10 seconds and cancel execution
sleep(10);
foreach ($futures as $future) {
    $future->cancel(false);
}
```
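For a rough sense of how much output to expect: with an initial delay of 0 and a fixed period p, an ideal fixed-rate scheduler fires at 0, p, 2p, and so on until cancellation. The sketch below is plain arithmetic under that idealized assumption; real runs depend on worker availability and timer accuracy. `expectedRuns` is a hypothetical helper, not part of the library.

```php
<?php
// Hypothetical helper: how many times an ideal fixed-rate job would fire
// within a window, given an initial delay and a period (all in ms).
function expectedRuns(int $windowMs, int $periodMs, int $initialDelayMs = 0): int
{
    if ($periodMs <= 0) {
        throw new InvalidArgumentException('Period must be positive');
    }
    if ($initialDelayMs > $windowMs) {
        return 0;
    }
    // Firings at delay, delay + p, delay + 2p, ... up to the window end.
    return intdiv($windowMs - $initialDelayMs, $periodMs) + 1;
}

// Bounds for the periods used above (3..15 ms) over the 10-second sleep.
$fastest = expectedRuns(10000, 3);
$slowest = expectedRuns(10000, 15);
```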

Example 6 (ScheduledPoolExecutor, delayed jobs)
===============================================

```
use Concurrent\Executor\ScheduledPoolExecutor;

//create pool with 4 workers (processes)
$executor = new ScheduledPoolExecutor(4);

$future1 = $executor->schedule(function () {
    fwrite(STDERR, getmypid() . ": delayed task 1 executed, delay = 100 (ms), current time = " . hrtime(true) . " (ns)\n");
}, 100, TimeUnit::MILLISECONDS);

$future2 = $executor->schedule(function () {
    fwrite(STDERR, getmypid() . ": delayed task 2 executed, delay = 100 (ms), current time = " . hrtime(true) . " (ns)\n");
}, 100, TimeUnit::MILLISECONDS);

$future3 = $executor->schedule(function () {
    fwrite(STDERR, getmypid() . ": delayed task 3 executed, delay = 200 (ms), current time = " . hrtime(true) . " (ns)\n");
}, 200, TimeUnit::MILLISECONDS);

$future4 = $executor->schedule(function () {
    fwrite(STDERR, getmypid() . ": delayed task 4 executed, delay = 200 (ms), current time = " . hrtime(true) . " (ns)\n");
}, 200, TimeUnit::MILLISECONDS);

$future5 = $executor->schedule(function () {
    fwrite(STDERR, getmypid() . ": delayed task 5 executed, delay = 1000 (ms), current time = " . hrtime(true) . " (ns)\n");
}, 1000, TimeUnit::MILLISECONDS);
```

Running tests
=============

```
./vendor/bin/phpunit ./tests

```

Running docker container with examples
======================================

```
cd example
docker-compose build
docker-compose up

```

To test core usage, edit `TestTask`: change the value `2000` in the `run` method to `8000`, then run the container again. While the container is running, check how the cores are used: run `top`, press `f`, and turn on `p` (Last used Cpu). You will see that different processor cores are used while the container runs.

Dependencies
============

The library depends on the Swoole and GMP extensions; GMP is used for handling large numbers.
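A quick way to confirm both extensions are available before running the examples is PHP's built-in `extension_loaded()`. The sketch below assumes the extensions register under the names `swoole` and `gmp`. `missingExtensions` is a hypothetical helper, not part of the library.

```php
<?php
// Report which of the required extensions are missing in this runtime.
function missingExtensions(array $required): array
{
    return array_values(array_filter(
        $required,
        fn (string $name) => !extension_loaded($name)
    ));
}

$missing = missingExtensions(['swoole', 'gmp']);
if ($missing !== []) {
    fwrite(STDERR, 'Missing PHP extensions: ' . implode(', ', $missing) . "\n");
}
```

Running `php -m` on the command line lists the loaded extensions as well.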
