dynamik-dev/zenpipe-php
=======================

A simple, fluent pipeline implementation for PHP.

v0.2.0 · MIT · PHP

[Source](https://github.com/dynamik-dev/zenpipe-php) · [Packagist](https://packagist.org/packages/dynamik-dev/zenpipe-php)

ZenPipe
=======

 [ ![GitHub Actions](https://github.com/dynamik-dev/zenpipe-php/actions/workflows/CI.yml/badge.svg) ](https://github.com/dynamik-dev/zenpipe-php/actions/workflows/CI.yml) [ ![BuyMeACoffee](https://raw.githubusercontent.com/pachadotdev/buymeacoffee-badges/main/bmc-yellow.svg) ](https://buymeacoffee.com/chrisarter)

 [![ZenPipe Logo](./zenpipe.png)](./zenpipe.png)

ZenPipe is a simple and flexible PHP pipeline library that allows you to chain operations together to process, transform, or act on input.

```
$calculator = zenpipe()
   ->pipe(fn($price, $next) => $next($price * 0.8)) // 20% discount
   ->pipe(fn($price, $next) => $next($price * 1.1)); // add 10% tax

$calculator(100); // $88 (100 -> 80 -> 88)
```

You can also run the pipeline on demand:

```
zenpipe(100)
   ->pipe(fn($price, $next) => $next($price * 0.8)) // 20% discount
   ->pipe(fn($price, $next) => $next($price * 1.1)) // add 10% tax
   ->process(); // 88
```

Sections
--------

1. [ZenPipe](#zenpipe)
2. [Requirements](#requirements)
3. [Installation](#installation)
4. [Usage](#usage)
    - [Pipeline Operations](#pipeline-operations)
    - [Class Methods as Operations](#class-methods-as-operations)
    - [Context Passing](#context-passing)
    - [Exception Handling](#exception-handling)
    - [PSR-15 Middleware](#psr-15-middleware)
    - [More Examples](#more-examples)
5. [API Reference](#api-reference)
6. [Contributing](#contributing)
7. [License](#license)

Requirements
------------

- PHP 8.2 or higher

Installation
------------

```
composer require dynamik-dev/zenpipe-php
```

Usage
-----

### Pipeline Operations

Pipeline operations are functions that take an input and return a processed value. Each operation can receive up to four parameters:

- `$input`: The value being processed
- `$next`: A callback to pass the value to the next operation
- `$return`: (Optional) A callback to exit the pipeline early with a value
- `$context`: (Optional) A shared context object passed to all operations
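
To make the roles of `$next` and `$return` concrete, here is a minimal, self-contained sketch of how such a chain could be composed. It is an illustration only, not ZenPipe's actual implementation: `runSketch()` and `EarlyReturn` are hypothetical names, and the sketch assumes every operation returns the result of `$next(...)` or `$return(...)`.

```php
// Hypothetical stand-in for illustration; NOT ZenPipe's actual implementation.
final class EarlyReturn
{
    public function __construct(public readonly mixed $value) {}
}

function runSketch(array $operations, mixed $input, mixed $context = null): mixed
{
    // $return wraps the value in a marker so it can travel back up the chain.
    $return = fn(mixed $value) => new EarlyReturn($value);

    // Start with an identity step, then wrap it from the last operation backwards.
    $chain = fn(mixed $value) => $value;
    foreach (array_reverse($operations) as $operation) {
        $next = $chain;
        // Operations that declare fewer parameters simply ignore the extras.
        $chain = fn(mixed $value) => $operation($value, $next, $return, $context);
    }

    $result = $chain($input);

    // Unwrap the marker if an operation exited early.
    return $result instanceof EarlyReturn ? $result->value : $result;
}

$operations = [
    fn($n, $next) => $next($n + 1),
    fn($n, $next, $return) => $n > 10 ? $return('too big') : $next($n * 2),
];

$small = runSketch($operations, 1);  // 4 (1 -> 2 -> 4)
$large = runSketch($operations, 10); // 'too big' (the second operation exits early)
```

An operation that never calls `$next` stops the chain, which is what makes the early exit possible.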

#### Basic Operation Example

Let's build an input sanitization pipeline:

```
// String sanitization pipeline
$sanitizer = zenpipe()
    ->pipe(fn($input, $next) => $next(trim($input)))
    ->pipe(fn($input, $next) => $next(preg_replace('/\s+/', ' ', $input)))
    ->pipe(fn($input, $next) => $next(strip_tags($input)))
    ->pipe(fn($input, $next) => $next(htmlspecialchars($input)))
    ->pipe(fn($input, $next) => $next(mb_convert_encoding(
        $input, 'UTF-8', mb_detect_encoding($input)
    )));

// Usage examples:
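$dirtyInput = "  <script>alert('xss')</script>  Hello   World! ¥€$ ";
$cleanInput = $sanitizer($dirtyInput);
// strip_tags() removes the tags but keeps their text content,
// and htmlspecialchars() encodes the quotes:
// Output: "alert(&#039;xss&#039;) Hello World! ¥€$"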

// Can also be run on demand:
$result = zenpipe($dirtyInput)
    ->pipe(fn($input, $next) => $next(trim($input)))
    ->pipe(fn($input, $next) => $next(strip_tags($input)))
    ->process();
```

#### Operation with Early Return

Below is a practical example of a content moderation pipeline with early returns:

```
// Content moderation pipeline with early returns
$moderationPipeline = zenpipe()
    ->pipe(function($content, $next, $return) {
        // Skip moderation for trusted authors
        if (Auth::user()->isTrusted()) {
            return $return([
                'status' => 'approved',
                'content' => $content,
                'skipped' => true
            ]);
        }
        return $next($content);
    })
    ->pipe(function($content, $next, $return) {
        // Quick check for banned words
        if (containsBannedWords($content)) {
            return $return([
                'status' => 'rejected',
                'reason' => 'prohibited_content'
            ]);
        }
        return $next($content);
    })
    ->pipe(function($content, $next) {
        // Send to AI moderation for nuanced analysis
        return $next(
            AI::moderate($content)
        );
    });

// Usage:
$result = $moderationPipeline("Hello, world!");
// Trusted user: Immediately returns approved
// Regular user: Goes through full moderation
```

### Class Methods as Operations

You can also use class methods as operations, with the same parameter options:

```
class MyClass
{
    public function validate($input, $next, $return)
    {
        if (empty($input)) {
            return $return('Input cannot be empty');
        }
        return $next(strtoupper($input));
    }
}

$pipeline = zenpipe()
   ->pipe([MyClass::class, 'validate']);
```

You can also pass an array of operations:

```
$pipeline = zenpipe()
   ->pipe([
        fn($input, $next) => $next(strtoupper($input)),
        [MyClass::class, 'validate']
    ]);
```

### Context Passing

You can pass a shared context object to all operations using `withContext()`. This is useful for sharing state, configuration, or dependencies across the pipeline without threading them through the value.

```
// Use any object as context - your own DTO, stdClass, or array
class RequestContext
{
    public function __construct(
        public string $userId,
        public array $permissions,
        public array $logs = []
    ) {}
}

$context = new RequestContext(
    userId: 'user-123',
    permissions: ['read', 'write']
);

$result = zenpipe(['action' => 'update', 'data' => [...]])
    ->withContext($context)
    ->pipe(function ($request, $next, $return, RequestContext $ctx) {
        if (!in_array('write', $ctx->permissions)) {
            return $return(['error' => 'Unauthorized']);
        }
        $ctx->logs[] = "Permission check passed for {$ctx->userId}";
        return $next($request);
    })
    ->pipe(function ($request, $next, $return, RequestContext $ctx) {
        $ctx->logs[] = "Processing {$request['action']}";
        return $next([...$request, 'processed_by' => $ctx->userId]);
    })
    ->process();

// Context is mutable - logs are accumulated across operations
// $context->logs = ['Permission check passed for user-123', 'Processing update']
```

Type hint your context parameter in the operation signature for IDE support:

```
/** @var ZenPipe */
$pipeline = zenpipe()
    ->withContext(new RequestContext(...))
    ->pipe(fn($value, $next, $return, RequestContext $ctx) => ...);
```

### Exception Handling

Use `catch()` to handle exceptions gracefully without breaking the pipeline:

```
$result = zenpipe($userData)
    ->pipe(fn($data, $next) => $next(validateInput($data)))
    ->pipe(fn($data, $next) => $next(processPayment($data))) // might throw
    ->pipe(fn($data, $next) => $next(sendConfirmation($data)))
    ->catch(fn(Throwable $e, $originalValue) => [
        'error' => $e->getMessage(),
        'input' => $originalValue,
    ])
    ->process();
```

The catch handler receives:

- `$e`: The thrown exception (`Throwable`)
- `$value`: The original input value passed to `process()`
- `$context`: The context set via `withContext()` (null if not set)

If no catch handler is set, exceptions propagate normally.
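
The contract described above can be sketched without the library. The helper below is a hypothetical stand-in (`processWithCatch()` is not part of ZenPipe) that treats the whole pipeline as a single callable; it only illustrates which arguments the handler receives and what happens when no handler is set.

```php
// Hypothetical helper illustrating the documented catch() contract;
// this is NOT ZenPipe's actual implementation.
function processWithCatch(
    callable $pipeline,
    mixed $input,
    ?callable $handler = null,
    mixed $context = null
): mixed {
    try {
        return $pipeline($input);
    } catch (Throwable $e) {
        if ($handler === null) {
            throw $e; // no handler: the exception propagates
        }
        // The handler sees the original input, not a partially processed value.
        return $handler($e, $input, $context);
    }
}

$pipeline = function (int $amount): int {
    $doubled = $amount * 2;
    if ($doubled > 100) {
        throw new RuntimeException('Limit exceeded');
    }
    return $doubled;
};

$handler = fn(Throwable $e, $value, $context) => [
    'error' => $e->getMessage(),
    'input' => $value,
    'user'  => $context['user'] ?? null,
];

$ok     = processWithCatch($pipeline, 10, $handler); // 20
$failed = processWithCatch($pipeline, 80, $handler, ['user' => 'user-123']);
// ['error' => 'Limit exceeded', 'input' => 80, 'user' => 'user-123']
```

Note that the handler's return value becomes the result, so callers should be prepared for either shape.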

### PSR-15 Middleware

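ZenPipe supports PSR-15 middleware in both directions: middleware can be used as pipeline operations, and a pipeline can be exposed as middleware. This requires the `psr/http-server-middleware` package.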

#### Using PSR-15 Middleware in a Pipeline

Pass any `MiddlewareInterface` directly to `pipe()`:

```
$response = zenpipe($request)
    ->pipe(new CorsMiddleware())
    ->pipe(new AuthMiddleware())
    ->pipe(fn($req, $next, $return) => $return(new Response(200)))
    ->process();
```

When using PSR-15 middleware, the pipeline must return a `ResponseInterface`.

#### Using ZenPipe as PSR-15 Middleware

Wrap a pipeline with `asMiddleware()` for use in PSR-15 frameworks:

```
$pipeline = zenpipe()
    ->pipe(fn($req, $next) => $next($req->withAttribute('processed', true)));

$app->middleware($pipeline->asMiddleware());
```

**Behavior:**

- If the pipeline returns a `ResponseInterface`, it's returned directly
- If the pipeline returns a `ServerRequestInterface`, it's passed to the next handler
- The PSR-15 handler is available via `$context->handler` for explicit delegation

```
$authPipeline = zenpipe()
    ->pipe(function ($req, $next, $return, $ctx) {
        if (!$req->hasHeader('Authorization')) {
            return $return(new Response(401));
        }
        return $ctx->handler->handle($req);
    });

$app->middleware($authPipeline->asMiddleware());
```

### More Examples

#### RAG Processes

ZenPipe can model retrieval-augmented generation (RAG), where each stage feeds the next: embed the query, retrieve matching documents, then generate an answer from them.

```
$ragPipeline = zenpipe()
    ->pipe(fn($query, $next) => $next([
        'query' => $query,
        'embeddings' => OpenAI::embeddings()->create([
            'model' => 'text-embedding-3-small',
            'input' => $query
        ])->embeddings[0]->embedding
    ]))
    ->pipe(fn($data, $next) => $next([
        ...$data,
        'context' => Qdrant::collection('knowledge-base')
            ->search($data['embeddings'], limit: 3)
            ->map(fn($doc) => $doc->content)
            ->join("\n")
    ]))
    ->pipe(fn($data, $next) => $next(
        OpenAI::chat()->create([
            'model' => 'gpt-4-turbo-preview',
            'messages' => [
                [
                    'role' => 'system',
                    'content' => 'Answer using the provided context only.'
                ],
                [
                    'role' => 'user',
                    'content' => "Context: {$data['context']}\n\nQuery: {$data['query']}"
                ]
            ]
        ])->choices[0]->message->content
    ));

$answer = $ragPipeline("What's our refund policy?");
```

#### Email Validation with Early Return

This pipeline demonstrates early returns for email validation:

```
$emailValidationPipeline = zenpipe()
    ->pipe(function($input, $next, $return) {
        if (!is_string($input)) {
            return $return('Input must be a string');
        }
        return $next(filter_var($input, FILTER_VALIDATE_EMAIL));
    })
    ->pipe(function($email, $next, $return) {
        if (!$email) {
            return $return('Invalid email format');
        }

        $domain = substr(strrchr($email, "@"), 1);
        $mxhosts = [];

        if (!getmxrr($domain, $mxhosts)) {
            return $return('Domain has no valid mail servers');
        }

        return $next(true);
    });

$result = $emailValidationPipeline('example@example.com');
// Returns: true if the domain has MX records, otherwise 'Domain has no valid mail servers'

$result = $emailValidationPipeline('invalid-email');
// Returns: 'Invalid email format'
```

API Reference
-------------

See [API Reference](docs/API.md) for details.

Contributing
------------

See [CONTRIBUTING.md](CONTRIBUTING.md) for details.

License
-------

The MIT License (MIT). See [LICENSE](LICENSE) for details.

### Community

Maintainer: [christopherarter](https://github.com/christopherarter)

Top contributors: [christopherarter](https://github.com/christopherarter) (24 commits), [raphaelstolt](https://github.com/raphaelstolt) (1 commit)
