
wizcodepl/laravel-pipe
======================

Stage-based pipeline framework for batch ETL of record streams in Laravel — drivers, resolvers, per-stage stats.

**0.2.0** (1 month ago) · MIT · PHP ^8.2 · CI passing · since Apr 30, last pushed 1 month ago

[Source](https://github.com/wizcodepl/laravel-pipe) · [Packagist](https://packagist.org/packages/wizcodepl/laravel-pipe) · [Docs](https://github.com/wizcodepl/laravel-pipe)


 [![Laravel Pipe](art/logo.svg)](art/logo.svg)

laravel-pipe
============


[![Tests](https://github.com/wizcodepl/laravel-pipe/actions/workflows/tests.yml/badge.svg)](https://github.com/wizcodepl/laravel-pipe/actions/workflows/tests.yml)[![License: MIT](https://camo.githubusercontent.com/fdf2982b9f5d7489dcf44570e714e3a15fce6253e0cc6b5aa61a075aac2ff71b/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f4c6963656e73652d4d49542d79656c6c6f772e737667)](https://opensource.org/licenses/MIT)[![PHP Version](https://camo.githubusercontent.com/962aced9b09d89716dbebf186ff899754a096ff1068b6b7988675c2d9fab9331/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f7068702d253545382e322d626c75652e737667)](https://php.net)


If you have multiple product suppliers...
-----------------------------------------


Each one ships their catalog in a different shape. Supplier A serves a JSON REST API with translations under `name.{locale}`. Supplier B emits an XML feed with parameters embedded as HTML inside the description. Supplier C hands you a CSV with questionable encoding. Supplier D needs OAuth plus an MD5 hash signed with a timestamp to mint a token that expires every hour.

**Every product has a color**, but A returns it structurally (`features.color = "Black"`), B hides it inside the product name (`"Acme Slim Tee L Black"`), C buries it inside HTML markup (a `color` label followed by `Onyx Mist`), and D doesn't return it at all (you have to parse it from the model name).

**The data is inconsistent.** The same matte-black t-shirt arrives as:

- supplier A: `"Black"`
- supplier B: `"BLACK"`
- supplier C: `"Jet"` (single-word marketing color)
- supplier D: `"Onyx Black"` (manufacturer marketing color you want to collapse into your canonical `"Black"`)

**Some data is just missing.** One supplier exposes `material` in the spec table. Another only ships a model name (`"Merino Crew Neck"` → you infer wool). A third returns nothing, so your validation has to detect that the product fails its ProductType requirements and skip it.

**The APIs are fragile**: random 5xx responses, image-hosting timeouts, and new product categories you don't know about yet. Your sync either silently drops records or blows up mid-run, leaving half-written state in the database.

---

This package is for that problem.

What you get
------------


```
                ┌─ Driver ──────────── pulls supplier data
                │  loadItems()        (HTTP, file, queue — generator with lazy fetch)
                │
                ▼
         PipelineResolver ──────────── picks a pipeline per record
                │  resolve()           (returning null = skip + unrouted counter)
                │
                ▼
            Pipeline ─────────────── runs stages in declared order
                │  stages()            (list of class names)
                │
                ▼
             Stages ───────────────── per-stage transformation
                │  static run(Item)   (each reads from Item, writes to Item)
                │  - $item->fail($msg, $context) when a stage can't do its job
                │  - throw / fail → caught, logged, remaining stages for THIS
                │    record skip; the next record starts clean
                │
                ▼
          PipelineRun ────────────── per-stage stats
                                       entered / succeeded / failed / duration
                                       + processed / unrouted total

```

**Core principles:**

- The driver is lazy — `loadItems()` is a generator, `--limit=10` stops fetching after 10 records.
- One Item per record: each record gets its own mutable bag, so one record's state can't leak into the next.
- Stage fails? The remaining stages for THAT record are skipped, the next record starts clean.
- PipelineRun tells you EXACTLY where the problem is: `ValidateRequired Failed=300` → 300 records dropped because `material` was missing. `StageFailureException::context` is forwarded to `onStageError` so you log the exact SKU + the missing fields.
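The laziness principle can be illustrated in plain PHP without the package. This is a sketch under stated assumptions, not `AbstractDriver`: `FakeSupplierApi`, the free function `loadItems()` and `takeLimited()` are hypothetical stand-ins. A paginated endpoint is simulated, `loadItems()` yields one record at a time, and stopping after N records means later pages are never requested.

```php
<?php

declare(strict_types=1);

/** Hypothetical paginated supplier API: 4 pages of 5 records each. */
final class FakeSupplierApi
{
    /** Number of (simulated) HTTP requests made so far. */
    public int $calls = 0;

    public function fetchPage(int $page): array
    {
        $this->calls++;
        if ($page > 4) {
            return []; // past the last page
        }
        $records = [];
        for ($i = 1; $i <= 5; $i++) {
            $records[] = ['Sku' => sprintf('SKU-%d-%d', $page, $i)];
        }
        return $records;
    }
}

/** Hypothetical stand-in for a driver's loadItems(): a page is fetched only when its first record is needed. */
function loadItems(FakeSupplierApi $api): Generator
{
    for ($page = 1; ; $page++) {
        $records = $api->fetchPage($page);
        if ($records === []) {
            return;
        }
        foreach ($records as $record) {
            yield $record;
        }
    }
}

/** Mimics `--limit=N`: stops consuming the generator after $limit records. */
function takeLimited(Generator $items, ?int $limit): array
{
    $taken = [];
    if ($limit === 0) {
        return $taken; // the generator never starts, so nothing is fetched
    }
    foreach ($items as $record) {
        $taken[] = $record;
        if ($limit !== null && count($taken) >= $limit) {
            break; // the generator is not resumed, so no further pages are requested
        }
    }
    return $taken;
}

$api = new FakeSupplierApi();
$first10 = takeLimited(loadItems($api), 10);
// 10 records were consumed, but only pages 1 and 2 were requested.
```

Because the generator is suspended rather than drained, breaking out of the loop is enough to stop the fetching.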

Quick start
-----------


```
composer require wizcodepl/laravel-pipe
```

Optionally publish the config file (only needed if you want to register drivers in `config/laravel-pipe.php` instead of using the package defaults):

```
php artisan vendor:publish --tag=laravel-pipe-config
```

Skeleton generator for a new supplier:

```
php artisan make:pipe Acme --pipeline=Apparel
```

This creates:

```
app/Pipes/Acme/
├── AcmeDriver.php                ← loadItems() with TODO
├── AcmePipelineResolver.php      ← resolve() with TODO
└── ApparelPipeline/
    └── ApparelPipeline.php       ← stages() with TODO

```

Fill in the three TODOs, drop your stage classes into `ApparelPipeline/Stages/`, and register the driver in `config/laravel-pipe.php`:

```
'drivers' => [
    'acme' => App\Pipes\Acme\AcmeDriver::class,
],
```

Run:

```
php artisan pipe:run acme --limit=10
```

Output:

```
Processed: 10  •  Unrouted: 0  •  Duration: 4.21s

+----------------------------------------+---------+-----------+--------+----------+
| Stage                                  | Entered | Succeeded | Failed | Duration |
+----------------------------------------+---------+-----------+--------+----------+
| Acme\Stages\MapBaseData                | 10      | 10        | 0      | 0.012s   |
| Acme\Stages\NormalizeAttributes        | 10      | 10        | 0      | 0.034s   |
| Acme\Stages\ResolveColor               | 10      | 10        | 0      | 0.008s   |
| Acme\Stages\ValidateRequired           | 10      | 7         | 3      | 0.021s   |  ← 3 dropped
| Acme\Stages\PersistProduct             | 7       | 7         | 0      | 0.842s   |
| Acme\Stages\PersistDescription         | 7       | 7         | 0      | 0.045s   |
| Acme\Stages\SyncImages                 | 7       | 7         | 0      | 1.234s   |
+----------------------------------------+---------+-----------+--------+----------+

```

Seven products landed in the database. Three failed at ValidateRequired — your `sync_quality` log channel will tell you per-SKU what was missing thanks to `StageFailureException::context`.

A stage is a tiny class
-----------------------


```
class MapBaseData
{
    public static function run(Item $item): void
    {
        $raw = $item->get('rawData');

        $item->set('baseData', [
            'sku' => $raw['Sku'],
            'price' => $raw['Price'],
            'brand' => $raw['Brand'],
        ]);
    }
}
```

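A typical apparel pipeline declares stages along these lines, falling roughly into three groups: mapping and resolving attributes, validating them, and persisting the result: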

```
ResolveProductType.php   → item.productType  (e.g. shirts, jackets, dresses)
MapBaseData.php          → item.baseData     (sku, price, brand, weight)
NormalizeAttributes.php  → item.attributes   (raw spec from supplier)
ResolveColor.php         → item.attributes.color           (canonical enum label)
ResolveSize.php          → item.attributes.size            (XS/S/M/L/XL/XXL)
ResolveMaterial.php      → item.attributes.material        (Cotton/Wool/Polyester)
ResolvePattern.php       → item.attributes.pattern         (Solid/Striped/Floral)
ValidateRequired.php     → item.fail() if required attrs missing
PersistProduct.php       → item.product, item.variant
PersistColorOption.php   → ProductOption attach + value sync
PersistSizeOption.php    → ProductOption attach + value sync
PersistDescription.php   → write description to DB
SyncImages.php           → download + attach media

```

Item — the mutable bag
----------------------


Every record gets its own `Item`. Stages read and write through it:

```
$item = new Item(['rawData' => $apiRecord]);

$item->get('rawData');                  // read (with optional default)
$item->set('baseData', [...]);          // write
$item->has('product');                  // boolean (true even when value is null)
$item->fail('Missing brand', [          // throws StageFailureException with context
    'sku' => $sku,
]);
```
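To make the documented semantics concrete, here is a minimal stand-in for `Item`, written only from the behaviour described above. It is not the package's source: `SketchItem` and `SketchStageFailure` are hypothetical names, and the real `StageFailureException` may be shaped differently. The detail worth noticing is `has()`. Using `array_key_exists()` rather than `isset()` is what makes it return `true` for a key whose value is `null`.

```php
<?php

declare(strict_types=1);

/** Hypothetical stand-in for StageFailureException: carries diagnostic context. */
final class SketchStageFailure extends RuntimeException
{
    public function __construct(string $message, public readonly array $context = [])
    {
        parent::__construct($message);
    }
}

/** Hypothetical stand-in for Item: one mutable key/value bag per record. */
final class SketchItem
{
    public function __construct(private array $data = [])
    {
    }

    /** Returns $default only when the key is absent (a stored null is returned as null). */
    public function get(string $key, mixed $default = null): mixed
    {
        return array_key_exists($key, $this->data) ? $this->data[$key] : $default;
    }

    public function set(string $key, mixed $value): void
    {
        $this->data[$key] = $value;
    }

    /** True whenever the key was set, even if its value is null. */
    public function has(string $key): bool
    {
        return array_key_exists($key, $this->data);
    }

    /** Never returns: aborts the current stage with a message and context. */
    public function fail(string $message, array $context = []): never
    {
        throw new SketchStageFailure($message, $context);
    }
}
```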

The driver injects `PipelineRun` under the reserved key `_run`. Don't touch that from inside stages — the package owns it.

Failing a stage
---------------


`Item::fail($message, $context = [])` throws a `StageFailureException`. The abstract pipeline catches it, increments the stage's `failed` counter, calls `onStageError($stage, $exception, $item)`, and stops the chain for that record.

```
class ResolveColor
{
    public static function run(Item $item): void
    {
        $attrs = $item->get('attributes', []);
        $manufacturer = (string) ($attrs['manufacturer_color'] ?? '');
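        // match() is a helper (not shown) that returns a Color enum case or null.
        $color = self::match($manufacturer);

        if ($color === null) {
            // fail() throws, so the lines below never run for this record.
            $item->fail('Could not resolve color', [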
                'manufacturer_color' => $manufacturer,
            ]);
        }

        $attrs['color'] = $color->label();
        $item->set('attributes', $attrs);
    }
}
```

Override `onStageError` in your concrete pipeline to wire it to your logger:

```
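use Illuminate\Support\Facades\Log;
// Also import AbstractPipeline, Item and StageFailureException from the package.

class ApparelPipeline extends AbstractPipeline
{
    // stages() omitted here; see the generated ApparelPipeline.php.
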
    protected function onStageError(string $stage, \Throwable $e, Item $item): void
    {
        Log::channel('sync_quality')->error('acme | stage failed', [
            'stage' => $stage,
            'sku' => ($item->get('baseData') ?? [])['sku'] ?? null,
            'message' => $e->getMessage(),
            ...($e instanceof StageFailureException ? $e->context : []),
        ]);
    }
}
```

The `...$e->context` spread merges per-stage diagnostics (manufacturer values, the unparseable raw string, the missing field list, etc.) into the log record.
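One detail of the spread is worth knowing. Since PHP 8.1, arrays with string keys can be unpacked, and when a key appears twice in an array literal the later value wins. In the `onStageError` example above the context is spread last, so a context key named `message` or `sku` would overwrite the value computed before it. The hypothetical helper `buildLogRecord()` below spreads the context first so the pipeline's own fields take precedence; which order you want is a design choice.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: merges stage diagnostics into a log record.
 * Context is spread first, so the fixed fields below win on key collisions.
 */
function buildLogRecord(string $stage, ?string $sku, string $message, array $context): array
{
    return [
        ...$context,
        'stage' => $stage,
        'sku' => $sku,
        'message' => $message,
    ];
}

$record = buildLogRecord('ResolveColor', 'TEE-001', 'Could not resolve color', [
    'manufacturer_color' => 'Onyx Mist',
    'message' => 'stale value', // collides with 'message'; the explicit field in buildLogRecord() wins
]);
```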

Inconsistent data — Resolve\* stages
------------------------------------


The classic problem: suppliers ship different strings for the same color.

```
Supplier A:  features.color       = "Black"
Supplier B:  features.color       = "BLACK"
Supplier C:  manufacturer_color   = "Onyx Black"     (from spec table)
Supplier D:  product.name         = "Crew Tee Black" (only inside the name)

```

Solution: a `ResolveColor` stage that maps every variant to the canonical `Color::Black` enum case, with a substring fallback through a 50+ entry dictionary.

For attributes with a stable domain (size, material, pattern) use PHP enums with a `label()` method. For the long-tail of color names, substring matching with locale-specific fallbacks works well.
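The following is a minimal plain-PHP sketch of that approach. It assumes a backed `Color` enum with a `label()` method, as described above. The `Color` cases, the `ColorResolver` class and its dictionary are hypothetical and deliberately tiny. A production dictionary would be much larger and locale-aware. Exact matches are tried first. Whole-word substring matches come next, with longer dictionary keys checked before shorter ones, so `"Navy Blue"` resolves to Navy rather than Blue.

```php
<?php

declare(strict_types=1);

/** Hypothetical canonical color enum; label() is what gets stored. */
enum Color: string
{
    case Black = 'black';
    case White = 'white';
    case Navy = 'navy';
    case Blue = 'blue';

    public function label(): string
    {
        return ucfirst($this->value);
    }
}

final class ColorResolver
{
    /** Hypothetical synonym dictionary: lower-cased supplier strings => canonical color. */
    private static function synonyms(): array
    {
        return [
            'black' => Color::Black,
            'jet' => Color::Black,
            'onyx black' => Color::Black,
            'white' => Color::White,
            'navy blue' => Color::Navy,
            'navy' => Color::Navy,
            'blue' => Color::Blue,
        ];
    }

    public static function resolve(string $raw): ?Color
    {
        // Normalise: lower-case, collapse whitespace, trim.
        $needle = trim(preg_replace('/\s+/', ' ', strtolower($raw)));
        if ($needle === '') {
            return null;
        }

        $synonyms = self::synonyms();

        // 1. Exact match on the whole value ("BLACK" -> "black").
        if (isset($synonyms[$needle])) {
            return $synonyms[$needle];
        }

        // 2. Whole-word substring fallback, longest key first.
        $keys = array_keys($synonyms);
        usort($keys, fn (string $a, string $b): int => strlen($b) <=> strlen($a));
        foreach ($keys as $key) {
            if (preg_match('/\b' . preg_quote($key, '/') . '\b/', $needle) === 1) {
                return $synonyms[$key];
            }
        }

        return null; // unknown color: the stage would call $item->fail() here
    }
}
```

A `null` result corresponds to the case where a stage like `ResolveColor` above calls `$item->fail()` with the raw value in its context.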

Validating required data
------------------------


A `ValidateRequired` stage typically reads `item.productType`, checks which attributes that ProductType requires, and calls `$item->fail(...)` with the list of missing fields when any are absent. The pipeline aborts for that record, and the driver moves on to the next one.
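The following sketch shows that check independently of the package. The product types, the `requiredAttributes()` map and the `missingAttributes()` function are all hypothetical. In a real stage, a non-empty result would be passed to `$item->fail()` along with the missing list in its context.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical requirements per product type; in practice these would come
 * from your catalog's ProductType definitions. Unknown types impose no
 * requirements here; skipping them is the resolver's job.
 */
function requiredAttributes(string $productType): array
{
    return match ($productType) {
        'shirts' => ['color', 'size', 'material'],
        'jackets' => ['color', 'size', 'material', 'pattern'],
        default => [],
    };
}

/** Returns the required attribute names that are absent, null, or blank strings. */
function missingAttributes(string $productType, array $attributes): array
{
    $missing = [];
    foreach (requiredAttributes($productType) as $name) {
        $value = $attributes[$name] ?? null;
        if ($value === null || (is_string($value) && trim($value) === '')) {
            $missing[] = $name;
        }
    }
    return $missing;
}
```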

Per-stage invariant
-------------------


```
Entered = Succeeded + Failed

```

Every record that entered a stage finishes in exactly one bucket. Failures on `Resolve*` stages usually mean unparseable input; failures on `Persist*` usually mean a DB constraint violation; failures on `SyncImages` usually mean the CDN was unreachable. The table printed after each sync tells you which stage failed, so there is no guessing.
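The invariant is cheap to check on the stats. The sketch below assumes the per-stage stats are an array keyed by stage name, where each entry has `entered`, `succeeded` and `failed` counts. This is close to the shape the end-to-end test further down reads from `getStages()`, though the `succeeded` key is an assumption. The `invariantViolations()` helper is hypothetical. The sample counts are a subset of the example `pipe:run acme --limit=10` output.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns the names of stages whose counts break
 * Entered = Succeeded + Failed. Assumes each stats row has the keys
 * 'entered', 'succeeded' and 'failed'.
 */
function invariantViolations(array $stages): array
{
    $broken = [];
    foreach ($stages as $stage => $stats) {
        if ($stats['entered'] !== $stats['succeeded'] + $stats['failed']) {
            $broken[] = $stage;
        }
    }
    return $broken;
}

// Counts copied from the example output table (subset of stages).
$stages = [
    'MapBaseData' => ['entered' => 10, 'succeeded' => 10, 'failed' => 0],
    'ValidateRequired' => ['entered' => 10, 'succeeded' => 7, 'failed' => 3],
    'PersistProduct' => ['entered' => 7, 'succeeded' => 7, 'failed' => 0],
];
```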

How the engine works (in one paragraph)
---------------------------------------


`AbstractPipeline::run()` iterates `stages()` in order, wraps each in a `try/catch (\Throwable)`, and on failure calls `onStageError($stage, $e, $item)` and stops the chain for that record. `AbstractDriver::run(?int $limit)` iterates `loadItems()` (a generator — `--limit=10` actually stops the HTTP fetch after 10 records), routes each Item through the resolver, runs the chosen pipeline, and returns a `PipelineRun` with stats.
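The paragraph above can be condensed into a self-contained sketch. Everything in it is a hypothetical stand-in, written to mirror the described control flow rather than the package's actual source:

- Plain closures stand in for stage classes.
- An `ArrayObject` stands in for `Item`.
- A closure stands in for `PipelineResolver`.
- An array stands in for `PipelineRun`.

In this sketch, each record gets a fresh bag. A resolver returning `null` bumps the unrouted counter. Stages run in order inside `try/catch (\Throwable)`, and the first failure stops that record's chain. Counting only routed records as "processed" is an assumption.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical engine loop.
 * $resolve returns a key of $pipelines, or null to skip the record.
 * $pipelines maps pipeline name => [stage name => callable(ArrayObject): void], in run order.
 */
function runSync(iterable $records, callable $resolve, array $pipelines): array
{
    $run = ['processed' => 0, 'unrouted' => 0, 'stages' => [], 'failures' => []];

    foreach ($records as $raw) {
        // One fresh bag per record, so records never share state.
        $bag = new ArrayObject(['rawData' => $raw]);

        $pipeline = $resolve($bag);
        if ($pipeline === null) {
            $run['unrouted']++;
            continue;
        }
        $run['processed']++;

        foreach ($pipelines[$pipeline] as $stage => $runStage) {
            $run['stages'][$stage] ??= ['entered' => 0, 'succeeded' => 0, 'failed' => 0];
            $run['stages'][$stage]['entered']++;

            try {
                $runStage($bag);
                $run['stages'][$stage]['succeeded']++;
            } catch (\Throwable $e) {
                $run['stages'][$stage]['failed']++;
                $run['failures'][] = ['stage' => $stage, 'message' => $e->getMessage()];
                break; // skip the remaining stages for this record only
            }
        }
    }

    return $run;
}

// Usage: one pipeline with three stages, plus a resolver that skips unknown types.
$pipelines = [
    'apparel' => [
        'MapBaseData' => function (ArrayObject $bag): void {
            $bag['baseData'] = ['sku' => $bag['rawData']['Sku']];
        },
        'ValidateRequired' => function (ArrayObject $bag): void {
            $raw = $bag['rawData'];
            if (!isset($raw['Material'])) {
                throw new RuntimeException('Missing material');
            }
        },
        'PersistProduct' => function (ArrayObject $bag): void {
            $bag['product'] = $bag['baseData']['sku']; // stand-in for a DB write
        },
    ],
];

$run = runSync(
    [
        ['Sku' => 'A', 'Type' => 'tee', 'Material' => 'Cotton'],
        ['Sku' => 'B', 'Type' => 'tee'],
        ['Sku' => 'C', 'Type' => 'hat'],
    ],
    fn (ArrayObject $bag): ?string => $bag['rawData']['Type'] === 'tee' ? 'apparel' : null,
    $pipelines,
);
```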

Commands
--------


```
php artisan make:pipe {Name} [--pipeline=Main] [--force]
```

Generator: scaffolds three files in `app/Pipes/{Name}/`.

```
php artisan pipe:run {driver} [--limit=N]
```

Run a sync. `{driver}` is the key from `config/laravel-pipe.php`. Output: per-stage stats table + Processed/Unrouted/Duration headline.

Failure records
---------------


Every stage failure is captured on `PipelineRun` (stage, message, context from `Item::fail()`). Read them with `$run->getFailures()` and wire them into whatever you already use — logs, alerts, audit tables.

To enrich each record (e.g. attach the SKU from `item.baseData`), override `AbstractPipeline::buildFailureRecord()` in your concrete pipeline:

```
protected function buildFailureRecord(string $stage, \Throwable $e, Item $item): array
{
    return [
        ...parent::buildFailureRecord($stage, $e, $item),
        'sku' => ($item->get('baseData') ?? [])['sku'] ?? null,
    ];
}
```
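As one way to wire failure records into an alert, you can group them by stage and message to produce a per-run summary. This sketch assumes each record from `getFailures()` is an array with `stage` and `message` keys, matching the fields listed above. The exact shape is not documented here, and `summarizeFailures()` is a hypothetical helper.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: counts failures per "stage: message" pair,
 * most frequent first. Assumes each record has 'stage' and 'message' keys.
 */
function summarizeFailures(array $failures): array
{
    $counts = [];
    foreach ($failures as $failure) {
        $key = $failure['stage'] . ': ' . $failure['message'];
        $counts[$key] = ($counts[$key] ?? 0) + 1;
    }
    arsort($counts); // highest count first; ties keep their original order
    return $counts;
}

$summary = summarizeFailures([
    ['stage' => 'ValidateRequired', 'message' => 'Missing material', 'context' => ['sku' => 'TEE-1']],
    ['stage' => 'ResolveColor', 'message' => 'Could not resolve color', 'context' => ['manufacturer_color' => 'Onyx Mist']],
    ['stage' => 'ValidateRequired', 'message' => 'Missing material', 'context' => ['sku' => 'TEE-2']],
]);
```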

End-to-end test
---------------


Mock your API client or hit the live one — the package doesn't get in the way.

```
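use App\Pipes\Acme\AcmeDriver;
use Illuminate\Foundation\Testing\RefreshDatabase;
use PHPUnit\Framework\Attributes\Group;
use Tests\TestCase;
// Also import your Product model (the one PersistProduct writes to).

#[Group('e2e')]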
class AcmeSupplierSyncTest extends TestCase
{
    use RefreshDatabase;

    public function test_sync_creates_products_from_real_api(): void
    {
        $run = app(AcmeDriver::class)->run(limit: 10);

        // Strict: 0 failures means every sample record made it through
        foreach ($run->getStages() as $stage => $stats) {
            $this->assertSame(0, $stats['failed'],
                "Stage {$stage} had {$stats['failed']} failures (entered={$stats['entered']})",
            );
        }

        $this->assertGreaterThan(0, Product::count());

        // Idempotency
        $first = Product::count();
        app(AcmeDriver::class)->run(limit: 10);
        $this->assertSame($first, Product::count());
    }
}
```

Skip from CI when network/credentials are unavailable: `vendor/bin/phpunit --exclude-group=e2e`.

Requirements
------------


- PHP 8.2+
- Laravel 11 / 12 (only required for `RunCommand`, `MakePipeCommand` and `PipeServiceProvider` — the artisan layer)

The core (`Item`, `AbstractPipeline`, `AbstractDriver`, `PipelineRun`, `StageFailureException`) is plain PHP 8.2. Laravel is only needed for the optional artisan + service-provider layer.

About Wizcode
-------------


[Wizcode](https://wizcode.pl) is an e-commerce agency specialised in [Lunar](https://lunarphp.io). We design and ship B2B, B2C, and marketplace platforms on the Laravel + Lunar stack — from custom checkouts and supplier syncs to multi-channel pricing, PIM workflows, and headless storefronts.

Our open-source contributions to the Lunar ecosystem:

- [wizcodepl/lunar-product-schemas](https://github.com/wizcodepl/lunar-product-schemas) — migration-style schema builder for Lunar product types and attributes.
- [wizcodepl/laravel-pipe](https://github.com/wizcodepl/laravel-pipe) — stage-based pipeline framework for batch ETL of supplier feeds (used in production for catalog ingestion).

Contact us:

License
-------


MIT

### Community

Maintainer: [wizku9](https://github.com/wizku9) (14 commits)

Tags: laravel, feed, import, driver, etl, sync, pipeline, normalization, stage

### Code Quality

Tests: PHPUnit · Static analysis: PHPStan · Code style: Laravel Pint · Type coverage: yes
