PHPackages                             vherbaut/laravel-pipeline-jobs - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. vherbaut/laravel-pipeline-jobs

ActiveLibrary[Queues &amp; Workers](/categories/queues)

vherbaut/laravel-pipeline-jobs
==============================

A Laravel package for building job pipelines with saga pattern support

1.6.3(1mo ago)20MITPHPPHP ^8.2CI passing

Since Apr 12Pushed 1mo agoCompare

[ Source](https://github.com/vherbaut/laravel-pipeline-jobs)[ Packagist](https://packagist.org/packages/vherbaut/laravel-pipeline-jobs)[ Docs](https://github.com/vherbaut/laravel-pipeline-jobs)[ RSS](/packages/vherbaut-laravel-pipeline-jobs/feed)WikiDiscussions main Synced 1w ago

READMEChangelog (7)Dependencies (8)Versions (12)Used By (0)

Laravel Pipeline Jobs
=====================

[](#laravel-pipeline-jobs)

[![Latest Version on Packagist](https://camo.githubusercontent.com/abe33acbe1729ab7b99e879b70494fd3b059faa0ef1efb4ddf235afe64c3c555/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f762f76686572626175742f6c61726176656c2d706970656c696e652d6a6f62732e7376673f7374796c653d666c61742d737175617265)](https://packagist.org/packages/vherbaut/laravel-pipeline-jobs)[![Tests](https://camo.githubusercontent.com/0e22c6745391f21078477be6322c25caad852f1b0bfa6fa170c5b46ac49e1cdb/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f616374696f6e732f776f726b666c6f772f7374617475732f76686572626175742f6c61726176656c2d706970656c696e652d6a6f62732f72756e2d74657374732e796d6c3f6272616e63683d6d61696e266c6162656c3d7465737473267374796c653d666c61742d737175617265)](https://github.com/vherbaut/laravel-pipeline-jobs/actions/workflows/run-tests.yml)[![PHPStan](https://camo.githubusercontent.com/4470f8df20a6d13b9eac69d9a1f3abcd72eb30f8436f5355e69f06cb6bd970ab/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f5048505374616e2d6c6576656c253230352d627269676874677265656e2e7376673f7374796c653d666c61742d737175617265)](https://phpstan.org/)[![Total Downloads](https://camo.githubusercontent.com/0143a30aa725770de4dffc74341e30095562503757d434b82e7ce0b1a11e6f20/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f64742f76686572626175742f6c61726176656c2d706970656c696e652d6a6f62732e7376673f7374796c653d666c61742d737175617265)](https://packagist.org/packages/vherbaut/laravel-pipeline-jobs)[![License](https://camo.githubusercontent.com/c654a5b1e774d8ded1bbe6e7d883cf4a1e60174882e0798731c2af6f269d0ceb/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f6c2f76686572626175742f6c61726176656c2d706970656c696e652d6a6f62732e7376673f7374796c653d666c61742d737175617265)](https://packagist.org/packages/vherbaut/laravel-pipeline-jobs)

> Une documentation en français est disponible dans [README-fr.md](README-fr.md).

**Orchestrate your Laravel business jobs with a shared typed context, automatic compensation on failure, and built in observability.**

You have already written a chain of jobs to process an order, onboard a user, or run a billing cycle. It usually looks like this:

1. Job 1 produces a result, you stash it in the cache under an ad hoc key so Job 2 can pick it up.
2. Job 2 fails after Job 1 has charged the customer, so you write the refund logic by hand.
3. To add logging or metrics, you edit each job one by one.
4. In tests you mock the bus, in production it runs queued, and the two paths drift apart over time.

This package replaces that patchwork with a fluent API:

- A **typed context** object flows through every step. No more cache keys to pass a DTO between three jobs.
- A step fails? The pipeline automatically runs the **saga compensation** you declared (refund, inventory release, remote resource cleanup).
- Logs, metrics, alerts: one call to `dispatchEvents()` exposes three Laravel events, all correlated by a `pipelineId`.
- The **same code** runs synchronously in your Pest tests and queued in production. You add or remove `shouldBeQueued()`, nothing else changes.

Table of Contents
-----------------

[](#table-of-contents)

- [Why this package exists](#why-this-package-exists)
- [What changes in your day to day](#what-changes-in-your-day-to-day)
- [Is this for me?](#is-this-for-me)
- [Requirements](#requirements)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Ecosystem Integration Example](#ecosystem-integration-example)
- [Documentation](#documentation)
- [Roadmap](#roadmap)
- [Contributing](#contributing)
- [License](#license)

Why this package exists
-----------------------

[](#why-this-package-exists)

`Bus::chain()` runs jobs in sequence. That is where the help ends. Every other concern around a real business flow is left to you:

What you actually needPlain `Bus::chain()`With Laravel Pipeline JobsShare data between stepsSerialize to cache or DB, retrieve in each job, defend against cache misses and race conditions.One typed context object flows through every step, with IDE autocompletion and static analysis.Roll back on failureWrite ad hoc undo logic inside each job. Get the order wrong, miss a step, leak half written state.`compensateWith(...)` runs the reverse path automatically, with three `FailStrategy` policies.Observe runs (logs, metrics, alerts)Inject logging into every job, or write a listener class per chain.`dispatchEvents()`, three Laravel events, correlated by `pipelineId`.Run the same flow sync in tests and queued in prodMaintain two code paths, or skip the test.Add or remove `shouldBeQueued()`. Same pipeline.Cap throughput per tenant, per customer, per anythingSprinkle throttles in each job. Racy, inconsistent.`rateLimit($key, max, perSeconds)` and `maxConcurrent($key, limit)` gate the whole pipeline before any step runs.Assert what ran, in what order, against which contextMock the bus, reconstruct intent, hope for the best.`Pipeline::fake()` with first class assertions on steps, context snapshots, and compensation.Fan out / join, nest sub pipelines, pick a branch at runtimeBuild it yourself every time.`JobPipeline::parallel()`, `JobPipeline::nest()`, `Step::branch()`, all composable.If any row in that table describes your current pain, this package was written for you.

What changes in your day to day
-------------------------------

[](#what-changes-in-your-day-to-day)

### Before

[](#before)

```
// Job 1: charge the customer, persist the invoice somewhere the next job can find it.
Cache::put("order:{$order->id}:invoice", $invoice, 3600);

// Job 2: fetch the invoice, reserve stock, persist again.
$invoice = Cache::get("order:{$order->id}:invoice") ?? throw new RuntimeException('lost');
Cache::put("order:{$order->id}:shipment", $shipment, 3600);

// Job 3: fetch both, send the email, and if anything blows up halfway through...
// good luck figuring out the rollback, the observability, and the test strategy.
```

### After

[](#after)

```
JobPipeline::make([
    ValidateOrder::class,
    ChargeCustomer::class,
    ReserveInventory::class,
    SendConfirmation::class,
])
    ->compensateWith([RefundCustomer::class, ReleaseInventory::class])
    ->dispatchEvents()
    ->shouldBeQueued()
    ->send(new OrderContext(order: $order))
    ->run();
```

Typed context. Declarative rollback. Observability on. Queued in production. The exact same lines, minus `shouldBeQueued()`, run in a Pest test with `Pipeline::fake()` and first class assertions.

Is this for me?
---------------

[](#is-this-for-me)

Reach for this package when any of these is true:

1. You have multi step business flows where each step depends on the previous one (orders, onboarding, billing, imports, syncs, provisioning).
2. You need partial rollback when a step fails halfway (refund the charge, release the stock, close the remote resource).
3. You want the same flow testable in sync, runnable in queue, with zero forked code paths.
4. You want per tenant or per customer rate limiting and concurrency on a whole flow, not on individual jobs.
5. You care about type safety, static analysis, and IDE autocompletion across every step of a long flow.

Skip this package if you only dispatch fire and forget jobs that do not share state, never need rollback, and never need a shared context.

Key features at a glance
------------------------

[](#key-features-at-a-glance)

- **Typed context.** A shared DTO flows through every step, with full IDE autocompletion and static analysis support.
- **Sync and queued execution.** Flip a single call (`shouldBeQueued()`) to move a pipeline between modes with no code changes.
- **Saga compensation.** Declarative rollback with `compensateWith()` plus three `FailStrategy` policies.
- **Conditional steps.** `when()` / `unless()` predicates evaluated against the live context.
- **Lifecycle hooks and observability.** Six hooks (per step and pipeline level) for logging, metrics, and alerting.
- **Event listener bridge.** One line to register a pipeline as a listener.
- **Parallel execution and branching.** Fan out / fan in groups (`JobPipeline::parallel`), nested sub pipelines (`JobPipeline::nest`), conditional branches (`Step::branch`).
- **Admission control.** Pipeline wide `rateLimit()` and `maxConcurrent()` gates, keyed by closure or string, evaluated before any step runs.
- **Ecosystem integration.** Opt in Laravel events, `reverse()` for symmetrical rollbacks, three accepted step shapes (`handle()`, middleware `handle($passable, Closure $next)`, invokable `__invoke()`).
- **Comprehensive testing toolkit.** `Pipeline::fake()`, recording mode, context snapshots, compensation assertions.

Requirements
------------

[](#requirements)

DependencyVersionPHP8.2+Laravel11.x, 12.x, 13.xInstallation
------------

[](#installation)

```
composer require vherbaut/laravel-pipeline-jobs
```

The package auto discovers its service provider and facade. No manual registration is needed.

### Optional: enable parallel step groups

[](#optional-enable-parallel-step-groups)

Parallel step groups (`JobPipeline::parallel([...])`) fan out each sub step through Laravel's `Bus::batch()`, which requires the `job_batches` table. If you plan to use parallel groups on queued pipelines, run Laravel's built in command once to publish and apply the migration:

```
php artisan queue:batches-table
php artisan migrate
```

You can skip this step if you never dispatch parallel groups, or if you only run them synchronously (sync pipelines do not touch `Bus::batch()`).

Quick Start
-----------

[](#quick-start)

**1. Define a typed context** that carries data through your pipeline:

```
use Vherbaut\LaravelPipelineJobs\Context\PipelineContext;

class OrderContext extends PipelineContext
{
    public ?Invoice $invoice = null;
    public string $status = 'pending';

    public function __construct(
        public Order $order,
    ) {}
}
```

**2. Write your job steps.** Add the `InteractsWithPipeline` trait to read the context:

```
use Vherbaut\LaravelPipelineJobs\Concerns\InteractsWithPipeline;

class ChargeCustomer
{
    use InteractsWithPipeline;

    public function handle(PaymentService $payments): void
    {
        $context = $this->pipelineContext();
        $context->invoice = $payments->charge($context->order);
        $context->status = 'charged';
    }
}
```

**3. Run the pipeline:**

```
use Vherbaut\LaravelPipelineJobs\JobPipeline;

$result = JobPipeline::make([
    ValidateOrder::class,
    ChargeCustomer::class,
    SendConfirmation::class,
])
    ->send(new OrderContext(order: $order))
    ->run();

// $result is the final OrderContext with all steps applied.
```

For the full walkthrough (context design, queued mode, compensation, hooks, testing), see [docs/en/getting-started.md](docs/en/getting-started.md).

Ecosystem Integration Example
-----------------------------

[](#ecosystem-integration-example)

The following example wires four integration features into a single pipeline. A tenant scoped order fulfillment chain that mixes a classic `handle()` step, a middleware style audit step, and an invokable Action step, gated by rate limit and concurrency, observable through Laravel events.

```
use Closure;
use Illuminate\Support\Facades\Event;
use Vherbaut\LaravelPipelineJobs\Context\PipelineContext;
use Vherbaut\LaravelPipelineJobs\Concerns\InteractsWithPipeline;
use Vherbaut\LaravelPipelineJobs\Events\PipelineCompleted;
use Vherbaut\LaravelPipelineJobs\Events\PipelineStepCompleted;
use Vherbaut\LaravelPipelineJobs\Events\PipelineStepFailed;
use Vherbaut\LaravelPipelineJobs\JobPipeline;

// 1. Typed context carrying the tenant id and the order payload.
final class OrderContext extends PipelineContext
{
    public ?Invoice $invoice = null;

    public function __construct(
        public readonly int $tenantId,
        public readonly Order $order,
    ) {}
}

// 2. Classic handle() step with InteractsWithPipeline for context access.
final class ValidateOrder
{
    use InteractsWithPipeline;

    public function handle(OrderValidator $validator): void
    {
        $validator->validate($this->pipelineContext()->order);
    }
}

// 3. Middleware style audit step. Uses $passable + Closure $next.
final class AuditStep
{
    public function handle(?OrderContext $passable, Closure $next): mixed
    {
        logger()->info('pipeline step started', ['tenant' => $passable?->tenantId]);
        $result = $next($passable);
        logger()->info('pipeline step finished', ['tenant' => $passable?->tenantId]);

        return $result;
    }
}

// 4. Action step. Invoked via __invoke(?PipelineContext $context).
final class NotifyCustomer
{
    public function __invoke(?OrderContext $context): void
    {
        if ($context?->invoice !== null) {
            Notification::send($context->order->customer, new OrderConfirmation($context->invoice));
        }
    }
}

// 5. Observe events in a service provider (once per boot).
Event::listen(PipelineStepCompleted::class, fn ($event) => metrics()->increment('pipeline.step.ok', ['class' => $event->stepClass]));
Event::listen(PipelineStepFailed::class,    fn ($event) => report($event->exception));
Event::listen(PipelineCompleted::class,     fn ($event) => metrics()->increment('pipeline.run.done'));

// 6. Compose the pipeline. All integration features at once.
$result = JobPipeline::make([
    ValidateOrder::class,
    AuditStep::class,
    ChargeCustomer::class,
    NotifyCustomer::class,
])
    ->rateLimit(
        fn (?OrderContext $ctx) => 'orders:tenant:'.$ctx->tenantId,
        max: 10,
        perSeconds: 60,
    )                                                  // Per tenant quota.
    ->maxConcurrent(
        fn (?OrderContext $ctx) => 'orders:tenant:'.$ctx->tenantId,
        limit: 3,
    )                                                  // Per tenant concurrency.
    ->dispatchEvents()                                 // Opt in observability.
    ->shouldBeQueued()
    ->send(new OrderContext(tenantId: $tenant->id, order: $order))
    ->run();

// Need to replay the same steps backwards for a tenant wide unwind?
// JobPipeline::make([...])->reverse()->send(...)->run();
```

The pipeline mixes three step shapes (classic, middleware, action) transparently. Rate limit and concurrency throw `PipelineThrottled` / `PipelineConcurrencyLimitExceeded` before any step runs when saturated. Events flow through Laravel's dispatcher and can be observed, queued, or batched by any listener.

See the dedicated docs for each feature:

- [Pipeline events](docs/en/pipeline-events.md).
- [Reverse pipelines](docs/en/reverse-pipelines.md).
- [Rate limiting and concurrency](docs/en/rate-limiting-concurrency.md).
- [Alternative step interfaces](docs/en/alternative-step-interfaces.md).

Documentation
-------------

[](#documentation)

English documentation lives under [`docs/en/`](docs/en/). French documentation lives under [`docs/fr/`](docs/fr/).

TopicDescriptionLinkGetting StartedInstall the package, write your first typed context, run a pipeline, pass data between steps.[docs/en/getting-started.md](docs/en/getting-started.md)Core Concepts`PipelineContext`, `PipelineBuilder` (array vs fluent), sync and queued execution modes.[docs/en/core-concepts.md](docs/en/core-concepts.md)Pipeline Aware JobsWire a job to the shared context via the `InteractsWithPipeline` trait or an explicit property. Dual mode jobs.[docs/en/pipeline-aware-jobs.md](docs/en/pipeline-aware-jobs.md)Return ValuesTransform the final context into a scalar value with `->return(Closure)`.[docs/en/return-values.md](docs/en/return-values.md)Conditional StepsBranch execution with `when()` / `unless()` predicates evaluated against the live context.[docs/en/conditional-steps.md](docs/en/conditional-steps.md)Queued PipelinesRun pipelines through Laravel's queue system. Serialization, retries, worker affinity.[docs/en/queued-pipelines.md](docs/en/queued-pipelines.md)Event Listener BridgeRegister a pipeline as an event listener with `JobPipeline::listen()` or `toListener()`.[docs/en/event-listener-bridge.md](docs/en/event-listener-bridge.md)Saga CompensationRollback with `compensateWith()`, `FailStrategy` policies, `CompensableJob` contract, failure observability.[docs/en/saga-compensation.md](docs/en/saga-compensation.md)Lifecycle HooksPer-step hooks (`beforeEach`, `afterEach`, `onStepFailed`) and pipeline-level callbacks (`onSuccess`, `onFailure(Closure)`, `onComplete`).[docs/en/lifecycle-hooks.md](docs/en/lifecycle-hooks.md)Per-Step ConfigurationRoute each step to its own queue or connection, override sync execution, set retry, backoff, and timeout policies per step, with pipeline-level defaults.[docs/en/per-step-configuration.md](docs/en/per-step-configuration.md)Dispatch VerbExecute a pipeline with `Pipeline::dispatch([...])` as a Bus::dispatch-style alternative to `->make()->run()`. Auto-runs on destruct.[docs/en/dispatch-verb.md](docs/en/dispatch-verb.md)Parallel Step GroupsFan out / fan in groups via `JobPipeline::parallel([...])`, `Bus::batch()` dispatch on the queue, context merging, nesting constraints.[docs/en/parallel-steps.md](docs/en/parallel-steps.md)Pipeline NestingReuse sub pipelines via `JobPipeline::nest(...)`, nested cursor for queued mode, outer FailStrategy inheritance, inner defaults governance.[docs/en/pipeline-nesting.md](docs/en/pipeline-nesting.md)Conditional BranchingPick a branch at runtime via `Step::branch($selector, [...])`, accepted branch values (class string, StepDefinition, sub pipeline), convergence on the next outer step.[docs/en/conditional-branching.md](docs/en/conditional-branching.md)Pipeline EventsOpt in Laravel events at three lifecycle points (`PipelineStepCompleted`, `PipelineStepFailed`, `PipelineCompleted`), correlate by `pipelineId`, queued listener caveat.[docs/en/pipeline-events.md](docs/en/pipeline-events.md)Reverse Pipelines`PipelineBuilder::reverse()` for outer position reversal, inner structure preservation, full pipeline state copy, compensation interaction.[docs/en/reverse-pipelines.md](docs/en/reverse-pipelines.md)Rate Limiting and ConcurrencyPipeline level admission gates via `rateLimit()` and `maxConcurrent()`, Closure based keys, Cache driver requirements, composition with events.[docs/en/rate-limiting-concurrency.md](docs/en/rate-limiting-concurrency.md)Alternative Step InterfacesThree accepted step shapes (`handle()`, middleware `handle($passable, Closure $next)`, invokable `__invoke()`), parameter naming contract, `InteractsWithPipeline` trait compatibility.[docs/en/alternative-step-interfaces.md](docs/en/alternative-step-interfaces.md)Testing`Pipeline::fake()`, recording mode, step and context assertions, compensation assertions.[docs/en/testing.md](docs/en/testing.md)API ReferenceComplete catalog of public symbols, methods, properties, exceptions, and events.[docs/en/api-reference.md](docs/en/api-reference.md)Roadmap
-------

[](#roadmap)

The following features are planned for future releases. The properties are already reserved in the codebase:

- **Top level named pipelines.** A `name('order-fulfillment')` to tag the whole pipeline (parallel groups, sub pipelines, and branches already expose an optional `name`).
- **Middleware and Action compensation.** Extend the strategy aware dispatcher to compensation paths so middleware shape and Action shape classes can serve as compensation targets (current contract requires classic `handle()` or `CompensableJob`).

Contributing
------------

[](#contributing)

Contributions are welcome. To get started:

```
# Run the test suite
composer test

# Run static analysis
composer analyse

# Format code
composer format
```

License
-------

[](#license)

The MIT License (MIT). Please see the [LICENSE](LICENSE) file for more information.

###  Health Score

41

—

FairBetter than 87% of packages

Maintenance90

Actively maintained with recent releases

Popularity3

Limited adoption so far

Community8

Small or concentrated contributor base

Maturity53

Maturing project, gaining track record

 Bus Factor1

Top contributor holds 98.5% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~0 days

Total

10

Last Release

51d ago

### Community

Maintainers

![](https://avatars.githubusercontent.com/u/8347782?v=4)[Vincent H.](/maintainers/vherbaut)[@vherbaut](https://github.com/vherbaut)

---

Top Contributors

[![vincent-akawam](https://avatars.githubusercontent.com/u/166985545?v=4)](https://github.com/vincent-akawam "vincent-akawam (64 commits)")[![vherbaut](https://avatars.githubusercontent.com/u/8347782?v=4)](https://github.com/vherbaut "vherbaut (1 commits)")

---

Tags

laraveljobspipelinesagavherbaut

###  Code Quality

TestsPest

Static AnalysisPHPStan

Code StyleLaravel Pint

Type Coverage Yes

### Embed Badge

![Health badge](/badges/vherbaut-laravel-pipeline-jobs/health.svg)

```
[![Health](https://phpackages.com/badges/vherbaut-laravel-pipeline-jobs/health.svg)](https://phpackages.com/packages/vherbaut-laravel-pipeline-jobs)
```

###  Alternatives

[laravel/horizon

Dashboard and code-driven configuration for Laravel queues.

4.1k91.3M277](/packages/laravel-horizon)[laravel/scout

Laravel Scout provides a driver based solution to searching your Eloquent models.

1.7k53.0M578](/packages/laravel-scout)[illuminate/broadcasting

The Illuminate Broadcasting package.

7126.9M199](/packages/illuminate-broadcasting)[psalm/plugin-laravel

Psalm plugin for Laravel

3325.1M337](/packages/psalm-plugin-laravel)[illuminate/notifications

The Illuminate Notifications package.

513.0M1.1k](/packages/illuminate-notifications)[laravel/ai

The official AI SDK for Laravel.

9782.1M153](/packages/laravel-ai)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
