jacobhyde/laravel-observer-pipeline
===================================

Explicit, ordered, and testable pipelines for Laravel Eloquent model events.

Version v1.0.0 · License: MIT · PHP ^8.1|^8.2|^8.3|^8.4

[Source](https://github.com/jacob-hyde/laravel-observer-pipeline) · [Packagist](https://packagist.org/packages/jacobhyde/laravel-observer-pipeline)

Laravel Observer Pipeline
=========================

**Explicit, ordered, and testable pipelines for Laravel Eloquent model events.**

[![PHP Version](https://camo.githubusercontent.com/7663c9d53dc13cedaf0660a8745a7e77d2dd711257f36aa86ebce12a0600ef42/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f7068702d253345253344382e312d626c75652e737667)](https://www.php.net/) [![Laravel](https://camo.githubusercontent.com/3cefaa9e1a00e393b49e2ecba1383d856ade657666fc9cd9188496de84558ad0/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f6c61726176656c2d25354531302e3025374325354531312e3025374325354531322e302d7265642e737667)](https://laravel.com/) [![License](https://camo.githubusercontent.com/8bb50fd2278f18fc326bf71f6e88ca8f884f72f179d3e555e20ed30157190d0d/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f6c6963656e73652d4d49542d677265656e2e737667)](LICENSE.md)

Laravel observers are powerful, but they quickly become hard to reason about when:

- multiple side effects happen on a single model event
- execution order matters
- some steps should be async
- logic becomes scattered across observers and listeners
- testing behavior becomes painful

**Laravel Observer Pipeline** solves this by introducing **first-class pipelines** for model events.

Table of Contents
-----------------

- [Features](#features)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Core Concepts](#core-concepts)
- [Fluent Builder API](#fluent-builder-api)
- [Attribute-Based Pipelines](#attribute-based-pipelines)
- [Pipeline Context](#pipeline-context)
- [Failure Handling](#failure-handling)
- [Async Steps](#async-steps)
- [Configuration](#configuration)
- [Testing](#testing)
- [Artisan Commands](#artisan-commands)
- [Advanced Usage](#advanced-usage)
- [Examples](#examples)
- [Troubleshooting](#troubleshooting)
- [Design Philosophy](#design-philosophy)

Features
--------

- ✅ **Explicit, ordered pipelines** for Eloquent model events (created, updated, saved, deleted, restored)
- ✅ **Two registration methods**: Fluent builder API or PHP 8 attributes
- ✅ **Step-level async execution** with configurable queue, connection, and delay
- ✅ **Shared pipeline context** across all steps for data passing
- ✅ **Flexible failure handling**: Fail-loud (default) or continue-on-failure semantics
- ✅ **Failure handler steps** that run when exceptions occur
- ✅ **Built-in testing utilities** with `ObserverPipeline::fake()` for easy test isolation
- ✅ **Zero magic**: Uses Laravel's native observer system and queue jobs
- ✅ **Attribute discovery** with optional caching for production performance
- ✅ **Conflict resolution** strategies for handling duplicate pipeline registrations

Installation
------------

Require the package via Composer:

```
composer require jacobhyde/laravel-observer-pipeline
```

### Requirements

- PHP 8.1 or higher
- Laravel 10.0, 11.0, or 12.0

The package uses Laravel's package auto-discovery, so the service provider will be automatically registered.

### Publish Configuration (Optional)

Publish the configuration file to customize settings:

```
php artisan vendor:publish --tag=observer-pipeline-config
```

This will create `config/observer-pipeline.php` in your application.

Quick Start
-----------

Here's a simple example to get you started:

**Fluent Builder Approach:**

```
use JacobHyde\ObserverPipeline\ObserverPipeline;

ObserverPipeline::model(User::class)
    ->on('created')
    ->pipe([
        SendWelcomeEmail::class,
    ])
    ->register();
```

**Attribute-Based Approach:**

```
use JacobHyde\ObserverPipeline\Attributes\OnModelEvent;
use JacobHyde\ObserverPipeline\Attributes\Pipeline;

#[OnModelEvent(model: User::class, event: 'created')]
#[Pipeline(steps: [SendWelcomeEmail::class])]
final class UserCreatedPipeline {}
```

Both approaches achieve the same result. Choose the one that fits your project's style.

Core Concepts
-------------

### Pipeline Steps

Each step in a pipeline is a simple **invokable class** that receives a `PipelineContext`:

```
use JacobHyde\ObserverPipeline\Support\PipelineContext;

final class SendWelcomeEmail
{
    public function __invoke(PipelineContext $ctx): void
    {
        $user = $ctx->model();
        Mail::to($user->email)->send(new WelcomeMail($user));
    }
}
```

Steps are executed **in the exact order** they're defined in the pipeline.

### Supported Model Events

The package supports the following Eloquent model events:

- `created` - Fired when a new model is saved for the first time
- `updated` - Fired when an existing model is updated
- `saved` - Fired when a model is created or updated
- `deleted` - Fired when a model is deleted
- `restored` - Fired when a soft-deleted model is restored (requires `SoftDeletes` trait)

Fluent Builder API
------------------

The fluent builder provides a chainable interface for defining pipelines.

### Basic Usage

```
use JacobHyde\ObserverPipeline\ObserverPipeline;

ObserverPipeline::model(User::class)
    ->on('created')
    ->pipe([
        SyncToCrm::class,
        AssignDefaultRole::class,
        SendWelcomeEmail::class,
    ])
    ->register();
```

### Method Reference

#### `ObserverPipeline::model(string $model)`

Start building a pipeline for a specific model class.

```
ObserverPipeline::model(User::class)
```

#### `->on(string $event)`

Specify which model event should trigger the pipeline.

```
->on('created')  // or 'updated', 'saved', 'deleted', 'restored'
```

#### `->pipe(array $steps)`

Define the ordered list of step classes to execute.

```
->pipe([
    StepOne::class,
    StepTwo::class,
    StepThree::class,
])
```

Steps are executed in the exact order provided.

#### `->async(array $stepOptions)`

Mark specific steps to be executed asynchronously via Laravel's queue system.

```
->async([
    SendWelcomeEmail::class => ['queue' => 'emails'],
    ProcessAnalytics::class => [
        'queue' => 'analytics',
        'connection' => 'redis',
        'delay' => 60,  // seconds
    ],
])
```

**Available async options:**

- `queue` (string|null) - Queue name (default: null, uses default queue)
- `connection` (string|null) - Queue connection name (default: null)
- `delay` (int|null) - Delay in seconds before processing (default: null)

These options are merged with defaults from your configuration file. See [Async Steps](#async-steps) for more details.

#### `->stopOnFailure()`

Stop executing remaining steps when a step throws an exception (default behavior).

```
->stopOnFailure()
```

When a step fails, the exception is immediately re-thrown, and remaining steps are skipped.

#### `->continueOnFailure()`

Continue executing remaining steps even if one step fails.

```
->continueOnFailure()
```

All steps will execute, but the first exception encountered will be re-thrown at the end, ensuring the pipeline still "fails loudly" for error reporting.

#### `->onFailure(array $steps)`

Define handler steps that run when any step in the pipeline throws an exception.

```
->onFailure([
    LogFailure::class,
    NotifyAdmin::class,
])
```

Failure handlers receive the same `PipelineContext` as regular steps, with the exception available via `$ctx->get('_exception')`. Failure handlers run in "best-effort" mode - if they throw exceptions, those are ignored to prevent masking the original failure.

**Note:** `onFailure()` is currently only available via the fluent builder, not in attribute-based pipelines.

#### `->register()`

Register the pipeline definition. This must be called last to complete the registration.

```
->register()
```

### Complete Example

```
ObserverPipeline::model(Order::class)
    ->on('created')
    ->pipe([
        ValidateOrder::class,
        ChargePayment::class,
        CreateShippingLabel::class,
        SendConfirmationEmail::class,
    ])
    ->async([
        CreateShippingLabel::class => ['queue' => 'shipping'],
        SendConfirmationEmail::class => [
            'queue' => 'emails',
            'delay' => 30,
        ],
    ])
    ->onFailure([
        LogOrderFailure::class,
        RefundPayment::class,
    ])
    ->continueOnFailure()
    ->register();
```

Attribute-Based Pipelines
-------------------------

Define pipelines using PHP 8 attributes for a declarative approach.

### Basic Example

```
use JacobHyde\ObserverPipeline\Attributes\OnModelEvent;
use JacobHyde\ObserverPipeline\Attributes\Pipeline;

#[OnModelEvent(model: User::class, event: 'created')]
#[Pipeline(
    steps: [
        SyncToCrm::class,
        AssignDefaultRole::class,
        SendWelcomeEmail::class,
    ],
    async: [
        SendWelcomeEmail::class => ['queue' => 'emails'],
    ],
    stopOnFailure: true
)]
final class UserCreatedPipeline {}
```

### Attribute Reference

#### `#[OnModelEvent]`

Specifies which model and event the pipeline handles.

```
#[OnModelEvent(model: User::class, event: 'created')]
```

**Parameters:**

- `model` (string) - Fully qualified class name of the Eloquent model
- `event` (string) - Model event name ('created', 'updated', 'saved', 'deleted', 'restored')

#### `#[Pipeline]`

Defines the pipeline configuration.

```
#[Pipeline(
    steps: array,
    async?: array,
    stopOnFailure?: bool
)]
```

**Parameters:**

- `steps` (array) - Ordered array of step class names
- `async` (array, optional) - Async step configuration (same format as fluent builder)
- `stopOnFailure` (bool, optional) - Whether to stop on failure (default: true)

**Note:** `onFailure` handlers are not yet supported in attribute-based pipelines. Use the fluent builder if you need failure handlers.

### Discovery and Caching

Attribute-based pipelines are automatically discovered when the application boots. The discovery process:

1. Scans configured paths (default: `app_path('Pipelines')`)
2. Loads classes with both `#[OnModelEvent]` and `#[Pipeline]` attributes
3. Registers them into the pipeline registry
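
The second step can be illustrated with PHP's reflection API. This is a minimal sketch, not the package's discovery code: the two attribute classes below are simplified stand-ins for the real ones, and `readPipelineDefinition()` and the shape of the returned array are assumptions made for illustration.

```php
<?php
declare(strict_types=1);

// Stand-in attribute classes for illustration only. The package ships its own
// versions under JacobHyde\ObserverPipeline\Attributes.
#[Attribute(Attribute::TARGET_CLASS)]
final class OnModelEvent
{
    public function __construct(public string $model, public string $event) {}
}

#[Attribute(Attribute::TARGET_CLASS)]
final class Pipeline
{
    public function __construct(
        public array $steps,
        public array $async = [],
        public bool $stopOnFailure = true,
    ) {}
}

#[OnModelEvent(model: 'App\Models\User', event: 'created')]
#[Pipeline(steps: ['SyncToCrm', 'SendWelcomeEmail'])]
final class UserCreatedPipeline {}

final class PlainClass {}

/**
 * Hypothetical helper: returns a definition array for classes that carry
 * both attributes, or null for anything else.
 */
function readPipelineDefinition(string $class): ?array
{
    $reflection = new ReflectionClass($class);
    $events = $reflection->getAttributes(OnModelEvent::class);
    $pipelines = $reflection->getAttributes(Pipeline::class);

    if ($events === [] || $pipelines === []) {
        return null; // both attributes are required
    }

    $event = $events[0]->newInstance();
    $pipeline = $pipelines[0]->newInstance();

    return [
        'model' => $event->model,
        'event' => $event->event,
        'steps' => $pipeline->steps,
        'async' => $pipeline->async,
        'stop_on_failure' => $pipeline->stopOnFailure,
    ];
}

$definition = readPipelineDefinition(UserCreatedPipeline::class);
```

Reading attributes through reflection on every boot has a cost, which is presumably why the package offers a cached manifest.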

For production performance, enable caching in your configuration:

```
'attributes' => [
    'cache' => true,  // Cache discovered pipelines
],
```

Then run the cache command:

```
php artisan observer-pipeline:cache
```

Clear the cache when you add or modify attribute pipelines:

```
php artisan observer-pipeline:clear
```

Pipeline Context
----------------

Every step receives a `PipelineContext` instance that provides access to the model, event, and shared data.

### Context API

#### `model(): Model`

Get the Eloquent model instance that triggered the pipeline.

```
$user = $ctx->model();
```

#### `event(): string`

Get the name of the event that triggered the pipeline.

```
$event = $ctx->event();  // 'created', 'updated', etc.
```

#### `original(): array`

Get the original model attributes (before changes).

```
$original = $ctx->original();
// ['id' => 1, 'name' => 'John', 'email' => 'john@example.com']
```

This is particularly useful in `updated` events to compare old vs new values.

#### `changes(): array`

Get only the attributes that changed.

```
$changes = $ctx->changes();
// ['name' => 'Jane'] // Only changed attributes
```

#### `get(string $key, mixed $default = null): mixed`

Retrieve a value from the shared context meta data.

```
$value = $ctx->get('custom-key');
$value = $ctx->get('custom-key', 'default-value');
```

#### `set(string $key, mixed $value): void`

Store a value in the shared context meta data for use by subsequent steps.

```
$ctx->set('processed-by', 'step-one');
$ctx->set('metadata', ['key' => 'value']);
```

#### `meta(): array`

Get all stored meta data.

```
$allMeta = $ctx->meta();
// ['key1' => 'value1', 'key2' => 'value2']
```

### Data Sharing Between Steps

Steps can share data via the context:

```
final class StepOne
{
    public function __invoke(PipelineContext $ctx): void
    {
        // expensive_operation() is a placeholder for your own logic
        $result = expensive_operation();
        $ctx->set('operation-result', $result);
    }
}

final class StepTwo
{
    public function __invoke(PipelineContext $ctx): void
    {
        $result = $ctx->get('operation-result');
        // Use the result from StepOne
    }
}
```
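
To try the sharing pattern outside a Laravel application, the meta store can be imitated with a small stand-in class. `ContextSketch` below is hypothetical and covers only `get()`, `set()`, and `meta()`. How the real `PipelineContext` treats a key that was stored as `null` is not documented here, so the default-value handling is an assumption.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the meta portion of PipelineContext.
final class ContextSketch
{
    private array $meta = [];

    public function get(string $key, mixed $default = null): mixed
    {
        // Assumption: the default applies only when the key was never set.
        return array_key_exists($key, $this->meta) ? $this->meta[$key] : $default;
    }

    public function set(string $key, mixed $value): void
    {
        $this->meta[$key] = $value;
    }

    public function meta(): array
    {
        return $this->meta;
    }
}

$ctx = new ContextSketch();

// Two steps run in order; the second reads what the first stored.
$steps = [
    function (ContextSketch $ctx): void {
        $ctx->set('operation-result', 42);
    },
    function (ContextSketch $ctx): void {
        $ctx->set('doubled', $ctx->get('operation-result') * 2);
    },
];

foreach ($steps as $step) {
    $step($ctx);
}
```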

### Original and Changes Example

```
final class LogUserUpdate
{
    public function __invoke(PipelineContext $ctx): void
    {
        $user = $ctx->model();
        $original = $ctx->original();
        $changes = $ctx->changes();

        Log::info('User updated', [
            'user_id' => $user->id,
            'old_name' => $original['name'] ?? null,
            'new_name' => $changes['name'] ?? null,
        ]);
    }
}
```

Failure Handling
----------------

### Default Behavior: Fail Loudly

By default, pipelines **fail loudly**:

- If a step throws an exception, execution stops immediately
- Remaining steps are skipped
- The exception is re-thrown so Laravel can handle it (logging, reporting, etc.)

```
ObserverPipeline::model(User::class)
    ->on('created')
    ->pipe([
        StepOne::class,   // Runs
        StepTwo::class,   // Throws exception - execution stops
        StepThree::class, // Never runs
    ])
    ->register();
```

### Continue on Failure

Use `continueOnFailure()` to execute all steps even if some fail:

```
ObserverPipeline::model(User::class)
    ->on('created')
    ->pipe([
        StepOne::class,   // Runs
        StepTwo::class,   // Throws exception - but execution continues
        StepThree::class, // Still runs
    ])
    ->continueOnFailure()
    ->register();
```

**Important:** Even with `continueOnFailure()`, the pipeline still fails loudly. The first exception encountered will be re-thrown after all steps complete, ensuring errors are still reported.
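
The difference between the two modes can be sketched in plain PHP. `runSteps()` is a hypothetical runner written for this illustration, not the package's executor. It assumes steps are plain callables and records each attempted step name in the by-reference `$attempted` array.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical runner illustrating both failure modes.
 *
 * @param array<string, callable> $steps     step name => callable
 * @param list<string>            $attempted filled with the names of attempted steps
 */
function runSteps(array $steps, bool $stopOnFailure, array &$attempted): void
{
    $firstException = null;

    foreach ($steps as $name => $step) {
        $attempted[] = $name;
        try {
            $step();
        } catch (Throwable $e) {
            if ($stopOnFailure) {
                throw $e; // fail loudly right away; later steps are skipped
            }
            $firstException ??= $e; // remember only the first failure
        }
    }

    if ($firstException !== null) {
        throw $firstException; // still fail loudly once every step has run
    }
}

$steps = [
    'StepOne' => fn () => null,
    'StepTwo' => fn () => throw new RuntimeException('StepTwo failed'),
    'StepThree' => fn () => null,
];

$stopped = [];
try {
    runSteps($steps, true, $stopped);
} catch (RuntimeException $e) {
    $stoppedMessage = $e->getMessage();
}

$continued = [];
try {
    runSteps($steps, false, $continued);
} catch (RuntimeException $e) {
    $continuedMessage = $e->getMessage();
}
```

In both runs the caller sees the exception from `StepTwo`. Only the set of attempted steps differs.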

### Failure Handler Steps

Define steps that run when exceptions occur:

```
ObserverPipeline::model(Order::class)
    ->on('created')
    ->pipe([
        ProcessPayment::class,
        CreateShipping::class,
    ])
    ->onFailure([
        LogFailure::class,
        RefundPayment::class,
        NotifyAdmin::class,
    ])
    ->register();
```

Failure handlers:

- Run when any step in the pipeline throws an exception
- Receive the same `PipelineContext` as regular steps
- Can access the exception via `$ctx->get('_exception')`
- Run in "best-effort" mode - their exceptions are ignored to prevent masking the original failure

**Example Failure Handler:**

```
final class LogFailure
{
    public function __invoke(PipelineContext $ctx): void
    {
        $exception = $ctx->get('_exception');
        $model = $ctx->model();

        Log::error('Pipeline step failed', [
            'model' => $model::class,
            'model_id' => $model->id,
            'event' => $ctx->event(),
            'exception' => $exception->getMessage(),
        ]);
    }
}
```
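
The "best-effort" behavior described above can be sketched as follows. `runFailureHandlers()` is a hypothetical helper, and a plain array stands in for the context's meta data. The package's actual implementation may differ.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch: expose the original exception under '_exception'
 * and swallow anything the handlers themselves throw.
 *
 * @param list<callable> $handlers each handler receives the meta array
 */
function runFailureHandlers(array $handlers, Throwable $original, array $meta = []): void
{
    $meta['_exception'] = $original;

    foreach ($handlers as $handler) {
        try {
            $handler($meta);
        } catch (Throwable) {
            // Best effort: a failing handler must not mask the original failure.
        }
    }
}

$log = [];
$handlers = [
    // This handler fails, yet the next one still runs.
    function (array $meta): void {
        throw new LogicException('handler broke');
    },
    function (array $meta) use (&$log): void {
        $log[] = $meta['_exception']->getMessage();
    },
];

runFailureHandlers($handlers, new RuntimeException('payment declined'));
```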

Async Steps
-----------

Mark steps to run asynchronously via Laravel's queue system.

### Basic Async Configuration

```
ObserverPipeline::model(User::class)
    ->on('created')
    ->pipe([
        SyncToCrm::class,         // Runs synchronously
        SendWelcomeEmail::class,  // Runs asynchronously
    ])
    ->async([
        SendWelcomeEmail::class => ['queue' => 'emails'],
    ])
    ->register();
```

### Async Options

#### Queue Name

Specify which queue to use:

```
->async([
    SendEmail::class => ['queue' => 'emails'],
])
```

#### Queue Connection

Specify which queue connection to use:

```
->async([
    ProcessData::class => [
        'connection' => 'redis',
        'queue' => 'processing',
    ],
])
```

#### Delay

Delay execution by a number of seconds:

```
->async([
    SendReminder::class => [
        'queue' => 'emails',
        'delay' => 3600,  // 1 hour
    ],
])
```

#### Combined Options

```
->async([
    SendEmail::class => [
        'queue' => 'emails',
        'connection' => 'redis',
        'delay' => 60,
    ],
])
```

### Config Defaults

Async options are merged with defaults from your configuration:

```
// config/observer-pipeline.php
'async' => [
    'queue' => 'default',
    'connection' => 'database',
    'delay' => null,
],
```

Step-specific options override config defaults:

```
// Config: queue => 'default', connection => 'database'
->async([
    SendEmail::class => ['queue' => 'emails'],  // Uses 'emails' queue, 'database' connection
])
```
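
The override rule can be reproduced with a shallow array merge. This sketch assumes the package merges options this way. `resolveAsyncOptions()` is a hypothetical name, not part of the package API.

```php
<?php
declare(strict_types=1);

// Defaults as they might appear under the 'async' key of config/observer-pipeline.php.
$defaults = ['queue' => 'default', 'connection' => 'database', 'delay' => null];

/**
 * Hypothetical helper: step options win over config defaults,
 * and keys the step does not mention fall back to the defaults.
 */
function resolveAsyncOptions(array $defaults, array $stepOptions): array
{
    return array_merge($defaults, $stepOptions);
}

$resolved = resolveAsyncOptions($defaults, ['queue' => 'emails']);
```

Here `$resolved` keeps the `database` connection and the `null` delay from the defaults, while the queue becomes `emails`.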

### Job Execution

Async steps are dispatched as `RunPipelineStepJob` queue jobs. The job:

1. Retrieves the model from the database using the stored model ID
2. Recreates the `PipelineContext` with the model and event
3. Restores any meta data that was set in previous steps
4. Executes the step

If the model is deleted before the job runs, the job silently skips execution.
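
The four steps and the skip-on-missing rule can be sketched without a queue or database. Everything below is a stand-in: `handleQueuedStep()` is a hypothetical function, `$findModel` plays the role of the lookup by ID, and plain arrays replace the model and the `PipelineContext`.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-in for the queued job's handle logic.
 * Returns true when the step ran and false when it was skipped.
 */
function handleQueuedStep(
    callable $findModel,
    int $modelId,
    string $event,
    array $meta,
    callable $step,
): bool {
    $model = $findModel($modelId); // 1. reload the model by its stored ID
    if ($model === null) {
        return false; // the model was deleted in the meantime: skip silently
    }

    // 2 and 3. rebuild the context and restore meta data from earlier steps
    $context = ['model' => $model, 'event' => $event, 'meta' => $meta];

    $step($context); // 4. execute the step

    return true;
}

// In-memory "table" with a single record.
$records = [1 => ['id' => 1, 'email' => 'john@example.com']];
$findModel = fn (int $id): ?array => $records[$id] ?? null;

$sent = [];
$step = function (array $context) use (&$sent): void {
    $sent[] = $context['model']['email'] . ' via ' . $context['meta']['source'];
};

$ran = handleQueuedStep($findModel, 1, 'created', ['source' => 'signup'], $step);
$skipped = handleQueuedStep($findModel, 2, 'created', ['source' => 'signup'], $step);
```

Because the model is reloaded when the job runs, an async step sees the row as it exists at that moment, not as it was when the event fired.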

### Testing Async Steps

When using `ObserverPipeline::fake()`, async steps are recorded but not actually queued:

```
$fake = ObserverPipeline::fake();

// ... trigger pipeline ...

$fake->assertStepQueued(SendEmail::class);
```

Configuration
-------------

Publish the configuration file to customize behavior:

```
php artisan vendor:publish --tag=observer-pipeline-config
```

### Configuration Reference

#### `attributes.enabled`

Enable or disable automatic discovery of attribute-based pipelines.

```
'attributes' => [
    'enabled' => true,  // Set to false to disable attribute discovery
],
```

#### `attributes.paths`

Array of directory paths to scan for pipeline classes with attributes.

```
'attributes' => [
    'paths' => [
        app_path('Pipelines'),
        app_path('Domain/Orders/Pipelines'),
    ],
],
```

**Note:** Currently, only classes in the `App\Pipelines\` namespace are discovered. This is a limitation of the current discovery implementation.

#### `attributes.cache`

Cache discovered pipelines for performance. Should be `true` in production.

```
'attributes' => [
    'cache' => true,  // Cache in production
],
```

When enabled, run `php artisan observer-pipeline:cache` after adding or modifying attribute pipelines.

#### `conflicts`

Strategy for handling duplicate pipeline registrations (same model + event).

**Options:**

- `'throw'` (default) - Throw an exception when duplicates are detected
- `'prefer_fluent'` - Keep fluent builder registrations, ignore attribute registrations
- `'prefer_attributes'` - Keep attribute registrations, ignore fluent builder registrations

```
'conflicts' => 'throw',
```

**Example with `prefer_fluent`:**

```
// First: Attribute pipeline
#[OnModelEvent(model: User::class, event: 'created')]
#[Pipeline(steps: [StepA::class])]
class UserCreatedPipeline {}

// Second: Fluent pipeline (takes precedence)
ObserverPipeline::model(User::class)
    ->on('created')
    ->pipe([StepB::class])
    ->register();
// Result: StepB runs, StepA is ignored
```
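
The three strategies can be modeled as a small decision function. This is a sketch under assumptions: `resolveConflict()` and the `source` key on each definition are hypothetical, and the package's registry may track the origin of a pipeline differently.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical resolver for two definitions that target the same model and event.
 * Each definition carries a 'source' of either 'fluent' or 'attribute'.
 */
function resolveConflict(string $strategy, array $existing, array $incoming): array
{
    $preferred = match ($strategy) {
        'prefer_fluent' => 'fluent',
        'prefer_attributes' => 'attribute',
        // 'throw' and any unknown strategy refuse the duplicate.
        default => throw new LogicException('Duplicate pipeline for the same model and event'),
    };

    // Keep whichever definition came from the preferred source.
    return $incoming['source'] === $preferred ? $incoming : $existing;
}

$attribute = ['source' => 'attribute', 'steps' => ['StepA']];
$fluent = ['source' => 'fluent', 'steps' => ['StepB']];

$winner = resolveConflict('prefer_fluent', $attribute, $fluent);

$threw = false;
try {
    resolveConflict('throw', $attribute, $fluent);
} catch (LogicException) {
    $threw = true;
}
```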

#### `defaults.stop_on_failure`

Default failure behavior for pipelines that don't explicitly specify.

```
'defaults' => [
    'stop_on_failure' => true,  // Default: stop on first failure
],
```

This can be overridden per pipeline using `->stopOnFailure()` or `->continueOnFailure()`.

#### `defaults.on_failure`

Default failure handler steps applied to all pipelines.

```
'defaults' => [
    'on_failure' => [
        GlobalFailureLogger::class,
    ],
],
```

Pipeline-specific failure handlers (via `->onFailure()`) are executed in addition to these defaults.

#### `async.queue`

Default queue name for async steps.

```
'async' => [
    'queue' => 'default',  // or null to use Laravel's default
],
```

#### `async.connection`

Default queue connection for async steps.

```
'async' => [
    'connection' => 'database',  // or 'redis', 'sqs', etc.
],
```

#### `async.delay`

Default delay (in seconds) for async steps.

```
'async' => [
    'delay' => null,  // No delay by default
],
```

#### `reentry.enabled`

Enable re-entry protection to prevent pipelines from triggering themselves.

```
'reentry' => [
    'enabled' => true,
],
```

**Note:** This feature may not be fully implemented yet. Check the source code for current status.

#### `reentry.ttl`

Time-to-live for re-entry protection locks (in seconds).

```
'reentry' => [
    'ttl' => 10,  // Lock expires after 10 seconds
],
```

### Complete Configuration Example

```
return [
    'attributes' => [
        'enabled' => true,
        'paths' => [
            app_path('Pipelines'),
        ],
        'cache' => env('APP_ENV') === 'production',
    ],

    'conflicts' => 'throw',

    'defaults' => [
        'stop_on_failure' => true,
        'on_failure' => [],
    ],

    'async' => [
        'queue' => null,
        'connection' => null,
        'delay' => null,
    ],

    'reentry' => [
        'enabled' => true,
        'ttl' => 10,
    ],
];
```

Testing
-------

Observer Pipeline includes built-in testing utilities to make testing pipelines easy and isolated.

### Basic Usage

```
use JacobHyde\ObserverPipeline\ObserverPipeline;

$fake = ObserverPipeline::fake();

// Register and trigger pipelines
ObserverPipeline::model(User::class)
    ->on('created')
    ->pipe([SendEmail::class])
    ->register();

User::factory()->create();

// Assertions
$fake->assertRan(User::class, 'created', [SendEmail::class]);
```

### Testing API

#### `ObserverPipeline::fake(): PipelineFake`

Activate fake mode and return the fake instance for assertions.

```
$fake = ObserverPipeline::fake();
```

When faked:

- Steps do **not** execute
- Jobs are **not** dispatched to the queue
- Pipeline execution is **recorded** for assertions

#### `->assertRan(string $model, string $event, array $steps)`

Assert that a pipeline ran with specific steps in exact order.

```
$fake->assertRan(User::class, 'created', [
    SyncToCrm::class,
    SendEmail::class,
]);
```

#### `->assertStepRan(string $stepClass)`

Assert that a specific step ran (regardless of which pipeline).

```
$fake->assertStepRan(SendEmail::class);
```

#### `->assertStepQueued(string $stepClass)`

Assert that a specific step was queued for async execution.

```
$fake->assertStepQueued(SendEmail::class);
```

### Complete Testing Example

```
use Tests\TestCase;
use JacobHyde\ObserverPipeline\ObserverPipeline;
use App\Models\User;

class UserPipelineTest extends TestCase
{
    public function test_user_created_pipeline_runs(): void
    {
        $fake = ObserverPipeline::fake();

        ObserverPipeline::model(User::class)
            ->on('created')
            ->pipe([
                SyncToCrm::class,
                AssignRole::class,
                SendWelcomeEmail::class,
            ])
            ->async([
                SendWelcomeEmail::class => ['queue' => 'emails'],
            ])
            ->register();

        User::factory()->create();

        $fake->assertRan(User::class, 'created', [
            SyncToCrm::class,
            AssignRole::class,
            SendWelcomeEmail::class,
        ]);

        $fake->assertStepQueued(SendWelcomeEmail::class);
    }

    public function test_pipeline_continues_on_failure(): void
    {
        $fake = ObserverPipeline::fake();

        ObserverPipeline::model(User::class)
            ->on('created')
            ->pipe([
                StepOne::class,
                ThrowingStep::class,
                StepThree::class,
            ])
            ->continueOnFailure()
            ->register();

        try {
            User::factory()->create();
        } catch (\Exception $e) {
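            // Tolerate a re-thrown exception, if any. With the fake active,
            // steps are recorded rather than executed.
        }

        // All steps should have been recorded as run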
        $fake->assertStepRan(StepOne::class);
        $fake->assertStepRan(ThrowingStep::class);
        $fake->assertStepRan(StepThree::class);
    }
}
```

Artisan Commands
----------------

### `observer-pipeline:list`

List all registered pipelines.

```
php artisan observer-pipeline:list
```

**Example Output:**

```
+------------------+---------+---------------------------+-----------+-----------------+
| model            | event   | steps                     | async     | stop_on_failure |
+------------------+---------+---------------------------+-----------+-----------------+
| App\Models\User  | created | SyncToCrm, SendEmail      |           | yes             |
| App\Models\Order | created | ProcessPayment, ShipOrder | ShipOrder | yes             |
+------------------+---------+---------------------------+-----------+-----------------+

```

### `observer-pipeline:cache`

Discover and cache attribute-based pipelines for fast loading.

```
php artisan observer-pipeline:cache
```

**When to use:**

- After adding new attribute-based pipelines
- In your deployment process
- When `attributes.cache` is enabled in config

**Output:**

```
Observer pipeline manifest cached.
Path: /path/to/bootstrap/cache/observer-pipeline.php
Pipelines: 5

```

### `observer-pipeline:clear`

Clear the cached pipeline manifest.

```
php artisan observer-pipeline:clear
```

**When to use:**

- During development when modifying attribute pipelines
- When pipelines aren't being discovered
- To force fresh discovery on next request

**Output:**

```
Observer pipeline manifest cache cleared.
Path: /path/to/bootstrap/cache/observer-pipeline.php

```

Advanced Usage
--------------

### Multiple Pipelines for Same Model/Event

By default, registering more than one pipeline for the same model and event throws an exception. To allow duplicate registrations, configure a conflict resolution strategy:

```
// config/observer-pipeline.php
'conflicts' => 'prefer_fluent',  // or 'prefer_attributes'
```

With `prefer_fluent`, the last fluent registration wins. With `prefer_attributes`, attribute pipelines take precedence.

### Custom Step Classes with Dependencies

Steps are resolved from the service container, so you can inject dependencies:

```
final class SyncToCrm
{
    public function __construct(
        private CrmClient $crm,
        private LoggerInterface $logger
    ) {}

    public function __invoke(PipelineContext $ctx): void
    {
        $user = $ctx->model();
        $this->crm->syncUser($user);
        $this->logger->info('User synced to CRM', ['user_id' => $user->id]);
    }
}
```

### Context Data Sharing Patterns

**Pattern 1: Accumulate Data**

```
final class CollectUserData
{
    public function __invoke(PipelineContext $ctx): void
    {
        $data = $ctx->get('collected-data', []);
        $data[] = 'step-one-result';
        $ctx->set('collected-data', $data);
    }
}

final class ProcessCollectedData
{
    public function __invoke(PipelineContext $ctx): void
    {
        $data = $ctx->get('collected-data', []);
        // Process all collected data
    }
}
```

**Pattern 2: Conditional Execution**

```
final class CheckCondition
{
    public function __invoke(PipelineContext $ctx): void
    {
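        // Placeholder condition; replace with your own check.
        $someCondition = $ctx->model()->wasRecentlyCreated;

        if ($someCondition) {
            $ctx->set('should-process', true);
        }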
    }
}

final class ConditionalStep
{
    public function __invoke(PipelineContext $ctx): void
    {
        if (!$ctx->get('should-process', false)) {
            return;  // Skip this step
        }
        // Process...
    }
}
```

### Manual Discovery

Trigger attribute discovery manually:

```
use JacobHyde\ObserverPipeline\ObserverPipeline;

ObserverPipeline::discover();
```

This uses the configured paths from your config file.

Examples
--------

### User Registration Pipeline

```
ObserverPipeline::model(User::class)
    ->on('created')
    ->pipe([
        ValidateUserData::class,
        CreateUserProfile::class,
        AssignDefaultRole::class,
        SendWelcomeEmail::class,
        TrackRegistration::class,
    ])
    ->async([
        SendWelcomeEmail::class => ['queue' => 'emails'],
        TrackRegistration::class => ['queue' => 'analytics'],
    ])
    ->onFailure([
        LogRegistrationFailure::class,
        CleanupPartialUser::class,
    ])
    ->register();
```

### Order Processing Pipeline

```
ObserverPipeline::model(Order::class)
    ->on('created')
    ->pipe([
        ValidateInventory::class,
        ReserveInventory::class,
        ProcessPayment::class,
        CreateShippingLabel::class,
        SendOrderConfirmation::class,
        UpdateInventory::class,
    ])
    ->async([
        CreateShippingLabel::class => [
            'queue' => 'shipping',
            'delay' => 300,  // 5 minutes
        ],
        SendOrderConfirmation::class => ['queue' => 'emails'],
    ])
    ->onFailure([
        ReleaseInventory::class,
        RefundPayment::class,
        NotifyOrderTeam::class,
    ])
    ->continueOnFailure()
    ->register();
```

### Attribute-Based Email Pipeline

```
#[OnModelEvent(model: Newsletter::class, event: 'created')]
#[Pipeline(
    steps: [
        ValidateContent::class,
        RenderTemplate::class,
        QueueEmails::class,
    ],
    async: [
        QueueEmails::class => ['queue' => 'emails'],
    ],
    stopOnFailure: true
)]
final class NewsletterCreatedPipeline {}
```

### Multi-Step Data Synchronization

```
ObserverPipeline::model(Product::class)
    ->on('updated')
    ->pipe([
        SyncToSearchIndex::class,      // Uses context to get changes
        UpdateInventorySystem::class,  // Uses context to get changes
        InvalidateCache::class,        // Uses context to get model
        NotifySubscribers::class,      // Uses context to get changes
    ])
    ->async([
        SyncToSearchIndex::class => ['queue' => 'search'],
        UpdateInventorySystem::class => ['queue' => 'inventory'],
        NotifySubscribers::class => ['queue' => 'notifications'],
    ])
    ->register();

// Step implementation
final class SyncToSearchIndex
{
    public function __invoke(PipelineContext $ctx): void
    {
        $product = $ctx->model();
        $changes = $ctx->changes();

        // Only sync if relevant fields changed
        if (isset($changes['name']) || isset($changes['description'])) {
            SearchIndex::update($product);
        }
    }
}
```
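
One caveat with the `isset()` check above: `isset()` returns false for a key whose value is `null`, so a field that was changed to `null` would be skipped. The sketch below compares keys instead. `touchesAny()` is a hypothetical helper, and it assumes `$ctx->changes()` returns an array keyed by attribute name, which this README does not spell out.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: true when any of $fields appears as a key in $changes.
 * Compares keys only, so a value that changed to null still counts.
 *
 * @param array<string, mixed> $changes
 * @param list<string>         $fields
 */
function touchesAny(array $changes, array $fields): bool
{
    return array_intersect_key($changes, array_flip($fields)) !== [];
}

// Sample data: description was cleared, price was updated.
$changes = ['description' => null, 'price' => 1999];

$viaIsset = isset($changes['name']) || isset($changes['description']); // false
$viaKeys  = touchesAny($changes, ['name', 'description']);             // true
```

If clearing a field should also trigger a re-sync, the key-based check is the safer of the two.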

Troubleshooting
---------------

### Pipelines Not Running

**Check 1:** Ensure the pipeline is registered before the model event fires.

```
// In a service provider's boot() method
ObserverPipeline::model(User::class)
    ->on('created')
    ->pipe([...])
    ->register();
```

**Check 2:** Verify that the model observer is registered. Registration happens automatically, so check your logs if you suspect it did not.

**Check 3:** For attribute pipelines, ensure discovery is enabled and paths are correct:

```
'attributes' => [
    'enabled' => true,
    'paths' => [app_path('Pipelines')],
],
```

**Check 4:** Clear and rebuild the cache:

```
php artisan observer-pipeline:clear
php artisan observer-pipeline:cache
```

### Steps Not Executing in Order

Steps execute in the exact order defined in `->pipe()`. If order seems wrong:

1. Check that the steps are listed in the intended order.
2. Verify that no step throws an exception; by default, an exception stops the steps after it.
3. Remember that async steps are queued and run later, not in sequence with the synchronous steps.
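
As a mental model for the first two points, the sketch below runs a list of callables in order and lets the first exception end the run. It is a simplified illustration, not the package's runner; `runInOrder()` and the step names are hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical runner: records each step's name, then invokes it, in list
 * order. An exception propagates to the caller, so later steps never start.
 *
 * @param array<string, callable> $steps
 * @param list<string>            $log   filled by reference
 */
function runInOrder(array $steps, array &$log): void
{
    foreach ($steps as $name => $step) {
        $log[] = $name;
        $step();
    }
}

$log = [];
$error = null;

try {
    runInOrder([
        'validate' => static function (): void {},
        'charge'   => static function (): void {
            throw new RuntimeException('payment declined');
        },
        'notify'   => static function (): void {},
    ], $log);
} catch (RuntimeException $e) {
    $error = $e->getMessage();
}

// $log is ['validate', 'charge']; 'notify' never ran.
```

When a later step appears to be "out of order" or missing, an earlier exception is often the cause.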

### Async Steps Not Queuing

**Check 1:** Verify queue configuration:

```
->async([
    Step::class => ['queue' => 'emails'],  // Must specify queue
])
```

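**Check 2:** Ensure a queue worker is running and listening on the queue you named. Without `--queue`, a worker only processes the connection's default queue:

```
php artisan queue:work --queue=emails
```

**Check 3:** Check the queue connection settings in `config/queue.php`.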

### Attribute Pipelines Not Discovered

**Check 1:** Verify that the class namespace is `App\Pipelines\*` (a current limitation).

**Check 2:** Ensure both attributes are present:

```
#[OnModelEvent(...)]  // Required
#[Pipeline(...)]      // Required
```

**Check 3:** Check that the file is in a configured path:

```
'attributes' => [
    'paths' => [app_path('Pipelines')],  // Must match your file location
],
```

### Testing Issues

**Problem:** Assertions fail even though the pipeline should have run.

**Solution:** Store the fake instance and reuse it:

```
$fake = ObserverPipeline::fake();  // Store this
// ... register and trigger ...
$fake->assertRan(...);  // Use the same instance
```

**Problem:** Steps execute during tests

**Solution:** Ensure `ObserverPipeline::fake()` is called before registering pipelines.

Design Philosophy
-----------------

### Explicit Over Implicit

Pipelines are explicitly defined: there is no magic discovery of "listeners" and no convention-based registration. You see exactly what runs and when.

### No Workflow Engines

This is not a workflow engine. It's a simple, ordered list of steps. No DAGs, no complex state machines, no UI builders.

### Laravel-Native

Uses Laravel's built-in observer system and queue jobs. No custom event dispatchers or job systems.

### Testable

Built-in faking makes it easy to test pipeline behavior without executing side effects.

### Fail Loudly

By default, exceptions are re-thrown immediately. This ensures errors are visible and can be handled by Laravel's error handling system.

### Simple Steps

Steps are just invokable classes. No interfaces to implement, no base classes to extend. Keep it simple.
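
Because a step is only an object with `__invoke()`, it can be exercised directly in a unit test. The sketch below assumes nothing about the package: `BareContext` and `MarkWelcomed` are hypothetical names, and the context is a bare stand-in rather than `PipelineContext`.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in context with a single public bag of values.
final class BareContext
{
    /** @var array<string, mixed> */
    public array $values = [];
}

// A step is a plain class with __invoke(); no interface or base class.
final class MarkWelcomed
{
    public function __invoke(BareContext $ctx): void
    {
        $ctx->values['welcomed'] = true;
    }
}

$ctx = new BareContext();
$step = new MarkWelcomed();

$callable = is_callable($step); // true: objects with __invoke() are callable
$step($ctx);                    // invoke it like a function
```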

---

**Just clean, predictable pipelines for model events.**
