etouches/phplumber
==================

Pipelining library which allows a mix of asynchronous and synchronous processes.

Version v1.0.0-beta, MIT license, PHP >=5.3

[Source](https://github.com/etouches-corp/phplumber) | [Packagist](https://packagist.org/packages/etouches/phplumber)

Phplumber
=========

Phplumber is a very simple pipelining library for PHP which allows a mix of asynchronous and synchronous processes. Use it to break a large process into multiple steps using a queue and multi-processing. Phplumber contains no hard-coded dependencies and can be backed by any queue setup and any storage mechanism.

Requirements
------------

- PHP 5.3+
- A queue, such as RabbitMQ or Redis
- Storage, such as a relational database table or Redis

Example
-------

Let's take creating and filling a database as an example. It takes multiple steps, some of which can run concurrently.

1. Create database (one process)
2. Create and populate tables (one process per table)
3. Create views dependent on multiple tables (one process, dependent on all tables existing)

```
                         Create table 1
                       /                \
    Create database ->   Create table 2   -> Create views
                       \                /
                         Create table 3

```

First we define our processes. A step that runs once, in sequence, extends `Process`. A step that runs multiple times with different data, potentially in parallel, extends `MultiProcess`.

```
class CreateDatabase extends Process
{
    public function invoke($payload)
    {
        $database_name = $payload['database_name'];
        echo "Drop database $database_name if it exists...\n";
        echo "Creating database $database_name...\n";
    }
}
```

```
class CreateTable extends MultiProcess
{
    // Determine the data we need to queue for async processes
    public function getAsyncPayloads($payload)
    {
        $database_name = $payload['database_name'];
        $table_names = array('first_table', 'second_table', 'third_table');

        $payloads = array();
        foreach ($table_names as $table) {
            $payloads[] = array('database_name' => $database_name, 'table_name' => $table);
        }
        return $payloads;
    }

    public function invoke($payload)
    {
        $database_name = $payload['database_name'];
        // The key must match the one set in getAsyncPayloads() above
        $table_name = $payload['table_name'];
        echo "Connecting to database $database_name...\n";
        echo "Creating and populating $table_name...\n";

        switch ($table_name) {
            case 'first_table':
                // Create table and insert rows...
                break;
            // ...
        }
    }
}
```

Then we define the sequence of processes.

```
class CreateAndFillDatabase extends ProcessList
{
    protected function setup()
    {
        $this
            ->add('CreateDatabase')
            ->add('CreateTable')
            ->add('CreateViews');
    }
}
```
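The list above references a `CreateViews` step that the example does not define. A minimal sketch follows, assuming the step has the same `invoke($payload)` shape as `CreateDatabase` and that its payload still carries `database_name`. The README confirms neither assumption for a step that follows a `MultiProcess`. The stand-in `Process` base class is hypothetical and exists only so the snippet runs outside the library; with Phplumber loaded, extend the library's own class instead.

```php
<?php
// Hypothetical stand-in so this sketch runs on its own.
// It is NOT Phplumber's Process class.
if (!class_exists('Process')) {
    abstract class Process
    {
        abstract public function invoke($payload);
    }
}

class CreateViews extends Process
{
    // Runs once, after every CreateTable part has finished.
    public function invoke($payload)
    {
        // Assumption: the payload still contains the database name.
        $database_name = $payload['database_name'];
        echo "Connecting to database $database_name...\n";
        echo "Creating views across all tables...\n";
    }
}

$step = new CreateViews();
$step->invoke(array('database_name' => 'test_db'));
```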

We can now kick off the process sequence.

```
$pipeline = new CreateAndFillDatabase();
$pipeline->process(array('database_name' => 'test_db'));
```

See the `examples` directory for complete, working demos.

Getting Started
---------------

1. Implement `StorageInterface`. This will hold semaphore data. Appropriate storage engines include any relational database, a NoSQL database, or a key-value store such as Redis.
2. Extend the `Queue` class to integrate with a queue system. Appropriate queue engines include Redis and RabbitMQ.
3. Write each process as a class that extends `Process` (for synchronous) or `MultiProcess` (for asynchronous).
4. Implement `ProcessFactoryInterface`. This will create each instance of `ProcessInterface`, allowing you to set up your processes with any prerequisites, such as a database connection or configuration options.
5. Put the processes together by extending `ProcessList`.
6. Implement a worker daemon which instantiates your `Queue` implementation and calls `consume()` to listen for incoming messages. Each message invokes a single part of a multi-process. Run multiple workers to execute processes concurrently.
7. Choose a place in your system to start the entire workflow, instantiate your `ProcessList`, and call `process()`, passing it the initial payload.
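This README does not show the method signatures of `StorageInterface` or `Queue`, so what follows is a library-independent sketch of the coordination idea behind steps 1, 2, and 6. The queue carries one message per part of a multi-process. A counter held in storage (the semaphore) lets whichever worker finishes last detect that the next step may run. `InMemoryStorage`, `InMemoryQueue`, `fanOut`, and `runWorker` are hypothetical names chosen for illustration; they are not Phplumber's API.

```php
<?php
// Hypothetical in-memory stand-ins; NOT Phplumber's StorageInterface or Queue.
class InMemoryStorage
{
    private $counters = array();

    public function set($key, $value)
    {
        $this->counters[$key] = $value;
    }

    // Decrement and return the remaining count. A real backend must do this
    // atomically (for example Redis DECR) because several workers run at once.
    public function decrement($key)
    {
        $this->counters[$key]--;
        return $this->counters[$key];
    }
}

class InMemoryQueue
{
    private $messages = array();

    public function publish(array $message)
    {
        $this->messages[] = $message;
    }

    // Returns the next message, or null when the queue is empty.
    public function pop()
    {
        return array_shift($this->messages);
    }
}

// Fan out: record how many parts are outstanding, then queue one message each.
function fanOut(InMemoryQueue $queue, InMemoryStorage $storage, $key, array $payloads)
{
    $storage->set($key, count($payloads));
    foreach ($payloads as $payload) {
        $queue->publish($payload);
    }
}

// Worker loop: handle each message; the last finisher triggers the next step.
function runWorker(InMemoryQueue $queue, InMemoryStorage $storage, $key, $onAllDone)
{
    $log = array();
    while (($payload = $queue->pop()) !== null) {
        $log[] = 'created ' . $payload['table_name'];
        if ($storage->decrement($key) === 0) {
            $log[] = call_user_func($onAllDone);
        }
    }
    return $log;
}

$queue = new InMemoryQueue();
$storage = new InMemoryStorage();
fanOut($queue, $storage, 'create_tables', array(
    array('table_name' => 'first_table'),
    array('table_name' => 'second_table'),
    array('table_name' => 'third_table'),
));

$log = runWorker($queue, $storage, 'create_tables', function () {
    return 'created views';
});
print_r($log);
```

Here a single worker drains the queue in one process. In a real deployment, several worker daemons would each block on the queue, which is why the decrement has to be atomic in the chosen storage engine.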

Maintainer: [matt-schwartz](https://github.com/matt-schwartz)