simsoft/data-flow
=================

A lightweight, composable ETL (Extract, Transform, Load) pipeline library for PHP with fluent API, spreadsheet support, and flow control.

Version 2.0.3 · MIT · PHP >=8.3 · CI passing

[Source](https://github.com/sim-soft/data-flow) · [Packagist](https://packagist.org/packages/simsoft/data-flow)

Simsoft DataFlow
================

[![License: MIT](https://camo.githubusercontent.com/784362b26e4b3546254f1893e778ba64616e362bd6ac791991d2c9e880a3a64e/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f4c6963656e73652d4d49542d677265656e2e737667)](LICENSE)[![PHP](https://camo.githubusercontent.com/c6447a0f34a5721f3e61f5e30bd15977d04dc02ecbcd26efad1c3328d9f864f5/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f5048502d253345253344382e332d3838393242462e737667)](https://php.net)[![Docs](https://camo.githubusercontent.com/0d7af7efa18588fd3edfb316c6c441284e6dac440db7465aabe304fa43d3e02f/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f446f63732d73696d2d2d736f66742e6769746875622e696f2d626c75652e737667)](https://sim-soft.github.io/data-flow/)

> A lightweight, composable ETL pipeline library for PHP 8.3+

**DataFlow** helps you move data from one place to another — read from a source (database, CSV, API), transform it (filter, map, validate, enrich), and write it to a destination (database, spreadsheet, file). This pattern is called **ETL** (Extract, Transform, Load) and is the backbone of data migration, reporting, syncing, and batch processing.

With DataFlow, you describe your pipeline as a fluent chain:

```
(new DataFlow())
    ->from($source)         // Extract: where data comes from
    ->transform($logic)     // Transform: reshape, filter, validate
    ->load($destination)    // Load: where data goes
    ->run();
```

No framework required. No external services. Just PHP.

📖 **[Full Documentation](https://sim-soft.github.io/data-flow/)**

Why This Library
----------------

- **Fluent, composable API**: chain extractors, transformers, and loaders in a single readable expression
- **Built-in resilience**: retry with exponential backoff and jitter, circuit breaker, and checkpoint/resume, all without external dependencies
- **Zero-overhead opt-in**: every resilience feature uses the null object pattern, so disabled features cost nothing at runtime
- **Generator-based streaming**: constant memory footprint regardless of dataset size
- **Per-stage error strategies**: configure Skip, Retry, Throw, or LogAndContinue independently on each stage
- **Crash recovery**: checkpoint/resume lets long-running pipelines recover from failures without reprocessing from scratch
- **Circuit breaker**: prevents cascading failures when downstream services degrade, a pattern common in microservices (Resilience4j, Polly) but unique among PHP ETL libraries
- **Dead letter collection**: failed and circuit-open rows are captured with full context for inspection or reprocessing
- **Inline schema validation**: validate row data with pipe-delimited rules (Laravel-style syntax) without leaving the pipeline
- **Real-time metrics**: pluggable MetricsExporter interface for emitting events to logging, StatsD, Prometheus, or custom systems
- **Dry-run mode**: validate entire pipelines without performing actual writes
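
To make "retry with exponential backoff and jitter" concrete, here is a minimal plain-PHP sketch of the general technique. It does not use DataFlow and is not the library's implementation; the function name `retryWithBackoff`, its parameters, and the "full jitter" variant are assumptions chosen for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Retry $operation up to $maxAttempts times, pausing between attempts.
 * The delay ceiling doubles on each attempt (exponential backoff), is capped
 * at $maxDelayMs, and the actual pause is a random value below that ceiling
 * ("full jitter") so that many workers do not retry in lockstep.
 *
 * $sleep is injectable so the sketch can be exercised without really waiting.
 */
function retryWithBackoff(
    callable $operation,
    int $maxAttempts = 5,
    int $baseDelayMs = 100,
    int $maxDelayMs = 5_000,
    ?callable $sleep = null,
): mixed {
    $sleep ??= static function (int $ms): void {
        usleep($ms * 1000);
    };

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation($attempt);
        } catch (\Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // out of attempts: surface the last error
            }
            // Ceiling grows 100, 200, 400, ... ms until it hits $maxDelayMs.
            $ceiling = min($maxDelayMs, $baseDelayMs * 2 ** ($attempt - 1));
            $sleep(random_int(0, $ceiling));
        }
    }
}

// Usage: an operation that fails twice, then succeeds on the third attempt.
$delays = [];
$result = retryWithBackoff(
    function (int $attempt): string {
        if ($attempt < 3) {
            throw new \RuntimeException("transient failure #{$attempt}");
        }
        return "ok after {$attempt} attempts";
    },
    sleep: function (int $ms) use (&$delays): void {
        $delays[] = $ms; // record the pause instead of sleeping
    },
);
```

Here `$result` is `"ok after 3 attempts"` and `$delays` holds two recorded pauses, the first at most 100 ms and the second at most 200 ms.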

Install
-------

```
composer require simsoft/data-flow
```

Basic Usage
-----------

A minimal pipeline with one extract, one transform, and one load stage:

```
require "vendor/autoload.php";

use Simsoft\DataFlow\DataFlow;

(new DataFlow())
    ->from([1, 2, 3])
    ->transform(function($num) {
        return $num * 2;
    })
    ->load(function($num) {
        echo $num . PHP_EOL;
    })
    ->run();

// Output:
// 2
// 4
// 6
```
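
For intuition about how a `from` / `transform` / `load` chain can stream rows one at a time, the following plain-PHP sketch builds the same flow from generators. It is only an illustration of the generator technique, not DataFlow's internals; the helper names `extractItems`, `transformItems`, and `loadItems` are hypothetical.

```php
<?php
declare(strict_types=1);

/** Extract: yield items one at a time instead of building a new array. */
function extractItems(iterable $source): \Generator
{
    foreach ($source as $key => $item) {
        yield $key => $item;
    }
}

/** Transform: apply $fn to each item as it streams through. */
function transformItems(iterable $items, callable $fn): \Generator
{
    foreach ($items as $key => $item) {
        yield $key => $fn($item);
    }
}

/** Load: pull items through the chain and hand each one to $sink. */
function loadItems(iterable $items, callable $sink): int
{
    $count = 0;
    foreach ($items as $item) {
        $sink($item);
        $count++;
    }
    return $count;
}

$out = [];
$processed = loadItems(
    transformItems(extractItems([1, 2, 3]), fn (int $n): int => $n * 2),
    function (int $n) use (&$out): void {
        $out[] = $n;
    },
);
// $out is [2, 4, 6]; $processed is 3
```

Nothing runs until `loadItems()` starts iterating, and only one item is in flight at a time, which is what keeps memory use flat for large sources.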

Limit
-----

Use `limit()` to cap the number of items that reach the loader.

```
require "vendor/autoload.php";

use Simsoft\DataFlow\DataFlow;

(new DataFlow())
    ->from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    ->transform(function($num) {
        return $num * 2;
    })
    ->limit(5)  // pass on only the first 5 items
    ->load(function($num) {
        echo $num . PHP_EOL;
    })
    ->run();

// Output:
// 2
// 4
// 6
// 8
// 10
```
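
In a lazy pipeline, a limit can also stop the source from producing rows that would never be used. The sketch below shows that idea in plain PHP with a hypothetical `take()` generator; whether DataFlow's `limit()` short-circuits the source in the same way is an assumption, not something stated above.

```php
<?php
declare(strict_types=1);

/** Yield at most $max items, then stop pulling from the source. */
function take(iterable $items, int $max): \Generator
{
    if ($max <= 0) {
        return;
    }
    $seen = 0;
    foreach ($items as $key => $item) {
        yield $key => $item;
        if (++$seen >= $max) {
            return; // stop before asking the source for another item
        }
    }
}

// A source that records how many items it actually produced.
$pulled = 0;
$source = (function () use (&$pulled): \Generator {
    for ($i = 1; $i <= 10; $i++) {
        $pulled++;
        yield $i;
    }
})();

// A transform stage that doubles each number.
$doubled = (function (iterable $items): \Generator {
    foreach ($items as $n) {
        yield $n * 2;
    }
})($source);

$out = iterator_to_array(take($doubled, 5), false);
// $out is [2, 4, 6, 8, 10]; $pulled is 5, not 10
```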

Filter
------

`filter()` keeps only the items for which the callback returns `true`.

```
require "vendor/autoload.php";

use Simsoft\DataFlow\DataFlow;

(new DataFlow())
    ->from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    ->filter(function($num) {
        // The callback must return a bool.
        // Here, only even numbers pass.
        return $num % 2 === 0;
    })
    ->load(function($num) {
        echo $num . PHP_EOL;
    })
    ->run();

// Output:
// 2
// 4
// 6
// 8
// 10
```

Chunk
-----

`chunk()` splits the data into smaller, manageable parts of a fixed size. The last chunk may be shorter.

```
(new DataFlow())
    ->from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    ->chunk(3) // set chunk size
    ->load(function(array $chunk, $key) {
        echo $key . '=' . json_encode($chunk, JSON_THROW_ON_ERROR) . PHP_EOL;
    })
    ->run();

// Output:
// 0=[1,2,3]
// 1=[4,5,6]
// 2=[7,8,9]
// 3=[10]
```
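
The same chunking behaviour can be sketched with a plain generator, which shows why only one chunk needs to be held in memory at a time. This is an illustrative stand-in, not the library's code; `chunkItems` is a hypothetical name.

```php
<?php
declare(strict_types=1);

/** Group a stream into arrays of at most $size items, keyed 0, 1, 2, ... */
function chunkItems(iterable $items, int $size): \Generator
{
    if ($size < 1) {
        throw new \InvalidArgumentException('Chunk size must be at least 1.');
    }
    $buffer = [];
    foreach ($items as $item) {
        $buffer[] = $item;
        if (count($buffer) === $size) {
            yield $buffer; // generator keys auto-increment from 0
            $buffer = [];
        }
    }
    if ($buffer !== []) {
        yield $buffer; // trailing partial chunk
    }
}

$lines = [];
foreach (chunkItems(range(1, 10), 3) as $key => $chunk) {
    $lines[] = $key . '=' . json_encode($chunk, JSON_THROW_ON_ERROR);
}
// $lines is ['0=[1,2,3]', '1=[4,5,6]', '2=[7,8,9]', '3=[10]']
```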

Mapping
-------

`map()` converts each row to another format. Original keys are preserved; mapped keys are added or overwritten.

```
(new DataFlow())
    ->from([
        ['First Name' => 'John', 'Last Name' => 'Doe', 'age' => 20],
        ['First Name' => 'Jane', 'Last Name' => 'Doe', 'age' => 30],
        ['First Name' => 'John', 'Last Name' => 'Smith', 'age' => 50],
        ['First Name' => 'Jane', 'Last Name' => 'Smith', 'age' => 60],
    ])
    ->map([
        // copy a source value under a new key (the original key is kept)
        'first_name' => 'First Name',
        'last_name' => 'Last Name',

        // customise data via callback method.
        'full_name' => fn($data) => $data['first_name'] . ' ' . $data['last_name'],
        'senior' => fn($data) => $data['age'] > 30 ? 'Yes' : 'No',
    ])
    ->load(function($data) {
        echo $data['full_name'] . ' is ' . $data['age'] . ' years old. ' . $data['senior'] . PHP_EOL;
    })
    ->run();

// Output:
// John Doe is 20 years old. No
// Jane Doe is 30 years old. No
// John Smith is 50 years old. Yes
// Jane Smith is 60 years old. Yes
```

Set New Map
-----------

`setNewMap()` converts source data into a completely new array containing **only** the mapped keys. Unlike `map()`, which merges into the existing row, `setNewMap()` discards all original keys.

```
(new DataFlow())
    ->from([
        ['first_name' => 'John', 'last_name' => 'Doe', 'age' => 20, 'status' => 'active', 'internal_id' => 'x99'],
        ['first_name' => 'Jane', 'last_name' => 'Smith', 'age' => 30, 'status' => 'inactive', 'internal_id' => 'x42'],
    ])
    ->setNewMap([
        'name' => fn($row) => $row['first_name'] . ' ' . $row['last_name'],
        'age' => 'age',
    ])
    ->load(function($data) {
        // $data contains ONLY 'name' and 'age' — no 'status', 'internal_id', etc.
        echo json_encode($data) . PHP_EOL;
    })
    ->run();

// Output:
// {"name":"John Doe","age":20}
// {"name":"Jane Smith","age":30}
```
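
The difference between merging into a row and building a fresh one can be sketched in a few lines of plain PHP. The helpers `resolveField`, `mergeMap`, and `replaceMap` are hypothetical and only mirror the behaviour described for `map()` and `setNewMap()`; they are not the library's implementation.

```php
<?php
declare(strict_types=1);

/** Resolve one mapping entry: a Closure computes the value, a string names a source key. */
function resolveField(\Closure|string $spec, array $row): mixed
{
    return $spec instanceof \Closure ? $spec($row) : ($row[$spec] ?? null);
}

/** Merge semantics: keep every original key, then add or overwrite the mapped ones. */
function mergeMap(array $row, array $mapping): array
{
    foreach ($mapping as $target => $spec) {
        // Later entries can see keys written by earlier ones.
        $row[$target] = resolveField($spec, $row);
    }
    return $row;
}

/** Replace semantics: build a fresh row that holds only the mapped keys. */
function replaceMap(array $row, array $mapping): array
{
    $fresh = [];
    foreach ($mapping as $target => $spec) {
        $fresh[$target] = resolveField($spec, $row);
    }
    return $fresh;
}

$row = ['first_name' => 'John', 'last_name' => 'Doe', 'age' => 20, 'status' => 'active'];
$mapping = [
    'name' => fn (array $r): string => $r['first_name'] . ' ' . $r['last_name'],
    'age'  => 'age',
];

$merged   = mergeMap($row, $mapping);
$replaced = replaceMap($row, $mapping);
// array_keys($merged)   is ['first_name', 'last_name', 'age', 'status', 'name']
// array_keys($replaced) is ['name', 'age']
```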

### map() vs setNewMap()

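|                 | `map()`                                   | `setNewMap()`                                      |
|-----------------|-------------------------------------------|----------------------------------------------------|
| Original keys   | Preserved                                 | Discarded                                          |
| Result contains | All original keys + mapped keys           | Only mapped keys                                   |
| Use case        | Add/rename columns while keeping the rest | Reshape into a new structure, drop unwanted fields |

Preview
-------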

`preview()` is a debugging helper that limits the pipeline to N rows and dumps each row's key and value. Use it to inspect the data structure at any point in the pipeline.

```
(new DataFlow())
    ->from([
        ['name' => 'John', 'email' => 'john@example.com'],
        ['name' => 'Jane', 'email' => 'jane@example.com'],
        ['name' => 'Bob', 'email' => 'bob@example.com'],
    ])
    ->map(['full_name' => fn($row) => strtoupper($row['name'])])
    ->preview(2) // show first 2 rows then stop
    ->run();

// Output:
// Key: int(0)
// Value: array(3) { ["name"]=> "John", ["email"]=> "john@example.com", ["full_name"]=> "JOHN" }
//
// Key: int(1)
// Value: array(3) { ["name"]=> "Jane", ["email"]=> "jane@example.com", ["full_name"]=> "JANE" }
```

Insert `preview()` at any point to understand the data shape before writing the next stage.

Flow Continuation
-----------------

Pass one flow to another flow's `from()` to chain them; the output of the first becomes the input of the second.

```
$flow1 = (new DataFlow())
    ->from([1, 2, 3])
    ->transform(function($num) {
        return $num * 2;
    });

(new DataFlow())
    ->from($flow1) // connect flow1 to flow2.
    ->transform(function($num) {
        return $num * 3;
    })
    ->load(function($num) {
        echo $num . PHP_EOL;
    })
    ->run();

// Output:
// 6
// 12
// 18
```

Pipeline Result
---------------

Every `run()` call returns a `PipelineResult` with execution metadata.

```
use Simsoft\DataFlow\DataFlow;

$result = (new DataFlow())
    ->from([1, 2, 3, 4, 5])
    ->transform(fn($n) => $n * 2)
    ->load(fn($n) => $n)
    ->run();

echo "Processed: {$result->getProcessedRows()} rows\n";
echo "Duration: " . round($result->getDurationMs()) . "ms\n";
echo "Peak memory: " . round($result->getPeakMemoryBytes() / 1024) . " KB\n";
```
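
As a rough idea of how figures like these can be gathered, the sketch below times a loop with PHP's monotonic clock and reads peak memory afterwards. It is a standalone illustration; `measureRun` and the array keys it returns are hypothetical and are not part of `PipelineResult`.

```php
<?php
declare(strict_types=1);

/** Run $work over each row and report row count, wall-clock time, and peak memory. */
function measureRun(iterable $rows, callable $work): array
{
    $startNs = hrtime(true); // monotonic clock, in nanoseconds
    $processed = 0;
    foreach ($rows as $row) {
        $work($row);
        $processed++;
    }
    return [
        'processedRows'   => $processed,
        'durationMs'      => (hrtime(true) - $startNs) / 1e6,
        'peakMemoryBytes' => memory_get_peak_usage(true),
    ];
}

$stats = measureRun([1, 2, 3, 4, 5], fn (int $n): int => $n * 2);
// $stats['processedRows'] is 5; the other two values depend on the machine
```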

Error Handling
--------------

Configure per-stage error strategies for production resilience.

```
use Simsoft\DataFlow\DataFlow;
use Simsoft\DataFlow\Enums\ErrorStrategy;

$result = (new DataFlow())
    ->from($records)
    ->transform(
        (new MyTransformer())
            ->withErrorStrategy(ErrorStrategy::Skip) // skip failing rows
            ->withName('validator')
    )
    ->load(fn($row) => $row)
    ->run();

echo "Processed: {$result->getProcessedRows()}\n";
echo "Failed: {$result->getFailedRows()}\n";
```

Available strategies: `Throw` (default), `Skip`, `Retry`, `LogAndContinue`.
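
To show what a per-row strategy means in practice, here is a small plain-PHP sketch of a stage runner that either aborts or skips when a row throws. The enum `OnRowError` and the function `runStage` are hypothetical names for this illustration and are not the library's `ErrorStrategy`; retrying is left out to keep the sketch short.

```php
<?php
declare(strict_types=1);

// Hypothetical enum for this sketch only.
enum OnRowError
{
    case Fail;           // rethrow and abort the run
    case Skip;           // drop the failing row and carry on
    case LogAndContinue; // like Skip, but report the error first
}

/**
 * Apply $stage to every row; $onError decides what happens when a row throws.
 * Returns [successful results, failed rows with their error messages].
 */
function runStage(iterable $rows, callable $stage, OnRowError $onError, ?callable $log = null): array
{
    $ok = [];
    $failed = [];
    foreach ($rows as $row) {
        try {
            $ok[] = $stage($row);
        } catch (\Throwable $e) {
            if ($onError === OnRowError::Fail) {
                throw $e;
            }
            if ($onError === OnRowError::LogAndContinue && $log !== null) {
                $log($e->getMessage());
            }
            $failed[] = ['row' => $row, 'error' => $e->getMessage()];
        }
    }
    return [$ok, $failed];
}

$parse = function (string $raw): int {
    if (!is_numeric($raw)) {
        throw new \InvalidArgumentException("not a number: {$raw}");
    }
    return (int) $raw;
};

[$ok, $failed] = runStage(['1', 'x', '3'], $parse, OnRowError::Skip);
// $ok is [1, 3]; $failed holds the row 'x' together with its error message
```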

Dry-Run Mode
------------

Validate pipelines without performing actual writes.

```
$result = (new DataFlow())
    ->from($records)
    ->transform(fn($row) => $row)
    ->load(new DatabaseLoader())
    ->dryRun()
    ->run();

echo "Would process: {$result->getProcessedRows()} rows\n";
// No data was actually written
```
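
One common way to get this behaviour is to wrap the loader so that rows are counted but the write is skipped. The sketch below shows that idea in plain PHP; the class `CountingLoader` is hypothetical, and how DataFlow implements `dryRun()` internally is not covered here.

```php
<?php
declare(strict_types=1);

/** Counts every row it receives and writes only when dry-run mode is off. */
final class CountingLoader
{
    public int $rows = 0;

    public function __construct(
        private \Closure $write,
        private bool $dryRun,
    ) {
    }

    public function __invoke(mixed $row): void
    {
        $this->rows++;
        if (!$this->dryRun) {
            ($this->write)($row); // the only side effect, skipped in dry-run mode
        }
    }
}

$written = [];
$loader = new CountingLoader(
    function (mixed $row) use (&$written): void {
        $written[] = $row;
    },
    dryRun: true,
);

foreach ([1, 2, 3] as $n) {
    $loader($n * 2); // upstream stages still run and can still fail
}
// $loader->rows is 3, $written is still []
```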

Logging & Progress
------------------

Inject a PSR-3 logger and track progress on large datasets.

```
use Simsoft\DataFlow\DataFlow;

$result = (new DataFlow())
    ->from($largeDataset)
    ->withLogger($psrLogger)
    ->onProgress(function (int $count, float $elapsedMs) {
        echo "\r  Processed {$count} rows...";
    }, interval: 1000)
    ->onError(function (\Throwable $e, mixed $row, string $stage) {
        error_log("[{$stage}] {$e->getMessage()}");
    })
    ->transform(fn($row) => $row)
    ->load(fn($row) => $row)
    ->run();
```
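
The effect of an `interval` on a progress callback can be sketched with a pass-through generator that fires the callback once per N rows. This is plain PHP for illustration; `withProgress` is a hypothetical helper, not the library's `onProgress()`.

```php
<?php
declare(strict_types=1);

/** Pass rows through unchanged, calling $onProgress after every $interval rows. */
function withProgress(iterable $rows, callable $onProgress, int $interval = 1000): \Generator
{
    if ($interval < 1) {
        throw new \InvalidArgumentException('Interval must be at least 1.');
    }
    $startNs = hrtime(true);
    $count = 0;
    foreach ($rows as $key => $row) {
        yield $key => $row;
        if (++$count % $interval === 0) {
            $onProgress($count, (hrtime(true) - $startNs) / 1e6);
        }
    }
}

$ticks = [];
$rows = withProgress(
    range(1, 2500),
    function (int $count, float $elapsedMs) use (&$ticks): void {
        $ticks[] = $count;
    },
);
foreach ($rows as $row) {
    // downstream work would happen here
}
// $ticks is [1000, 2000]
```

Reporting only every N rows keeps the callback overhead negligible on large datasets.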

Advanced Usage
--------------

1. [Using Closure](docs/01-USING_CLOSURE.md)
2. [Useful Processors](docs/02-USEFUL_PROCESSORS.md)
3. [Customized ETL Processor](docs/03-CUSTOMIZED_PROCESSOR.md)
4. [Create Reusable Data Flow](docs/04-CONTROLLABLE_DATAFLOW.md)
5. [Using Payload](docs/05-USING_PAYLOAD.md)
6. [Macro & Mixin](docs/06-MACRO_AND_MIXIN.md)
7. [Error Handling](docs/07-ERROR_HANDLING.md)
8. [Observability & Metrics](docs/08-OBSERVABILITY.md)
9. [Dry-Run Mode](docs/09-DRY_RUN.md)
10. [Schema Validation](docs/10-SCHEMA_VALIDATION.md)
11. [Circuit Breaker](docs/11-CIRCUIT_BREAKER.md)
12. [Checkpoint & Resume](docs/12-CHECKPOINT_RESUME.md)
13. [Metrics Exporter](docs/13-METRICS_EXPORTER.md)
14. [Spreadsheet (PhpSpreadsheet)](docs/14-SPREADSHEET.md)

License
-------

Simsoft DataFlow is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.

