PHPackages                             boutdecode/etl-core-bundle - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Utility &amp; Helpers](/categories/utility)
4. /
5. boutdecode/etl-core-bundle

ActiveSymfony-bundle[Utility &amp; Helpers](/categories/utility)

boutdecode/etl-core-bundle
==========================

Symfony Bundle providing a configurable ETL (Extract/Transform/Load) pipeline engine with CQS, scheduling and workflow support.

v0.2.0(3w ago)2641MITPHPPHP &gt;=8.2CI failing

Since Mar 17Pushed 3w agoCompare

[ Source](https://github.com/boutdecode/etl-core)[ Packagist](https://packagist.org/packages/boutdecode/etl-core-bundle)[ RSS](/packages/boutdecode-etl-core-bundle/feed)WikiDiscussions main Synced 3w ago

READMEChangelog (2)Dependencies (53)Versions (4)Used By (1)

ETLCoreBundle
=============

[](#etlcorebundle)

A Symfony Bundle providing a configurable ETL (Extract / Transform / Load) pipeline engine built on top of Domain-Driven Design, CQS, Symfony Messenger, Symfony Workflow and Flow-PHP.

[![PHP Version](https://camo.githubusercontent.com/50b13d3d01778282febe11a92c7166976ae637e4ee8ffe64bce210549ed5e096/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f5048502d382e32253230253743253230382e33253230253743253230382e342d626c7565)](https://php.net/)[![Symfony](https://camo.githubusercontent.com/41bad6518b62d28a237aff2accb44d508824602a6994a3e2683ef4ec92037599/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f53796d666f6e792d362e34253230253743253230372e782d677265656e)](https://symfony.com/)[![License](https://camo.githubusercontent.com/8174925d009b42074d50ab5cc7e29fcb1aa613b0d9cb2e43097697a40cf90fa4/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f4c6963656e73652d4d49542d79656c6c6f77)](LICENSE)

---

Requirements
------------

[](#requirements)

DependencyVersionPHP`>= 8.2`Symfony`^6.4 || ^7.0`Doctrine ORM`^3.6`Flow-PHP ETL`~0.25`---

Concepts
--------

[](#concepts)

### ETL — Extract, Transform, Load

[](#etl--extract-transform-load)

ETL is a data processing pattern split into three sequential stages:

StageRole**Extract**Read raw data from a source (CSV file, API, database, …)**Transform**Filter, map, enrich or validate the extracted data**Load**Write the processed data to a destination (database, JSON file, …)Each stage is implemented as a **Step** — a single, focused unit of work. Steps are chained together so the output of one becomes the input of the next, flowing through a shared `Context` object.

### Workflow vs Pipeline

[](#workflow-vs-pipeline)

These two terms look similar but represent fundamentally different things in this bundle:

**Workflow — the reusable template**

A `Workflow` is a named, static definition that describes *what* should happen:

- the ordered list of steps to execute (`stepConfiguration`), each identified by a `code` that maps to a registered `ExecutableStep` service
- the default configuration for each step
- global options (timeout, retry policy, …) via `configuration`

A `Workflow` has no notion of time, data, or execution state. It never runs by itself. Think of it as a *class* or a *recipe*.

**Pipeline — the execution instance**

A `Pipeline` is a concrete, time-bound instance created from a `Workflow`. It represents *one specific run*:

- it holds the actual **input data** for that run (e.g. path to the file to import)
- it may **override** the step configuration for that run specifically
- it carries a **status** (`pending` → `in_progress` → `completed` / `failed`) managed by a Symfony Workflow state machine
- it records timestamps (`scheduledAt`, `startedAt`, `finishedAt`)

Think of it as an *object instantiated from a class* — or a *ticket raised against a recipe*.

```
Workflow  ──createFromWorkflowId()──►  Pipeline  ──dispatch()──►  execution
(template, reusable)                   (instance, stateful)        (runtime)

```

**Step — configuration vs execution**

The same word "step" covers two distinct things:

ConceptWhereRole`Step` (config)Stored with the `Pipeline`Carries `code`, `order`, and per-step `configuration`. A value object — no logic.`ExecutableStep` (service)Symfony DI containerImplements the actual ETL logic in `process(Context)`. Tagged `boutdecode_etl_core.executable_step`.At runtime the `StepResolver` bridges the two: it looks up the `ExecutableStep` service whose tag matches `Step::getCode()`, clones it, applies the step configuration, and hands it to the execution chain.

---

Installation
------------

[](#installation)

```
composer require boutdecode/etl-core-bundle
```

If you are **not** using Symfony Flex, register the bundle manually:

```
// config/bundles.php
return [
    // ...
    BoutDeCode\ETLCoreBundle\BoutDeCodeETLCoreBundle::class => ['all' => true],
];
```

```
// config/packages/boutdecode_etl_core.yaml

imports:
    - { resource: "@BoutDeCodeETLCoreBundle/Resources/config/config.yaml" }
```

---

Configuration
-------------

[](#configuration)

No configuration is required. The bundle works out of the box with sensible defaults.

The bundle exposes no configurable keys under `boutdecode_etl_core:` — all service IDs, tags, and bus names are fixed constants defined by the bundle itself:

ConstantValueCommand bus`boutdecode_etl_core.command.bus`Query bus`boutdecode_etl_core.query.bus`Executable step tag`boutdecode_etl_core.executable_step`Step middleware tag`boutdecode_etl_core.step_middleware`Pipeline middleware tag`boutdecode_etl_core.pipeline_middleware`---

Data
----

[](#data)

### Entities

[](#entities)

The bundle does **not** ship Doctrine entities. You must create them in your application and then generate the migrations.

The bundle provides abstract base classes to extend and interfaces to implement:

What to createExtendsImplements`Workflow` entity`AbstractWorkflow`—`Step` entity`AbstractStep`—`Pipeline` entity`AbstractPipeline`—`StepHistory` entity`AbstractStepHistory`—`PipelineHistory` entity`AbstractPipelineHistory`—Each abstract class holds all the typed properties and method implementations. The only thing left to add in the concrete entity is:

- A Doctrine `#[ORM\Entity]` / `#[ORM\Table]` mapping.
- An `$id` property with its getter (`getId(): string`), except for `Step` and history entities where you may choose any PK strategy.
- The ORM column/relation mappings on the inherited properties (use `#[ORM\Column]` etc. directly in the child class).

#### Example — minimal entity set

[](#example--minimal-entity-set)

```
// src/Entity/Workflow.php
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\AbstractWorkflow;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'workflow')]
class Workflow extends AbstractWorkflow
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    #[ORM\Column]
    protected string $name;

    #[ORM\Column(nullable: true)]
    protected ?string $description = null;

    #[ORM\Column(type: 'json')]
    protected array $stepConfiguration = [];

    #[ORM\Column(type: 'json')]
    protected array $configuration = [];

    #[ORM\Column]
    protected \DateTimeImmutable $createdAt;

    #[ORM\Column(nullable: true)]
    protected ?\DateTimeImmutable $updatedAt = null;

    public function __construct(string $name)
    {
        $this->id = (string) Uuid::v7();
        $this->name = $name;
        $this->createdAt = new \DateTimeImmutable();
        $this->stepConfiguration = [];
        $this->configuration = [];
    }

    public function getId(): string
    {
        return $this->id;
    }
}
```

```
// src/Entity/Step.php
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\AbstractStep;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'step')]
class Step extends AbstractStep
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    #[ORM\ManyToOne(targetEntity: Workflow::class)]
    #[ORM\JoinColumn(nullable: false)]
    private Workflow $workflow;

    #[ORM\Column(nullable: true)]
    protected ?string $name = null;

    #[ORM\Column]
    protected string $code;

    #[ORM\Column(type: 'json')]
    protected array $configuration = [];

    #[ORM\Column]
    protected int $order = 0;

    public function __construct(string $code, Workflow $workflow)
    {
        $this->id = (string) Uuid::v7();
        $this->code = $code;
        $this->workflow = $workflow;
    }

    public function getId(): string
    {
        return $this->id;
    }
}
```

```
// src/Entity/Pipeline.php
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\AbstractPipeline;
use BoutDeCode\ETLCoreBundle\Core\Domain\Enum\PipelineStatus;
use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\Common\Collections\Collection;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'pipeline')]
class Pipeline extends AbstractPipeline
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    #[ORM\ManyToOne(targetEntity: Workflow::class)]
    #[ORM\JoinColumn(nullable: false)]
    protected Workflow $workflow;

    #[ORM\OneToMany(targetEntity: Step::class, mappedBy: 'pipeline', cascade: ['persist'])]
    #[ORM\OrderBy(['order' => 'ASC'])]
    protected iterable $steps;

    #[ORM\Column(type: 'json')]
    protected array $configuration = [];

    #[ORM\Column(type: 'json')]
    protected array $input = [];

    #[ORM\Column(enumType: PipelineStatus::class)]
    protected PipelineStatus $status;

    #[ORM\Column]
    protected \DateTimeImmutable $createdAt;

    #[ORM\Column(nullable: true)]
    protected ?\DateTimeImmutable $scheduledAt = null;

    #[ORM\Column(nullable: true)]
    protected ?\DateTimeImmutable $startedAt = null;

    #[ORM\Column(nullable: true)]
    protected ?\DateTimeImmutable $finishedAt = null;

    public function __construct(Workflow $workflow)
    {
        $this->id = (string) Uuid::v7();
        $this->workflow = $workflow;
        $this->status = PipelineStatus::PENDING;
        $this->createdAt = new \DateTimeImmutable();
        $this->steps = new ArrayCollection();
        $this->runnableSteps = new ArrayCollection();
    }

    public function getId(): string
    {
        return $this->id;
    }
}
```

```
// src/Entity/StepHistory.php
use BoutDeCode\ETLCoreBundle\Run\Domain\Model\AbstractStepHistory;
use BoutDeCode\ETLCoreBundle\Run\Domain\Enum\StepHistoryStatusEnum;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'step_history')]
class StepHistory extends AbstractStepHistory
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    #[ORM\Column(enumType: StepHistoryStatusEnum::class)]
    protected StepHistoryStatusEnum $status;

    #[ORM\Column]
    protected \DateTimeImmutable $createdAt;

    #[ORM\Column(type: 'json', nullable: true)]
    protected mixed $input = null;

    #[ORM\Column(type: 'json', nullable: true)]
    protected mixed $result = null;

    public function __construct(StepHistoryStatusEnum $status, mixed $input, mixed $result)
    {
        $this->id = (string) Uuid::v7();
        $this->status = $status;
        $this->createdAt = new \DateTimeImmutable();
        $this->input = $input;
        $this->result = $result;
    }

    public function getId(): string
    {
        return $this->id;
    }
}
```

```
// src/Entity/PipelineHistory.php
use BoutDeCode\ETLCoreBundle\Run\Domain\Model\AbstractPipelineHistory;
use BoutDeCode\ETLCoreBundle\Run\Domain\Enum\PipelineHistoryStatusEnum;
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\Pipeline as PipelineInterface;
use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'pipeline_history')]
class PipelineHistory extends AbstractPipelineHistory
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    #[ORM\ManyToOne(targetEntity: Pipeline::class)]
    #[ORM\JoinColumn(nullable: false)]
    protected PipelineInterface $pipeline;

    #[ORM\Column(enumType: PipelineHistoryStatusEnum::class)]
    protected PipelineHistoryStatusEnum $status;

    #[ORM\Column]
    protected \DateTimeImmutable $createdAt;

    #[ORM\Column(type: 'json', nullable: true)]
    protected mixed $input = null;

    #[ORM\Column(type: 'json', nullable: true)]
    protected mixed $result = null;

    #[ORM\OneToMany(targetEntity: StepHistory::class, mappedBy: 'pipelineHistory', cascade: ['persist'])]
    protected iterable $stepHistories;

    public function __construct(PipelineInterface $pipeline, PipelineHistoryStatusEnum $status, mixed $input, mixed $result)
    {
        $this->id = (string) Uuid::v7();
        $this->pipeline = $pipeline;
        $this->status = $status;
        $this->createdAt = new \DateTimeImmutable();
        $this->input = $input;
        $this->result = $result;
        $this->stepHistories = new ArrayCollection();
    }

    public function getId(): string
    {
        return $this->id;
    }
}
```

### Migrations

[](#migrations)

Once all entities are created, generate and run the Doctrine migrations:

```
php bin/console doctrine:migrations:diff
php bin/console doctrine:migrations:migrate
```

---

Architecture
------------

[](#architecture)

```
src/
├── ETLCoreBundle.php               # Bundle entry point
├── DependencyInjection/
│   ├── ETLCoreExtension.php        # Loads services, exposes config parameters
│   └── Configuration.php           # Config tree (boutdecode_etl_core:)
├── Resources/config/
│   ├── services.yaml               # Service definitions & tagged iterators
│   ├── config.yaml                 # Root import (messenger + workflow)
│   └── packages/
│       ├── messenger.yaml          # Buses & routing
│       └── workflow.yaml           # pipeline_lifecycle state machine
├── Core/                           # Central domain (Pipeline, Step, Context)
├── ETL/                            # ETL logic (Extract, Transform, Load)
├── Run/                            # Execution engine & middleware
└── CQS/                            # Command / Query Separation

```

### Key patterns

[](#key-patterns)

PatternWhereDomain-Driven Design`*/Domain/` layersCQS (Command / Query Separation)`src/CQS/`Middleware chain`Run/Domain/Middleware/`Strategy (pluggable steps)`ETL/Domain/Model/` + `ExecutableStep` tagState machine`pipeline_lifecycle` Symfony Workflow---

Implementing a Custom Step
--------------------------

[](#implementing-a-custom-step)

### 1. Declare the step with `#[AsExecutableStep]`

[](#1-declare-the-step-with-asexecutablestep)

Every step class must carry the `#[AsExecutableStep]` attribute. It serves two purposes:

- **`code`** — the unique machine identifier used to resolve the step at runtime (e.g. when a `Workflow` references a step by its code). It must be unique across the whole application.
- **`configurationDescription`** *(optional)* — a map of configuration key → human-readable description, returned by `getConfigurationDescription()`. Useful for documentation and introspection.

```
use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractExtractorStep;

#[AsExecutableStep(
    code: 'app.extractor.my_csv',
    configurationDescription: [
        'source'    => 'Absolute path to the CSV file',
        'delimiter' => 'Field delimiter character (default: ",")',
    ],
)]
final class MyCsvExtractorStep extends AbstractExtractorStep
{
    // …
}
```

### 2. Implement the stage method

[](#2-implement-the-stage-method)

Extend one of the three abstract base classes and implement the corresponding method. The `Context $context` parameter is always injected by the framework — you do not need to call `process()` yourself.

Base classMethod to implementSignature`AbstractExtractorStep``extract()``extract(mixed $source, Context $context, array $configuration = []): mixed``AbstractTransformerStep``transform()``transform(mixed $data, Context $context, array $configuration = []): mixed``AbstractLoaderStep``load()``load(mixed $data, mixed $destination, Context $context, array $configuration = []): mixed`The `$configuration` array is automatically populated from the step's entry in the pipeline configuration. Default values can also be injected via the constructor and stored in `$this->configuration`.

#### Extractor example

[](#extractor-example)

```
use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractExtractorStep;

#[AsExecutableStep(
    code: 'app.extractor.my_csv',
    configurationDescription: [
        'source'    => 'Absolute path to the CSV file',
        'delimiter' => 'Field delimiter character (default: ",")',
    ],
)]
final class MyCsvExtractorStep extends AbstractExtractorStep
{
    public function __construct(
        private readonly string $delimiter = ',',
    ) {}

    public function extract(mixed $source, Context $context, array $configuration = []): array
    {
        $filePath  = is_string($source) ? $source : ($configuration['source'] ?? '');
        $delimiter = $configuration['delimiter'] ?? $this->delimiter;

        // … read and return rows …
        return [];
    }
}
```

#### Transformer example

[](#transformer-example)

```
use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractTransformerStep;

#[AsExecutableStep(
    code: 'app.transformer.uppercase_name',
    configurationDescription: [
        'field' => 'Name of the field to uppercase (default: "name")',
    ],
)]
final class UppercaseNameTransformStep extends AbstractTransformerStep
{
    public function transform(mixed $data, Context $context, array $configuration = []): mixed
    {
        if (! is_array($data)) {
            return $data;
        }

        $field = $configuration['field'] ?? 'name';

        return array_map(static function (array $row) use ($field): array {
            if (isset($row[$field]) && is_string($row[$field])) {
                $row[$field] = strtoupper($row[$field]);
            }

            return $row;
        }, $data);
    }
}
```

#### Loader example

[](#loader-example)

```
use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractLoaderStep;

#[AsExecutableStep(
    code: 'app.loader.csv_file',
    configurationDescription: [
        'destination' => 'Absolute path to the output CSV file',
    ],
)]
final class CsvFileLoadStep extends AbstractLoaderStep
{
    public function load(mixed $data, mixed $destination, Context $context, array $configuration = []): bool
    {
        if (! is_string($destination)) {
            throw new \InvalidArgumentException('Destination must be a file path string.');
        }

        // … write $data to $destination …
        return true;
    }
}
```

### 3. Register the step

[](#3-register-the-step)

All classes that implement `ExecutableStep` (which all three abstract base classes do) are **automatically tagged** `boutdecode_etl_core.executable_step` via `_instanceof` — no manual service configuration is required as long as your class is picked up by Symfony's autowiring.

```
# config/services.yaml  (standard Symfony service autodiscovery — nothing extra needed)
App\ETL\Step\:
    resource: '../src/ETL/Step/'
    autowire: true
    autoconfigure: true
```

If for any reason you need to register a step explicitly:

```
# config/services.yaml
App\ETL\Step\MyCsvExtractorStep:
    tags:
        - { name: boutdecode_etl_core.executable_step }
```

### 4. Reference the step in a Workflow

[](#4-reference-the-step-in-a-workflow)

Use the `code` declared in `#[AsExecutableStep]` as the step identifier in your `Workflow`'s `stepConfiguration`:

```
$workflow->setStepConfiguration([
    [
        'code'          => 'app.extractor.my_csv',
        'name'          => 'extract_customers',
        'order'         => 1,
        'configuration' => [
            'source'    => '/data/customers.csv',
            'delimiter' => ';',
        ],
    ],
    [
        'code'          => 'app.transformer.uppercase_name',
        'name'          => 'normalize_names',
        'order'         => 2,
        'configuration' => [
            'field' => 'name',
        ],
    ],
    [
        'code'          => 'app.loader.csv_file',
        'name'          => 'save_result',
        'order'         => 3,
        'configuration' => [
            'destination' => '/output/result.csv',
        ],
    ],
]);
```

At runtime, the `StepResolver` reads each step's `code`, finds the matching tagged service in the container, and injects the per-step `configuration` before execution.

---

CQS — Commands &amp; Queries
----------------------------

[](#cqs--commands--queries)

### Dispatching a command

[](#dispatching-a-command)

Inject `CommandBus` and call `dispatch()` with any object implementing `Command`:

```
use BoutDeCode\ETLCoreBundle\CQS\Application\Operation\Command\CommandBus;

class MyService
{
    public function __construct(private readonly CommandBus $commandBus) {}

    public function doSomething(): void
    {
        $this->commandBus->dispatch(new MyCommand(/* ... */));
    }
}
```

### Running a pipeline from a Workflow

[](#running-a-pipeline-from-a-workflow)

The bundle ships one built-in command: `ExecuteWorkflowCommand`. It takes a persisted pipeline ID and triggers the full middleware chain asynchronously.

#### Step 1 — Implement `PipelineFactory`

[](#step-1--implement-pipelinefactory)

The bundle provides the `PipelineFactory` interface but no concrete implementation — you must provide one (typically a Doctrine-backed service):

```
// src/Factory/PipelineFactory.php
use BoutDeCode\ETLCoreBundle\Core\Domain\Factory\PipelineFactory as PipelineFactoryInterface;
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\Pipeline;
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\Step;
use BoutDeCode\ETLCoreBundle\Core\Domain\Data\Provider\WorkflowProvider;
use BoutDeCode\ETLCoreBundle\Core\Domain\Data\Persister\PipelinePersister;

final class PipelineFactory implements PipelineFactoryInterface
{
    public function __construct(
        private readonly WorkflowProvider $workflowProvider,
        private readonly PipelinePersister $pipelinePersister,
    ) {}

    public function create(array $steps = [], array $configuration = []): Pipeline
    {
        // build a Pipeline from a list of Step objects
        // ...
    }

    /**
     * @param array $overrideConfiguration
     * @param array $input
     */
    public function createFromWorkflowId(
        string $workflowId,
        array $overrideConfiguration = [],
        array $input = [],
    ): Pipeline {
        $workflow = $this->workflowProvider->findWorkflowByIdentifier($workflowId);

        // build Pipeline from Workflow steps & config, then persist it
        $pipeline = new \App\Entity\Pipeline($workflow);
        // ... populate steps, configuration, input ...

        return $this->pipelinePersister->create($pipeline);
    }
}
```

The bundle's `DataInterfaceAliasPass` compiler pass automatically creates the DI alias as soon as your class is registered as a service — no manual wiring needed.

#### Step 2 — Create and persist the Pipeline

[](#step-2--create-and-persist-the-pipeline)

```
use BoutDeCode\ETLCoreBundle\Core\Domain\Factory\PipelineFactory;

final class StartImportHandler
{
    public function __construct(
        private readonly PipelineFactory $pipelineFactory,
    ) {}

    public function handle(string $workflowId): string
    {
        $pipeline = $this->pipelineFactory->createFromWorkflowId(
            workflowId: $workflowId,
            overrideConfiguration: [
                'extract_step' => ['file' => '/data/import.csv'],
            ],
            input: ['source' => 'manual'],
        );

        // Pipeline is now persisted with PipelineStatus::PENDING
        return $pipeline->getId();
    }
}
```

#### Step 3 — Dispatch the execution command

[](#step-3--dispatch-the-execution-command)

```
use BoutDeCode\ETLCoreBundle\CQS\Application\Operation\Command\CommandBus;
use BoutDeCode\ETLCoreBundle\Run\Application\Operation\Command\ExecuteWorkflowCommand;

final class StartImportHandler
{
    public function __construct(
        private readonly PipelineFactory $pipelineFactory,
        private readonly CommandBus $commandBus,
    ) {}

    public function handle(string $workflowId): void
    {
        $pipeline = $this->pipelineFactory->createFromWorkflowId(
            workflowId: $workflowId,
            input: ['source' => 'manual'],
        );

        // ExecuteWorkflowCommand implements AsyncCommand:
        // routed to an async Messenger transport if one is configured,
        // otherwise handled synchronously.
        $this->commandBus->dispatch(
            new ExecuteWorkflowCommand(pipelineId: $pipeline->getId())
        );
    }
}
```

> **Note:** `ExecuteWorkflowCommand` implements `AsyncCommand`. If you configure a Symfony Messenger transport for the `async` routing key the execution will be deferred to a worker. The pipeline must be in `PipelineStatus::PENDING` — if it is already `IN_PROGRESS`, `COMPLETED`, or `FAILED` the handler returns silently without re-running it.

#### Reading the results

[](#reading-the-results)

`CommandBus::dispatch()` returns the value produced by the handler (`Context`). You can inspect the results directly when running synchronously:

```
use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;

/** @var Context $context */
$context = $this->commandBus->dispatch(
    new ExecuteWorkflowCommand(pipelineId: $pipeline->getId())
);

// Last result produced by the pipeline
$result = $context->getResult();

// Result keyed by step name
$extracted = $context->getResultByKey('extract_step');

// Check for step failures
$errors = $context->getErrors(); // array
```

---

Adding Custom Middleware
------------------------

[](#adding-custom-middleware)

### Pipeline middleware

[](#pipeline-middleware)

```
use BoutDeCode\ETLCoreBundle\Run\Domain\Middleware\Middleware;
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\Context;

final class AuditPipelineMiddleware implements Middleware
{
    public function process(Context $context, callable $next): Context
    {
        // before
        $result = $next($context);
        // after
        return $result;
    }
}
```

```
# config/services.yaml
App\Middleware\AuditPipelineMiddleware:
    tags:
        - { name: boutdecode_etl_core.pipeline_middleware, priority: 50 }
```

### Step middleware

[](#step-middleware)

Same pattern, tag name: `boutdecode_etl_core.step_middleware`.

Built-in middleware priority reference:

MiddlewareTagPriority`PipelineStartMiddleware`pipeline100`PipelineFailureMiddleware`pipeline1`PipelineProcessMiddleware`pipeline0`PipelineHistoryMiddleware`pipeline-50`PipelineSuccessMiddleware`pipeline-100`StepStartMiddleware`step100`StepFailureMiddleware`step1`StepProcessMiddleware`step0`StepHistoryMiddleware`step-50`StepSuccessMiddleware`step-100---

Testing
-------

[](#testing)

```
# All tests
composer test

# Unit tests only
composer test:unit

# Integration tests only
composer test:integration
```

Current status: **394 unit tests, 3 integration tests — all passing**.

---

License
-------

[](#license)

MIT — see [LICENSE](LICENSE).

---

**Built with ❤️ by [Boutdecode](https://github.com/boutdecode)**

###  Health Score

41

—

FairBetter than 87% of packages

Maintenance95

Actively maintained with recent releases

Popularity15

Limited adoption so far

Community10

Small or concentrated contributor base

Maturity38

Early-stage or recently created project

 Bus Factor1

Top contributor holds 100% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~38 days

Total

3

Last Release

23d ago

### Community

Maintainers

![](https://www.gravatar.com/avatar/19c26457bb6e8bb12ec378c98d1e0980b745f1c2df70618d6308c444b5a27891?d=identicon)[kevin.balicot](/maintainers/kevin.balicot)

---

Top Contributors

[![kevinbalicot](https://avatars.githubusercontent.com/u/4362142?v=4)](https://github.com/kevinbalicot "kevinbalicot (4 commits)")

---

Tags

symfonybundleetlpipelinecqs

###  Code Quality

TestsPHPUnit

Static AnalysisPHPStan

Code StyleECS

Type Coverage Yes

### Embed Badge

![Health badge](/badges/boutdecode-etl-core-bundle/health.svg)

```
[![Health](https://phpackages.com/badges/boutdecode-etl-core-bundle/health.svg)](https://phpackages.com/packages/boutdecode-etl-core-bundle)
```

###  Alternatives

[sulu/sulu

Core framework that implements the functionality of the Sulu content management system

1.3k1.4M196](/packages/sulu-sulu)[sylius/sylius

E-Commerce platform for PHP, based on Symfony framework.

8.5k5.8M712](/packages/sylius-sylius)[shopware/core

Shopware platform is the core for all Shopware ecommerce products.

585.4M518](/packages/shopware-core)[rcsofttech/audit-trail-bundle

Enterprise-grade, high-performance Symfony audit trail bundle. Automatically track Doctrine entity changes with split-phase architecture, multiple transports (HTTP, Queue, Doctrine), and sensitive data masking.

1155.2k](/packages/rcsofttech-audit-trail-bundle)[pimcore/pimcore

Content &amp; Product Management Framework (CMS/PIM/E-Commerce)

3.8k3.8M462](/packages/pimcore-pimcore)[shopware/platform

The Shopware e-commerce core

3.4k1.5M3](/packages/shopware-platform)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
