dbrans/stream-pipeline
======================

A stream-based implementation of the pipeline pattern

Version 2.5.1 (released 2 months ago) · MIT license · PHP >=7.4 · CI passing

[Source](https://github.com/danibranas/stream-pipeline-php) · [Packagist](https://packagist.org/packages/dbrans/stream-pipeline)

[![Build Status](https://github.com/danibranas/stream-pipeline-php/workflows/Build/badge.svg)](https://github.com/danibranas/stream-pipeline-php/workflows/Build/badge.svg)

Stream Pipeline
===============

What is it?
-----------

A stream-based implementation of the pipeline pattern.

The Pipeline pattern uses ordered stages to process a sequence of input values. Each implemented task is represented by a stage of the pipeline. You can think of pipelines as similar to assembly lines in a factory, where each item in the assembly line is constructed in stages.
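
As a rough illustration of the pattern itself, independent of this library, the sketch below passes each input value through an ordered list of stages. The `runPipeline` helper and the `$stages` list are hypothetical names used only for this example.

```php
<?php
// Hypothetical helper, not part of the library: applies every stage, in order,
// to each input value.
function runPipeline(iterable $input, array $stages): array
{
    $output = [];
    foreach ($input as $value) {
        foreach ($stages as $stage) {
            $value = $stage($value);
        }
        $output[] = $value;
    }
    return $output;
}

// Two stages: trim whitespace, then convert to upper case.
$stages = ['trim', 'strtoupper'];
$result = runPipeline([' a1 ', 'b2 '], $stages);
// $result is ['A1', 'B2']
```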

Features
--------

- **Fluent API / Chaining**
    Chain methods like `map`, `filter`, `reduce` in a readable, functional style.
- **Lazy evaluation**
    The pipeline **does not execute any operations until a terminal method** like `collect()` or `forEach()` is called.
    Each element is processed **one at a time through the entire chain**, minimizing memory usage and allowing infinite or very large data streams.
- **Lightweight and simple**
    No dependencies and easy to integrate into any project.
- **Common functional operations included**
    `map`, `filter`, `reduce`, `flatMap`, `distinct`, `limit`, `skip`...
- **Generics support via docblocks**
    Compatible with Psalm/PHPStan for static type checking.
- **Immutable style**
    Each operation returns a new pipeline instance, avoiding side effects.
- **Works with any iterable**
    Supports arrays, generators, or any PHP iterable.
- **Explicit terminal operations**
    Methods like `collect()` and `forEach()` allow controlled consumption of data.
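
To make the lazy-evaluation point concrete, here is a minimal sketch built on plain PHP generators. It is not the library's implementation; `naturals`, `lazyMap`, `lazyFilter` and `lazyLimit` are hypothetical helpers, and `iterator_to_array` stands in for a terminal operation.

```php
<?php
// Hypothetical generator-based stages, for illustration only.
function naturals(): Generator
{
    for ($n = 1; ; $n++) {
        yield $n; // infinite source: 1, 2, 3, ...
    }
}

function lazyMap(iterable $source, callable $fn): Generator
{
    foreach ($source as $item) {
        yield $fn($item);
    }
}

function lazyFilter(iterable $source, callable $predicate): Generator
{
    foreach ($source as $item) {
        if ($predicate($item)) {
            yield $item;
        }
    }
}

function lazyLimit(iterable $source, int $limit): Generator
{
    if ($limit <= 0) {
        return;
    }
    foreach ($source as $item) {
        yield $item;
        if (--$limit === 0) {
            return; // stop pulling from the (possibly infinite) source
        }
    }
}

$log = [];
$squares = lazyMap(naturals(), function (int $n) use (&$log) {
    $log[] = $n; // record which inputs were actually processed
    return $n * $n;
});
$pipeline = lazyLimit(lazyFilter($squares, fn (int $n) => $n % 2 === 0), 3);

// Nothing has run yet: generators only start when they are iterated.
$before = count($log);

// Consuming the pipeline pulls elements through the chain one at a time.
$result = iterator_to_array($pipeline, false);
// $result is [4, 16, 36]; only the inputs 1 to 6 were ever generated.
```

Because each stage pulls from the previous one on demand, the infinite source is safe to use as long as some later stage stops the iteration.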

How it works
------------

Stream pipeline allows you to go from writing expressions like:

```
// Old way
$input = [' B1 ', ' B2', 'a1 ', ' a2 ', 'a3', ' b1', ' b2', 'b3'];
$elements = [];

foreach ($input as $e) {
    // Normalize first, then test the normalized value
    $elem = strtoupper(trim($e));

    if (substr($elem, 0, 1) === 'B' && !in_array($elem, $elements, true)) {
        $elements[] = $elem;
    }
}

return implode(',', $elements); // 'B1,B2,B3'
```

to writing functional expressions like:

```
return Stream::of(' B1 ', ' B2', 'a1 ', ' a2 ', 'a3', ' b1', ' b2', 'b3')
    ->map(Strings::trim())
    ->map(Strings::toUpper())
    ->filter(Strings::startsWith('B'))
    ->distinct()
    ->collect(Collectors::join(','));
```

Getting started
---------------

The library is available as a Composer package on Packagist.org.

To install in your project, just run:

```
composer require dbrans/stream-pipeline
```

After this runs successfully, you can include the main class in your application logic:

```
use StreamPipeline\Stream;
```

Usage
-----

You can create a *Stream* from any iterable or from a list of values:

```
use StreamPipeline\Stream;

$arrStream = Stream::fromIterable($myArray);
// or
$arrStream = Stream::of(' B1 ', ' B2', 'a1 ', ' a2 ', 'a3', ' b1', ' b2', 'b3');
```

A *Stream* object exposes several methods that operate on its elements:

```
use StreamPipeline\Collectors;
use StreamPipeline\Iterators\NumberGenerator;
use StreamPipeline\Operations\Strings;

// ...

$arrStream
    ->map(Strings::trim())
    ->map(Strings::toUpper())
    ->filter(Strings::startsWith('B'))
    ->distinct()
    ->forEach(function ($e) {
        echo $e;
    });
```

The *Stream* class is immutable: each chaining method returns a new *Stream* and leaves the original untouched.

Execution is lazy: the elements are iterated only once, and only when a terminal operation (*forEach*, *reduce*, *toArray*, *collect*...) is called.
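
The immutable style can be pictured with a small stand-in class. `TinyStream` below is hypothetical and written only for this example; it is not the library's `Stream` and does not reflect its internals.

```php
<?php
// Hypothetical class for illustration only; not the library's Stream.
final class TinyStream
{
    private iterable $source;
    /** @var callable[] */
    private array $steps;

    public function __construct(iterable $source, array $steps = [])
    {
        $this->source = $source;
        $this->steps = $steps;
    }

    public function map(callable $fn): self
    {
        // Return a copy with one more step; $this is left untouched.
        return new self($this->source, array_merge($this->steps, [$fn]));
    }

    public function toArray(): array
    {
        // Terminal operation: only here are the recorded steps applied.
        $out = [];
        foreach ($this->source as $item) {
            foreach ($this->steps as $step) {
                $item = $step($item);
            }
            $out[] = $item;
        }
        return $out;
    }
}

$base = new TinyStream([1, 2, 3]);
$doubled = $base->map(fn ($n) => $n * 2);
$plusOne = $base->map(fn ($n) => $n + 1);
// $base still yields [1, 2, 3]; the two derived pipelines are independent.
```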

### Pipe operations

Methods such as `filter` and `map` accept a callable argument:

```
$arrStream
    ->filter(function ($e) {
        return $e % 2 === 0;
    })
    ->map(function ($e) {
        return $e + 10;
    })
    ->toArray();
```

The library also provides some common operations as ready-made callables to improve readability:

```
$arrStream
    ->filter(Numbers::isEven())
    ->map(Numbers::plus(10))
    ->collect(Collectors::sum());
```

Please see the docblocks in the source code for more information.
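
One way such ready-made operations can work is as factories that return closures. The functions below are hypothetical stand-ins written for this example using PHP's array built-ins; they are not the library's `Numbers` class and make no claim about how it is implemented.

```php
<?php
// Hypothetical factory: returns a predicate that is true for even numbers.
function isEven(): Closure
{
    return fn ($n): bool => $n % 2 === 0;
}

// Hypothetical factory: returns a mapper that adds a fixed amount.
function plus(int $amount): Closure
{
    return fn ($n) => $n + $amount;
}

$input = [1, 2, 3, 4];
$evens = array_filter($input, isEven());          // keeps 2 and 4
$shifted = array_values(array_map(plus(10), $evens)); // [12, 14]
$total = array_sum($shifted);                     // 26
```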

### *Stream* Methods

**Initialization static operations**:

- `of(...$elements): StreamInterface`
- `fromIterable(iterable $collection): StreamInterface`
- `iterate($initialValue, callable $stepOperation): StreamInterface`

**Pipe operations**:

- `map(callable $operation): StreamInterface`
- `filter(callable $operation): StreamInterface`
- `peek(callable $operation): StreamInterface`
- `tap(callable $operation): StreamInterface` (alias of `peek`).
- `limit(int $limit): StreamInterface`
- `skip(int $number): StreamInterface`
- `distinct(?callable $distinctBy = null): StreamInterface`
- `flatMap(?callable $operation = null): StreamInterface`
- `concat(iterable $elements): StreamInterface`
- `takeWhile(callable $operation): StreamInterface`
- `dropWhile(callable $operation): StreamInterface`

**Terminal operations**:

- `findFirst()`
- `count(): int`
- `forEach(callable $callback): void`
- `anyMatch(callable $condition): bool`
- `allMatch(callable $condition): bool`
- `noneMatch(callable $condition): bool`
- `reduce(callable $operation, $initialValue)`
- `toArray(bool $preserveKeys = false): array`
- `collect(?callable $collector)`

Every callable receives three arguments: `function ($currentElement, $index, $originalIndex)`. Example:

```
$arrStream
    ->map(function ($elem, $i, $index) {
        return ...
    })
    ->toArray();
```

### Pre-defined Collectors

There are pre-defined collector functions with some common operations. You can use them with the terminal operator `collect()`:

- `Collectors::join(string $delimiter = '')`
- `Collectors::sum(?callable $mapper = null)`
- `Collectors::groupBy(?callable $classifier = null, ?callable $mapper = null)`
- `Collectors::groupAndReduceBy(?callable $keysMapper = null, ?callable $valuesMapper = null, ?callable $reducer = null)`

For example:

```
Stream::of('a', 'b', 'c', 'd', 'e', 'f')
    ->limit(5)
    ->collect(Collectors::join(','));
```
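
For comparison, the same result can be computed with PHP built-ins. This sketch assumes that `limit(5)` keeps the first five elements and that `Collectors::join(',')` concatenates them with the given delimiter.

```php
<?php
$letters = ['a', 'b', 'c', 'd', 'e', 'f'];

// Assumed equivalent of limit(5): keep the first five elements.
$firstFive = array_slice($letters, 0, 5);

// Assumed equivalent of Collectors::join(','): concatenate with a delimiter.
$joined = implode(',', $firstFive);
// $joined is 'a,b,c,d,e'
```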

### Iterator classes

- `NumberGenerator`: a number generator with an optional step.

Example:

```
Stream::iterate(1, new NumberGenerator(1))
    ->filter(Numbers::isEven())
    ->skip(5)
    ->limit(11);
```
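
The chain above can be traced by hand with a plain generator. The sketch assumes the stream yields 1, 2, 3, ... (initial value 1, step 1, initial value included); `countFrom` is a hypothetical helper, not the library's `NumberGenerator`.

```php
<?php
// Hypothetical infinite counter used in place of Stream::iterate().
function countFrom(int $start, int $step = 1): Generator
{
    for ($n = $start; ; $n += $step) {
        yield $n;
    }
}

$result = [];
$skipped = 0;
foreach (countFrom(1) as $n) {
    if ($n % 2 !== 0) {
        continue;   // filter: keep even numbers only
    }
    if ($skipped < 5) {
        $skipped++; // skip: drop the first five even numbers (2 to 10)
        continue;
    }
    $result[] = $n;
    if (count($result) === 11) {
        break;      // limit: stop after eleven elements
    }
}
// $result is [12, 14, 16, ..., 32]
```

Without the final limit the loop would never end, which is why a bounding step is needed before consuming an infinite source.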

### Operation classes

- `Logical`: logical operations (such as identity, true, false...).
- `Numbers`: numeric operations.
- `Objects`: generic object operations.
- `Strings`: string utilities and functions.
- `Values`: polymorphic value operations.

### Release Activity

- Cadence: every ~183 days (recently every ~302 days)
- Total releases: 11
- Last release: 78 days ago
- Major versions: 1.1.2 → 2.0.0 (2022-10-23)
- PHP version history (2 changes): 1.0.0 required PHP >=7.3; 2.0.0 requires PHP >=7.4

### Community

- Maintainer: [dbrans](/maintainers/dbrans)
- Top contributor: [danibranas](https://github.com/danibranas) (41 commits)
- Tags: functional-programming, library, packagist, php, pipelines, stream
- Tests: PHPUnit

