
rockett/pipeline
================

A plug and play pipeline implementation.

Pipeline
========

[![GitHub License](https://camo.githubusercontent.com/a987dff7c3353cda1afd35c49975d073cf3eebc15f221c56d00d70aa684f1749/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f6c6963656e73652f6d696b65726f636b6574742f706970656c696e65)](https://camo.githubusercontent.com/a987dff7c3353cda1afd35c49975d073cf3eebc15f221c56d00d70aa684f1749/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f6c6963656e73652f6d696b65726f636b6574742f706970656c696e65)[![Packagist Version](https://camo.githubusercontent.com/d4640a67c8dc2ff61012d37df73b975fe7026376ea40b4bb6e385a0dee9673ee/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f762f726f636b6574742f706970656c696e653f6c6162656c3d52656c65617365)](https://camo.githubusercontent.com/d4640a67c8dc2ff61012d37df73b975fe7026376ea40b4bb6e385a0dee9673ee/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f762f726f636b6574742f706970656c696e653f6c6162656c3d52656c65617365)[![Packagist Downloads](https://camo.githubusercontent.com/7f0f87255c750f5cf003d6bcc469d4805cd59ff3fb67be3dc2b2cbe33d5a9cea/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f646d2f726f636b6574742f706970656c696e653f6c6162656c3d496e7374616c6c73)](https://camo.githubusercontent.com/7f0f87255c750f5cf003d6bcc469d4805cd59ff3fb67be3dc2b2cbe33d5a9cea/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f646d2f726f636b6574742f706970656c696e653f6c6162656c3d496e7374616c6c73)[![Forgejo Workflow Status](https://camo.githubusercontent.com/71af26578c95801eb48e1477aece8e8488287e8d69c31126b1f948480af509a3/68747470733a2f2f6769742e726f636b6574742e70772f726f636b6574742f706970656c696e652f616374696f6e732f776f726b666c6f77732f74657374732e796d6c2f62616467652e737667)](https://camo.githubusercontent.com/71af26578c95801eb48e1477aece8e8488287e8d69c31126b1f948480af509a3/68747470733a2f2f6769742e726f636b6574742e70772f726f636b6574742f706970656c696e652f616374696f6e732f776f726b666c6f77732f74657374732e796d6c2f62616467652e737667)

Provides an implementation of the [pipeline pattern](https://en.wikipedia.org/wiki/Pipeline_(software)) with additional processors for **conditional interruption** and **stage tapping**.

This package was originally forked from League's excellent [pipeline package](https://github.com/thephpleague/pipeline).

Installation
------------

```
composer require rockett/pipeline
```

Requires PHP 8.3+.

Quick Start
-----------

```
use Rockett\Pipeline\Pipeline;

$pipeline = (new Pipeline)
    ->pipe(fn($x) => $x * 2)
    ->pipe(fn($x) => $x + 1);

echo $pipeline->process(10); // Outputs: 21
```

Note

**PHP 8.5+ Pipe Operator:** With PHP 8.5 introducing the native [pipe operator](https://php.watch/versions/8.5/pipe-operator), basic sequential operations can now be achieved without a package. However, this library still provides value through reusable pipeline objects, conditional interruption (`continueWhen`/`continueUnless`), stage tapping for observability (`beforeEach`/`afterEach`), and a fluent API for composing complex processing workflows that go beyond simple function chaining.

Pipeline Pattern
----------------

The pipeline pattern lets you compose sequential operations by chaining *stages*. Each stage receives a *traveler* (payload), processes it, and passes the output to the next stage. Internally, this is equivalent to:

```
$output = $stage3($stage2($stage1($traveler)));
```
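The nesting above is the same as folding the traveler through a list of stages, left to right. A minimal sketch in plain PHP, not the package's internal code; the `$stages` array and the `$run` closure are names chosen here for illustration:

```php
<?php

// Stages are plain callables; each receives the previous stage's output.
$stages = [
    fn (int $x): int => $x * 2,
    fn (int $x): int => $x + 1,
];

// Fold the traveler through every stage, left to right.
$run = fn (mixed $traveler): mixed => array_reduce(
    $stages,
    fn (mixed $carry, callable $stage): mixed => $stage($carry),
    $traveler,
);

$output = $run(10); // (10 * 2) + 1 = 21
```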

Immutability
------------

Pipelines are implemented as immutable stage-chains, contracted by the `PipelineContract` interface. When you add a new stage, the pipeline will be cloned with the new stage added in. This makes pipelines easy to re-use, and minimizes side-effects.
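To illustrate the clone-on-add behaviour, here is a stand-alone sketch. `SketchPipeline` is a hypothetical class written for this example and is not the package's `Pipeline`; it only mirrors the idea that `pipe()` returns a new instance and leaves the original untouched:

```php
<?php

// Hypothetical stand-in for an immutable pipeline (not the package's class).
final class SketchPipeline
{
    /** @var list<callable> */
    private array $stages = [];

    public function pipe(callable $stage): static
    {
        // Copy first, then add the stage to the copy only.
        $clone = clone $this;
        $clone->stages[] = $stage;

        return $clone;
    }

    public function process(mixed $traveler): mixed
    {
        foreach ($this->stages as $stage) {
            $traveler = $stage($traveler);
        }

        return $traveler;
    }
}

$base = (new SketchPipeline)->pipe(fn (int $x): int => $x * 2);
$extended = $base->pipe(fn (int $x): int => $x + 1);

$baseResult = $base->process(10);         // 20: $base still has one stage
$extendedResult = $extended->process(10); // 21
```

Because `$base` is never modified, it can safely serve as the starting point for several differently extended pipelines.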

Usage
-----

Stages, the operations in a pipeline, can be anything that satisfies the `callable` type-hint, so closures and any invokable object will work.

```
$pipeline = (new Pipeline)->pipe(static function ($traveler) {
  return $traveler * 10;
});
```
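As a quick reminder of what PHP itself accepts as `callable`, the sketch below runs a traveler through several kinds of callables by hand. The `AddOne` and `Doubler` classes are hypothetical and exist only for this example; no package code is involved:

```php
<?php

// Hypothetical invokable object.
final class AddOne
{
    public function __invoke(int $traveler): int
    {
        return $traveler + 1;
    }
}

// Hypothetical class with a static method, used as an array callable.
final class Doubler
{
    public static function double(int $traveler): int
    {
        return $traveler * 2;
    }
}

$stages = [
    fn (int $x): int => $x * 10, // closure
    new AddOne,                  // invokable object
    [Doubler::class, 'double'],  // array callable for a static method
    'abs',                       // name of a built-in function
    strval(...),                 // first-class callable syntax (PHP 8.1+)
];

$traveler = 3;

foreach ($stages as $stage) {
    $traveler = $stage($traveler);
}

// 3 -> 30 -> 31 -> 62 -> 62 -> "62"
```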

Class-based stages
------------------

Classes can be used as stages by implementing `StageContract` and an `__invoke` method:

```
use Rockett\Pipeline\Pipeline;
use Rockett\Pipeline\Contracts\StageContract;

class TimesTwoStage implements StageContract
{
  public function __invoke($traveler)
  {
    return $traveler * 2;
  }
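}

class PlusOneStage implements StageContract
{
  public function __invoke($traveler)
  {
    return $traveler + 1;
  }
}
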
$pipeline = (new Pipeline)
  ->pipe(new TimesTwoStage)
  ->pipe(new PlusOneStage);

$pipeline->process(10); // Returns 21
```

You can create custom stage contracts to type-hint the traveler and return type.
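One possible shape for such a contract is sketched below. `IntegerStage` and `TimesThree` are hypothetical names, and the interface is deliberately stand-alone: how a custom contract should relate to the package's `StageContract` is not covered here.

```php
<?php

// Hypothetical contract that pins the traveler and return type to int.
interface IntegerStage
{
    public function __invoke(int $traveler): int;
}

final class TimesThree implements IntegerStage
{
    public function __invoke(int $traveler): int
    {
        return $traveler * 3;
    }
}

$stage = new TimesThree;
$result = $stage(7); // 21
```

With the types declared on the contract, passing a non-integer traveler fails with a `TypeError` at the stage boundary rather than somewhere deeper in the stage.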

Re-usability
------------

Pipelines can be re-used as stages within other pipelines, enabling composable architectures:

```
$processApiRequest = (new Pipeline)
  ->pipe(new ExecuteHttpRequest) // B
  ->pipe(new ParseJsonResponse); // C

$pipeline = (new Pipeline)
  ->pipe(new ConvertToPsr7Request) // A
  ->pipe($processApiRequest) // (B and C)
  ->pipe(new ConvertToDataTransferObject); // D

$pipeline->process(new DeleteArticle($postId));
```
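Nesting works whenever a pipeline is itself callable. The sketch below shows one way that can be arranged, assuming an `__invoke` method that delegates to `process`; `ComposablePipeline` is a hypothetical class for illustration, not the package's implementation:

```php
<?php

// Hypothetical pipeline that can be used as a stage because it is invokable.
final class ComposablePipeline
{
    /** @param list<callable> $stages */
    public function __construct(private readonly array $stages = [])
    {
    }

    public function pipe(callable $stage): self
    {
        return new self([...$this->stages, $stage]);
    }

    public function process(mixed $traveler): mixed
    {
        foreach ($this->stages as $stage) {
            $traveler = $stage($traveler);
        }

        return $traveler;
    }

    // Delegating to process() makes the whole pipeline a valid callable.
    public function __invoke(mixed $traveler): mixed
    {
        return $this->process($traveler);
    }
}

$normalize = (new ComposablePipeline)
    ->pipe(fn (string $s): string => trim($s))
    ->pipe(fn (string $s): string => strtoupper($s));

$greet = (new ComposablePipeline)
    ->pipe(fn (string $s): string => $s . ' world ')
    ->pipe($normalize) // inner pipeline used as a single stage
    ->pipe(fn (string $s): string => $s . '!');

$result = $greet->process(' hello'); // "HELLO WORLD!"
```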

Pipeline Builders
-----------------

While pipelines are immutable by design, there are scenarios where you need to conditionally compose stages before building the pipeline. Pipeline builders solve this by providing a **mutable container** for collecting stages, which is then converted to an immutable pipeline:

```
use Rockett\Pipeline\Builder\PipelineBuilder;

$builder = new PipelineBuilder;

$builder->add(new ValidateInput)
  ->add(new SanitizeData);

if ($config->get('logging.enabled')) {
  $builder->add(new LogRequest);
}

if ($user->hasPermission('admin')) {
  $builder->add(new EnrichWithAdminData);
}

$builder->add(new TransformToResponse)
  ->add(new CompressOutput);

$pipeline = $builder->build();
$result = $pipeline->process($request);
```

Once `build()` is called, you have an immutable pipeline that can be reused or passed around safely without concerns about side-effects from modifications.
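The split between a mutable builder and an immutable result can be sketched in plain PHP as follows. `SketchBuilder` is a hypothetical class and its `build()` returns a closure rather than a pipeline object, so treat it as an illustration of the idea, not of the package's `PipelineBuilder`:

```php
<?php

// Hypothetical builder: collects stages in place, then hands out a snapshot.
final class SketchBuilder
{
    /** @var list<callable> */
    private array $stages = [];

    public function add(callable $stage): static
    {
        $this->stages[] = $stage; // mutates the builder itself

        return $this;
    }

    public function build(): Closure
    {
        $stages = $this->stages; // arrays are copied by value: a snapshot

        return static function (mixed $traveler) use ($stages): mixed {
            foreach ($stages as $stage) {
                $traveler = $stage($traveler);
            }

            return $traveler;
        };
    }
}

$uppercase = true; // stands in for a configuration flag

$builder = (new SketchBuilder)->add(fn (string $s): string => trim($s));

if ($uppercase) {
    $builder->add(fn (string $s): string => strtoupper($s));
}

$pipeline = $builder->build();

// Later changes to the builder do not reach the pipeline already built.
$builder->add(fn (string $s): string => $s . '?');

$result = $pipeline('  hi '); // "HI"
```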

Processors
----------

Processors handle iteration through stages and enable additional features like conditional interruption and stage tapping.

Caution

**As of v4.1, these processors are deprecated and slated for removal in v5:**

- **InterruptibleProcessor** – use **Processor** with `continueUnless()`/`continueWhen()`
- **TapProcessor** – use **Processor** with `beforeEach()`/`afterEach()`
- **InterruptibleTapProcessor** – use **Processor** with combined methods

### `FingersCrossedProcessor` (Default)

Basic sequential processing with no early exit capability (throw an exception to stop).

```
use Rockett\Pipeline\Pipeline;
use Rockett\Pipeline\Processors\FingersCrossedProcessor;

$pipeline = new Pipeline(new FingersCrossedProcessor);
// Or simply: new Pipeline() – FingersCrossedProcessor is the default
```

### `Processor`

The `Processor` supports conditional interruption (early exit) and stage tapping (callbacks before/after each stage), configured fluently with method-chaining:

```
use Rockett\Pipeline\Processors\Processor;

$processor = (new Processor())
    ->continueUnless(fn($traveler) => $traveler->hasError())
    ->beforeEach(fn($traveler) => $logger->info('Processing:', $traveler->toArray()))
    ->afterEach(fn($traveler) => $metrics->increment('pipeline.stage.completed'));

$pipeline = (new Pipeline($processor))
    ->pipe(new ValidateInput)
    ->pipe(new ProcessData)
    ->pipe(new SaveToDatabase);
```

Features can be composed via method chaining:

- `continueUnless(callable)` – exit when callback returns true
- `continueWhen(callable)` – exit when callback returns false
- `invert()` – invert the interrupt condition
- `beforeEach(callable)` – execute callback before each stage
- `afterEach(callable)` – execute callback after each stage

#### Exiting pipelines early

Use interrupt methods to exit pipelines early based on conditions:

```
use Rockett\Pipeline\Processors\Processor;

$processor = (new Processor())
    ->continueUnless(fn($traveler) => $traveler->hasError());

$pipeline = (new Pipeline($processor))
    ->pipe(new ValidateInput)
    ->pipe(new ProcessData)
    ->pipe(new SaveToDatabase);

$output = $pipeline->process($request);
```

In this example, when `$traveler->hasError()` returns true, the pipeline exits early.
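Conceptually, an interruptible run is a loop with a check between stages. The sketch below assumes the check happens after each stage and that the traveler is returned as it stands at the moment of exit; both are assumptions made for illustration, and `processUnless` is a hypothetical helper, not part of the package:

```php
<?php

/**
 * Hypothetical helper: runs stages in order and stops as soon as
 * $shouldStop returns true for the traveler a stage just produced.
 */
function processUnless(callable $shouldStop, array $stages, mixed $traveler): mixed
{
    foreach ($stages as $stage) {
        $traveler = $stage($traveler);

        if ($shouldStop($traveler)) {
            break; // early exit: remaining stages are not run
        }
    }

    return $traveler;
}

$stages = [
    fn (array $t): array => [...$t, 'validated' => true],
    fn (array $t): array => [...$t, 'error' => 'out of stock'],
    fn (array $t): array => [...$t, 'saved' => true], // never reached
];

$output = processUnless(
    fn (array $t): bool => isset($t['error']),
    $stages,
    ['id' => 1],
);
```

Here `$output` carries the `error` key but no `saved` key, because the third stage is skipped once the condition matches.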

**Available interrupt methods:**

```
// Exit when condition is true
$processor = (new Processor())
    ->continueUnless(fn($traveler) => $traveler->hasError());

// Exit when condition becomes false
$processor = (new Processor())
    ->continueWhen(fn($traveler) => $traveler->isValid());

// Invert the condition
$processor = (new Processor())
    ->continueWhen(fn($traveler) => $traveler->isValid())
    ->invert(); // Now exits when isValid() returns true
```

#### Invoking actions on each stage

Use tap methods to invoke callbacks before and/or after each stage for logging, metrics, or debugging:

```
use Rockett\Pipeline\Processors\Processor;

$processor = (new Processor())
    ->beforeEach(fn($traveler) => $logger->info('Processing:', $traveler->toArray()))
    ->afterEach(fn($traveler) => $metrics->increment('pipeline.stage.completed'));

$pipeline = (new Pipeline($processor))
    ->pipe(new StageOne)
    ->pipe(new StageTwo)
    ->pipe(new StageThree);

$output = $pipeline->process($traveler);
```
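The order in which taps fire can be seen in a small stand-alone sketch. It assumes the "before" callback receives the traveler going into a stage and the "after" callback receives the stage's output; `processWithTaps` is a hypothetical helper, not the package's processor:

```php
<?php

// Hypothetical helper: calls $before and $after around every stage.
function processWithTaps(array $stages, mixed $traveler, callable $before, callable $after): mixed
{
    foreach ($stages as $stage) {
        $before($traveler);            // sees the input of the stage
        $traveler = $stage($traveler);
        $after($traveler);             // sees the output of the stage
    }

    return $traveler;
}

$log = [];

$output = processWithTaps(
    [fn (int $x): int => $x * 2, fn (int $x): int => $x + 1],
    10,
    function (int $t) use (&$log): void { $log[] = "before: $t"; },
    function (int $t) use (&$log): void { $log[] = "after: $t"; },
);

// $log is now: before: 10, after: 20, before: 20, after: 21
```

The taps only observe the traveler; their return values are ignored, so they cannot change the result.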

#### Per-stage conditions

Stages can optionally implement a `condition` method to control whether they should execute. If the condition returns false, the stage is skipped and the traveler is passed to the next stage untouched.

```
class ProcessPaymentStage implements StageContract
{
  public function condition($traveler): bool
  {
    return $traveler->requiresPayment();
  }

  public function __invoke($traveler)
  {
    return $traveler->processPayment();
  }
}
```

Note

Condition-checking is done after the `beforeEach` stage tap.
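The skipping behaviour can be sketched as a loop that consults `condition` when a stage provides it. How the package actually detects the method is not stated here, so the `method_exists` check is an assumption, and `ApplyDiscount` and `processConditionally` are hypothetical names:

```php
<?php

// Hypothetical stage that only applies to orders of 100 or more.
final class ApplyDiscount
{
    public function condition(array $order): bool
    {
        return $order['total'] >= 100;
    }

    public function __invoke(array $order): array
    {
        $order['total'] -= 10;

        return $order;
    }
}

// Hypothetical helper: skips any stage whose condition() returns false.
function processConditionally(array $stages, mixed $traveler): mixed
{
    foreach ($stages as $stage) {
        $hasCondition = is_object($stage) && method_exists($stage, 'condition');

        if ($hasCondition && !$stage->condition($traveler)) {
            continue; // traveler moves on untouched
        }

        $traveler = $stage($traveler);
    }

    return $traveler;
}

$small = processConditionally([new ApplyDiscount], ['total' => 50]);  // unchanged
$large = processConditionally([new ApplyDiscount], ['total' => 150]); // total becomes 140
```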

Handling Exceptions
-------------------

The package won't catch exceptions. Handle them in your code, either inside a stage or when calling the pipeline.

```
$pipeline = (new Pipeline)->pipe(
  static fn () => throw new LogicException
);

try {
  $pipeline->process($traveler);
} catch(LogicException $e) {
  // Handle the exception.
}
```
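The other option, handling the exception inside a stage, might look like the sketch below. The stage converts a failure into data on the traveler so later stages (or an interrupt condition) can react to it. The `$parseJson` closure and the `ok`/`data`/`error` keys are hypothetical choices made for this example:

```php
<?php

// Hypothetical stage: catches its own exception and reports it as data.
$parseJson = static function (string $json): array {
    try {
        return [
            'ok' => true,
            'data' => json_decode($json, true, 512, JSON_THROW_ON_ERROR),
        ];
    } catch (JsonException $e) {
        return ['ok' => false, 'error' => $e->getMessage()];
    }
};

$good = $parseJson('{"id": 1}'); // ok is true, data holds the decoded array
$bad = $parseJson('{oops');      // ok is false, error holds the message
```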

Testing
-------

```
composer test
```

License
-------

Pipeline is licensed under the permissive [MIT license](license.md).

Contributing
------------

Contributions are welcome – if you have something to add to this package, or have found a bug, feel free to submit a pull request for review.
