PHPackages                             drnasin/workflow-orchestrator - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. drnasin/workflow-orchestrator

ActiveLibrary[Queues &amp; Workers](/categories/queues)

drnasin/workflow-orchestrator
=============================

A lightweight, stateless workflow orchestration library for PHP

v1.5.0(3mo ago)00MITPHPPHP ^8.3CI passing

Since Sep 17Pushed 3mo agoCompare

[ Source](https://github.com/drnasin/workflow-orchestrator)[ Packagist](https://packagist.org/packages/drnasin/workflow-orchestrator)[ RSS](/packages/drnasin-workflow-orchestrator/feed)WikiDiscussions master Synced 1mo ago

READMEChangelog (1)Dependencies (2)Versions (8)Used By (0)

[![Tests](https://github.com/drnasin/workflow-orchestrator/actions/workflows/tests.yml/badge.svg)](https://github.com/drnasin/workflow-orchestrator/actions/workflows/tests.yml)[![License: MIT](https://camo.githubusercontent.com/fdf2982b9f5d7489dcf44570e714e3a15fce6253e0cc6b5aa61a075aac2ff71b/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f4c6963656e73652d4d49542d79656c6c6f772e737667)](https://opensource.org/licenses/MIT)[![PHP Version](https://camo.githubusercontent.com/c6447a0f34a5721f3e61f5e30bd15977d04dc02ecbcd26efad1c3328d9f864f5/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f5048502d253345253344382e332d3838393242462e737667)](https://php.net)

Workflow Orchestrator
=====================

[](#workflow-orchestrator)

A lightweight, stateless workflow orchestration library for PHP 8.3+.

Installation
------------

[](#installation)

```
composer require drnasin/workflow-orchestrator
```

Quick Start
-----------

[](#quick-start)

### 1. Define Your Workflow

[](#1-define-your-workflow)

```
use WorkflowOrchestrator\WorkflowOrchestrator;
use WorkflowOrchestrator\Attributes\Orchestrator;
use WorkflowOrchestrator\Attributes\Handler;

class OrderProcessor
{
    #[Orchestrator(channel: 'process.order')]
    public function processOrder(Order $order): array
    {
        $steps = ['validate', 'payment'];

        if ($order->getCustomer()->isPremium()) {
            $steps[] = 'apply-discount';
        }

        $steps[] = 'confirmation';
        return $steps;
    }

    #[Handler(channel: 'validate')]
    public function validate(Order $order): Order
    {
        // Validation logic
        if (!$order->hasItems()) {
            throw new \InvalidArgumentException('Order must have items');
        }
        return $order;
    }

    #[Handler(channel: 'payment')]
    public function processPayment(Order $order): Order
    {
        // Payment processing logic
        $order->markAsPaid();
        return $order;
    }

    #[Handler(channel: 'apply-discount')]
    public function applyDiscount(Order $order): Order
    {
        // Apply premium discount
        $order->applyDiscount(0.15);
        return $order;
    }

    #[Handler(channel: 'confirmation')]
    public function sendConfirmation(Order $order): Order
    {
        // Send confirmation email
        mail($order->getCustomer()->getEmail(), 'Order Confirmed', '...');
        return $order;
    }
}
```

### 2. Execute the Workflow

[](#2-execute-the-workflow)

```
// Simple usage
$orchestrator = WorkflowOrchestrator::create()
    ->register(OrderProcessor::class);

$result = $orchestrator->execute('process.order', $order);
```

That's it! The workflow will:

1. Validate the order
2. Process payment
3. Apply discount (if premium customer)
4. Send confirmation

Features
--------

[](#features)

- ✅ **Stateless workflow execution** - No database storage required
- ✅ **Dynamic step routing** - Workflows adapt based on data
- ✅ **Async step processing** - Heavy operations run in background with configurable retry
- ✅ **Header/metadata support** - Pass context between steps
- ✅ **Middleware support** - Add cross-cutting concerns
- ✅ **Step timeouts** - Enforce time limits on individual handlers
- ✅ **Event listeners** - Track step execution for logging, metrics, and monitoring
- ✅ **Clean PHP 8+ API** - Uses modern attributes and readonly properties
- ✅ **Container integration** - Works with any PSR-11 container
- ✅ **Zero dependencies** - Only requires PSR interfaces

Why Workflow Orchestrator?
--------------------------

[](#why-workflow-orchestrator)

### Traditional Approach Problems

[](#traditional-approach-problems)

```
// ❌ Hard to maintain, test, and modify
class OrderService
{
    public function process(Order $order): void
    {
        $this->validate($order);
        $this->processPayment($order);     // What if this fails?
        $this->updateInventory($order);    // What if this is slow?
        $this->sendEmail($order);          // What if email is down?
        $this->generateInvoice($order);    // Getting complex...
    }
}
```

### Workflow Orchestrator Approach

[](#workflow-orchestrator-approach)

```
// ✅ Clear, testable, and flexible
#[Orchestrator(channel: 'process.order')]
public function processOrder(Order $order): array
{
    return ['validate', 'payment', 'inventory', 'email', 'invoice'];
}

// Each step is isolated, testable, and can be async
#[Handler(channel: 'email', async: true)]
public function sendEmail(Order $order): Order { ... }
```

### Benefits Over State Machines

[](#benefits-over-state-machines)

- **Focus on behavior, not state** - Define what happens, not state transitions
- **No database storage** - Workflows are stateless and self-contained
- **Zero migrations** - Deploy workflow changes instantly
- **Easy testing** - Each step is independently testable
- **Horizontal scaling** - Any server can process any step

### Performance

[](#performance)

- **Stateless execution** - No database queries for workflow state
- **Async support** - Heavy operations don't block
- **Memory efficient** - No persistent workflow instances
- **Scales horizontally** - Add more servers without coordination

Error Handling
--------------

[](#error-handling)

Workflow steps that fail are automatically wrapped with context:

```
try {
    $result = $orchestrator->execute('process.order', $order);
} catch (WorkflowException $e) {
    // Exception message: "Step 'payment' failed: Card declined"
    Log::error('Workflow failed', [
        'step' => $e->getFailedStep(), // Available if you extend the exception
        'message' => $e->getMessage(),
        'original' => $e->getPrevious(),
    ]);
}
```

Testing
-------

[](#testing)

Test workflows in isolation:

```
use PHPUnit\Framework\TestCase;

class OrderProcessorTest extends TestCase
{
    public function test_premium_customer_gets_discount(): void
    {
        $container = new SimpleContainer();
        $orchestrator = WorkflowOrchestrator::create($container)
            ->register(OrderProcessor::class);

        $order = new Order($premiumCustomer, $items);

        $result = $orchestrator->execute('process.order', $order);

        $this->assertTrue($result->hasDiscount());
        $this->assertTrue($result->isPaid());
    }

    public function test_regular_customer_no_discount(): void
    {
        $container = new SimpleContainer();
        $orchestrator = WorkflowOrchestrator::create($container)
            ->register(OrderProcessor::class);

        $order = new Order($regularCustomer, $items);

        $result = $orchestrator->execute('process.order', $order);

        $this->assertFalse($result->hasDiscount());
        $this->assertTrue($result->isPaid());
    }
}
```

Advanced Usage
--------------

[](#advanced-usage)

### Dynamic Workflows

[](#dynamic-workflows)

Build different workflows based on your business rules:

```
#[Orchestrator(channel: 'process.user.registration')]
public function processUserRegistration(User $user): array
{
    $steps = ['validate-email', 'create-account'];

    // Different flow for enterprise users
    if ($user->isEnterprise()) {
        $steps[] = 'setup-organization';
        $steps[] = 'assign-account-manager';
        $steps[] = 'configure-billing';
    } else {
        $steps[] = 'setup-trial';
        $steps[] = 'send-welcome-email';
    }

    // International users need additional steps
    if ($user->isInternational()) {
        $steps[] = 'setup-localization';
        $steps[] = 'configure-timezone';
    }

    $steps[] = 'activate-account';
    $steps[] = 'track-registration-metrics';

    return $steps;
}
```

### Headers and Context

[](#headers-and-context)

Pass metadata between steps without modifying your main payload:

```
#[Handler(channel: 'enrich-customer-data', returnsHeaders: true)]
public function enrichCustomerData(Order $order): array
{
    return [
        'customer_tier' => $order->getCustomer()->getTier(),
        'loyalty_points' => $order->getCustomer()->getLoyaltyPoints(),
        'region' => $order->getShippingAddress()->getCountry(),
    ];
}

#[Handler(channel: 'apply-regional-pricing')]
public function applyRegionalPricing(
    Order $order,
    #[Header('customer_tier')] string $tier,
    #[Header('region')] string $region
): Order {
    // Use header values for business logic
    $discount = $this->calculateRegionalDiscount($tier, $region);
    return $order->applyDiscount($discount);
}
```

### Async Steps

[](#async-steps)

For heavy processing, mark steps as async:

```
#[Handler(channel: 'generate-invoice', async: true)]
public function generateInvoice(Order $order): Order
{
    // This will be queued for background processing
    $this->invoiceGenerator->generate($order);
    return $order;
}
```

Process queued steps with configurable retry:

```
// Process with default 3 retries
$orchestrator->processAsyncStep('generate-invoice');

// Process with custom retry limit
$orchestrator->processAsyncStep('generate-invoice', maxRetries: 5);

// No retries — fail immediately on error
$orchestrator->processAsyncStep('generate-invoice', maxRetries: 0);
```

Failed steps are automatically re-queued until retries are exhausted, then a `WorkflowException` is thrown.

### Step Timeouts

[](#step-timeouts)

Enforce time limits on individual handlers:

```
#[Handler(channel: 'call-external-api', timeout: 30)]
public function callExternalApi(Order $order): Order
{
    // If this takes longer than 30 seconds, a WorkflowException is thrown
    $response = $this->apiClient->submit($order);
    return $order->withResponse($response);
}
```

The timeout is measured in seconds using wall-clock time. A value of `0` (the default) means no time limit.

### Middleware

[](#middleware)

Add cross-cutting concerns that run before every workflow:

```
use WorkflowOrchestrator\Contracts\MiddlewareInterface;
use WorkflowOrchestrator\Message\WorkflowMessage;

class LoggingMiddleware implements MiddlewareInterface
{
    public function handle(WorkflowMessage $message, callable $next): WorkflowMessage
    {
        Log::info('Workflow started', ['id' => $message->getId()]);
        return $next($message);
    }
}

class AuthorizationMiddleware implements MiddlewareInterface
{
    public function handle(WorkflowMessage $message, callable $next): WorkflowMessage
    {
        if (!$message->getHeader('authorized')) {
            throw new \RuntimeException('Unauthorized workflow execution');
        }
        return $next($message);
    }
}

$orchestrator = WorkflowOrchestrator::create()
    ->withMiddleware(new LoggingMiddleware())
    ->withMiddleware(new AuthorizationMiddleware())
    ->register(OrderProcessor::class);
```

Middleware executes in the order added. Each call to `withMiddleware()` returns a new immutable instance.

### Event Listeners

[](#event-listeners)

Track step execution for logging, metrics, or monitoring:

```
use WorkflowOrchestrator\Contracts\EventListenerInterface;
use WorkflowOrchestrator\Message\WorkflowMessage;

class MetricsListener implements EventListenerInterface
{
    public function onStepStarted(string $stepName, WorkflowMessage $message): void
    {
        Metrics::increment("workflow.step.{$stepName}.started");
    }

    public function onStepCompleted(string $stepName, WorkflowMessage $message, float $duration): void
    {
        Metrics::timing("workflow.step.{$stepName}.duration", $duration);
        Metrics::increment("workflow.step.{$stepName}.completed");
    }

    public function onStepFailed(string $stepName, WorkflowMessage $message, \Throwable $error, float $duration): void
    {
        Metrics::increment("workflow.step.{$stepName}.failed");
        Log::error("Step {$stepName} failed after {$duration}s", [
            'error' => $error->getMessage(),
            'workflow_id' => $message->getId(),
        ]);
    }
}

$orchestrator = WorkflowOrchestrator::create()
    ->withEventListener(new MetricsListener())
    ->register(OrderProcessor::class);
```

Events fire for every step execution, including async steps. The `$duration` parameter is measured in seconds with nanosecond precision.

Container Integration
---------------------

[](#container-integration)

Use with your preferred dependency injection container:

```
use Psr\Container\ContainerInterface;

// With PSR-11 container
$orchestrator = new WorkflowOrchestrator($myContainer);

// With custom queue for async processing
$orchestrator = new WorkflowOrchestrator(
    container: $myContainer,
    queue: new RedisQueue($redisConnection)
);
```

Queue Implementations
---------------------

[](#queue-implementations)

The Workflow Orchestrator supports multiple queue implementations for async step processing. Choose the one that best fits your infrastructure needs.

### In-Memory Queue (Default)

[](#in-memory-queue-default)

The default queue implementation stores messages in memory and is suitable for development and testing:

```
use WorkflowOrchestrator\Queue\InMemoryQueue;

$orchestrator = WorkflowOrchestrator::create()
    ->withQueue(new InMemoryQueue())
    ->register(OrderProcessor::class);
```

**Note:** In-memory queues lose data when the process ends and don't support distributed processing.

### SQLite Queue

[](#sqlite-queue)

For persistent storage without external dependencies, use the SQLite queue:

```
use WorkflowOrchestrator\Queue\SqliteQueue;

// Create PDO connection
$pdo = new PDO('sqlite:/path/to/your/database.sqlite');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

// Create SQLite queue
$queue = new SqliteQueue($pdo);

$orchestrator = WorkflowOrchestrator::create()
    ->withQueue($queue)
    ->register(OrderProcessor::class);
```

**Features:**

- ✅ Persistent storage
- ✅ ACID transactions
- ✅ Automatic table creation
- ✅ No external dependencies
- ✅ Single-file database

**Setup Requirements:**

- PHP PDO extension (included by default)
- Write permissions for SQLite database file

**Custom Table Name:**

```
$queue = new SqliteQueue($pdo, 'my_custom_queue_table');
```

### Redis Queue

[](#redis-queue)

For high-performance, distributed queue processing:

```
use WorkflowOrchestrator\Queue\RedisQueue;

// Create Redis connection
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// Optional: $redis->auth('your-password');
// Optional: $redis->select(1); // Use specific database

// Create Redis queue
$queue = new RedisQueue($redis);

$orchestrator = WorkflowOrchestrator::create()
    ->withQueue($queue)
    ->register(OrderProcessor::class);
```

**Features:**

- ✅ High performance
- ✅ Distributed processing
- ✅ Blocking operations
- ✅ Multiple queue support
- ✅ Automatic cleanup
- ✅ Queue introspection

**Setup Requirements:**

- Redis server
- PHP Redis extension: `composer require ext-redis` or install via system package manager

**Install Redis Extension:**

```
# Ubuntu/Debian
sudo apt-get install php-redis

# macOS (via Homebrew)
brew install php
pecl install redis

# Windows (via PECL)
pecl install redis
```

**Custom Key Prefix:**

```
$queue = new RedisQueue($redis, 'my_app:queue:');
```

**Advanced Redis Usage:**

```
// Blocking pop with timeout
$message = $queue->blockingPop('high-priority', 30); // 30 second timeout

// Check queue size
$size = $queue->size('email-notifications');

// Get all queue names
$queues = $queue->getQueueNames();

// Peek at next message without removing it
$nextMessage = $queue->peek('data-processing');
```

### Queue Selection Guide

[](#queue-selection-guide)

FeatureInMemorySQLiteRedisPersistence❌✅✅Distributed❌❌✅Performance⚡⚡⚡⚡⚡⚡⚡⚡Setup Complexity⚡⚡⚡⚡⚡⚡External Dependencies❌❌✅Blocking Operations❌❌✅**Recommendations:**

- **Development/Testing:** InMemoryQueue
- **Single-server production:** SqliteQueue
- **Multi-server/high-volume:** RedisQueue

### Custom Queue Implementation

[](#custom-queue-implementation)

Implement your own queue by creating a class that implements `QueueInterface`:

```
use WorkflowOrchestrator\Contracts\QueueInterface;
use WorkflowOrchestrator\Message\WorkflowMessage;

class MyCustomQueue implements QueueInterface
{
    public function push(string $queue, WorkflowMessage $message): void
    {
        // Your implementation
    }

    public function pop(string $queue): ?WorkflowMessage
    {
        // Your implementation
    }

    public function size(string $queue): int
    {
        // Your implementation
    }

    public function clear(string $queue): void
    {
        // Your implementation
    }
}
```

Requirements
------------

[](#requirements)

- PHP 8.3+
- PSR-11 container (optional, includes simple container)
- `ext-pdo` for SqliteQueue
- `ext-redis` for RedisQueue (optional)

Changelog
---------

[](#changelog)

See [CHANGELOG.md](CHANGELOG.md) for version history.

License
-------

[](#license)

MIT

Contributing
------------

[](#contributing)

Pull requests welcome! Please ensure:

- Tests pass (`vendor/bin/phpunit`)
- Code follows PSR-12 standards
- New features include tests and documentation

###  Health Score

38

—

LowBetter than 85% of packages

Maintenance82

Actively maintained with recent releases

Popularity0

Limited adoption so far

Community6

Small or concentrated contributor base

Maturity55

Maturing project, gaining track record

 Bus Factor1

Top contributor holds 100% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~24 days

Recently: every ~0 days

Total

7

Last Release

97d ago

PHP version history (2 changes)v1.0.0-beta.1PHP ^8.2

v1.0.0PHP ^8.3

### Community

Maintainers

![](https://www.gravatar.com/avatar/27d71933a44feff8905b90da6867281d8bf6b88e4fe2b94f85c4483687f4ad59?d=identicon)[drnasin](/maintainers/drnasin)

---

Top Contributors

[![drnasin](https://avatars.githubusercontent.com/u/3784166?v=4)](https://github.com/drnasin "drnasin (68 commits)")

---

Tags

asyncworkflowmessagingorchestration

###  Code Quality

TestsPHPUnit

### Embed Badge

![Health badge](/badges/drnasin-workflow-orchestrator/health.svg)

```
[![Health](https://phpackages.com/badges/drnasin-workflow-orchestrator/health.svg)](https://phpackages.com/packages/drnasin-workflow-orchestrator)
```

###  Alternatives

[revolt/event-loop

Rock-solid event loop for concurrent PHP applications.

92343.6M138](/packages/revolt-event-loop)[amphp/parallel

Parallel processing component for Amp.

85046.2M74](/packages/amphp-parallel)[react/dns

Async DNS resolver for ReactPHP

536114.1M100](/packages/react-dns)[hollodotme/fast-cgi-client

A PHP fast CGI client to send requests (a)synchronously to PHP-FPM.

56423.9M25](/packages/hollodotme-fast-cgi-client)[enqueue/enqueue

Message Queue Library

19820.0M56](/packages/enqueue-enqueue)[amphp/socket

Non-blocking socket connection / server implementations based on Amp and Revolt.

26539.0M119](/packages/amphp-socket)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
