Duyler Worker Pool
==================

[![Quality Gate Status](https://camo.githubusercontent.com/df376366d7603f7955917afa92e8ed17ba8adc9f5623f9e512f8e524b3e2ac1d/68747470733a2f2f736f6e6172636c6f75642e696f2f6170692f70726f6a6563745f6261646765732f6d6561737572653f70726f6a6563743d6475796c65725f776f726b65722d706f6f6c266d65747269633d616c6572745f737461747573)](https://sonarcloud.io/summary/new_code?id=duyler_worker-pool)[![Coverage](https://camo.githubusercontent.com/0740398a170b773281b68e5a3dba0c64179350044530026d47920ea813dad1bb/68747470733a2f2f736f6e6172636c6f75642e696f2f6170692f70726f6a6563745f6261646765732f6d6561737572653f70726f6a6563743d6475796c65725f776f726b65722d706f6f6c266d65747269633d636f766572616765)](https://sonarcloud.io/summary/new_code?id=duyler_worker-pool)[![type-coverage](https://camo.githubusercontent.com/ce83c1c9d00cc875abc9db56ee459c2ce4c0e028e90bc001b927bbaaacdfb9cc/68747470733a2f2f73686570686572642e6465762f6769746875622f6475796c65722f776f726b65722d706f6f6c2f636f7665726167652e737667)](https://shepherd.dev/github/duyler/worker-pool)[![psalm-level](https://camo.githubusercontent.com/b23cd9c44d60757c9d2db8a479cf710c4eab5e8a6020aed3374c088b2a39f701/68747470733a2f2f73686570686572642e6465762f6769746875622f6475796c65722f776f726b65722d706f6f6c2f6c6576656c2e737667)](https://shepherd.dev/github/duyler/worker-pool)[![PHP Version](https://camo.githubusercontent.com/2c979308bd18ddc5eacece95b68553e3bb48fcfd41b9b942f71e06a78a8358b6/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f646570656e64656e63792d762f6475796c65722f776f726b65722d706f6f6c2f7068703f76657273696f6e3d6465762d6d61696e)](https://camo.githubusercontent.com/2c979308bd18ddc5eacece95b68553e3bb48fcfd41b9b942f71e06a78a8358b6/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f646570656e64656e63792d762f6475796c65722f776f726b65722d706f6f6c2f7068703f76657273696f6e3d6465762d6d61696e)

Process manager with load balancing for Duyler Framework.

Features
--------

- Multi-process worker pool with fork-based workers
- Two architecture modes: Shared Socket (SO\_REUSEPORT) and Centralized (FD Passing)
- Load balancing: Least Connections, Round Robin
- IPC via Unix domain sockets with JSON-serialized messages
- File descriptor passing (SCM\_RIGHTS) for connection distribution
- Signal handling: SIGTERM, SIGINT, SIGCHLD, SIGUSR1, SIGUSR2
- Auto CPU core detection for optimal worker count
- Event-driven workers with Fiber-based event loop integration
- Callback workers for simple connection handling
- HTTP worker adapter with PSR-7 request parsing
- Process monitoring with uptime, idle time, memory tracking
- Auto-restart on worker failure with configurable delay
- Graceful shutdown

Requirements
------------

- PHP 8.5+
- ext-sockets
- ext-pcntl
- ext-posix
- duyler/http-server
- nyholm/psr7
- psr/log (optional, for logging)

Installation
------------

```
composer require duyler/worker-pool
```

Quick Start
-----------

### Event-Driven Worker Mode

This is the recommended mode for production HTTP servers. Each worker runs its own event loop with a full `Server` instance. In shared-socket mode, the kernel distributes connections across workers via `SO_REUSEPORT`; in centralized mode, the master hands connections to workers.

```
use Duyler\HttpServer\Config\ServerConfig;
use Duyler\WorkerPool\Config\WorkerPoolConfig;
use Duyler\WorkerPool\Master\MasterFactory;
use Duyler\WorkerPool\Worker\EventDrivenWorkerInterface;

// Implement EventDrivenWorkerInterface (see Worker Types section for full example)
$worker = new MyApp(); // implements EventDrivenWorkerInterface

$serverConfig = new ServerConfig(host: '0.0.0.0', port: 8080);
$poolConfig = WorkerPoolConfig::auto($serverConfig);

$master = MasterFactory::createRecommended(
    config: $poolConfig,
    serverConfig: $serverConfig,
    eventDrivenWorker: $worker,
);

$master->start(); // blocks until shutdown
```

### Callback Worker Mode

For simpler use cases where you handle raw sockets directly.

```
use Duyler\HttpServer\Config\ServerConfig;
use Duyler\WorkerPool\Config\WorkerPoolConfig;
use Duyler\WorkerPool\Master\MasterFactory;
use Duyler\WorkerPool\Worker\WorkerCallbackInterface;

$callback = new class implements WorkerCallbackInterface
{
    public function handle(mixed $clientSocket, array $metadata): void
    {
        $response = "HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nHello";
        socket_write($clientSocket, $response);
        socket_close($clientSocket);
    }
};

$serverConfig = new ServerConfig(host: '0.0.0.0', port: 8080);
$poolConfig = new WorkerPoolConfig(serverConfig: $serverConfig, workerCount: 4);

$master = MasterFactory::createRecommended(
    config: $poolConfig,
    serverConfig: $serverConfig,
    workerCallback: $callback,
);

$master->start();
```

Configuration
-------------

### WorkerPoolConfig

```
use Duyler\HttpServer\Config\ServerConfig;
use Duyler\WorkerPool\Config\BalancerType;
use Duyler\WorkerPool\Config\WorkerPoolConfig;

$config = new WorkerPoolConfig(
    serverConfig: new ServerConfig(),  // Required. HTTP server configuration.
    workerCount: 0,                    // Number of workers. 0 = auto-detect CPU cores.
    balancer: BalancerType::LeastConnections,  // Balancer algorithm (for CentralizedMaster).
    backlog: 128,                      // Socket listen backlog.
    maxQueueSize: 1000,                // Max connections in centralized queue.
    maxIpcMessageSize: 1048576,        // Max IPC message size in bytes (min 1024).
    enableStickySession: false,        // Sticky session support (planned).
    enableGracefulReload: false,       // Graceful reload on SIGUSR1 (planned).
    autoRestart: true,                 // Auto-restart workers on failure.
    restartDelay: 1,                   // Seconds to wait before respawning a dead worker.
    fallbackCpuCores: 4,               // Fallback CPU core count if detection fails.
    pollInterval: 1000,                // Main loop poll interval in microseconds (min 100).
);
```

#### Factory Method

`WorkerPoolConfig::auto()` creates a config with automatic CPU core detection:

```
$config = WorkerPoolConfig::auto(
    serverConfig: new ServerConfig(),
    balancer: BalancerType::LeastConnections,
);
```

#### Validation Rules

- `workerCount`: 1 to 1024 (or 0 for auto-detection)
- `backlog`: must be positive
- `maxQueueSize`: must be positive
- `maxIpcMessageSize`: at least 1024 bytes
- `restartDelay`: non-negative
- `fallbackCpuCores`: positive
- `pollInterval`: at least 100 microseconds
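
The rules above can be expressed as a small standalone check. This is a minimal sketch, not the library's own validation: the function name `validatePoolSettings()` and the array-based input are hypothetical, and only the limits themselves come from the list above.

```php
<?php

/**
 * Hypothetical helper: checks plain values against the documented limits.
 * Missing keys fall back to the defaults shown in the WorkerPoolConfig example.
 *
 * @param array<string, int> $settings
 * @return list<string> Human-readable violations (empty when everything is valid).
 */
function validatePoolSettings(array $settings): array
{
    $errors = [];

    $workerCount = $settings['workerCount'] ?? 0;
    if ($workerCount < 0 || $workerCount > 1024) {
        $errors[] = 'workerCount must be 0 (auto) or between 1 and 1024';
    }
    if (($settings['backlog'] ?? 128) <= 0) {
        $errors[] = 'backlog must be positive';
    }
    if (($settings['maxQueueSize'] ?? 1000) <= 0) {
        $errors[] = 'maxQueueSize must be positive';
    }
    if (($settings['maxIpcMessageSize'] ?? 1048576) < 1024) {
        $errors[] = 'maxIpcMessageSize must be at least 1024 bytes';
    }
    if (($settings['restartDelay'] ?? 1) < 0) {
        $errors[] = 'restartDelay must be non-negative';
    }
    if (($settings['fallbackCpuCores'] ?? 4) <= 0) {
        $errors[] = 'fallbackCpuCores must be positive';
    }
    if (($settings['pollInterval'] ?? 1000) < 100) {
        $errors[] = 'pollInterval must be at least 100 microseconds';
    }

    return $errors;
}
```

Running such a check on user-supplied values before constructing the config lets you report every violation at once.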

Load Balancing
--------------

Load balancing is used in `CentralizedMaster` mode. The `BalancerType` enum defines the available algorithms:

- `BalancerType::LeastConnections` -- picks the worker with the fewest active connections
- `BalancerType::RoundRobin` -- cycles through workers in order
- `BalancerType::Weighted` -- planned for future implementation

### Least Connections

Best when request processing times vary. Workers with fewer active connections receive new ones first.

```
use Duyler\WorkerPool\Balancer\LeastConnectionsBalancer;

$balancer = new LeastConnectionsBalancer();
```
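
To make the selection rule concrete, here is a minimal standalone sketch of a least-connections pick over a `worker_id => active_connections` map. The function name `pickLeastConnections()` is hypothetical, and the tie-break (lowest worker ID wins) is an assumption; the library's own balancer may resolve ties differently.

```php
<?php

/**
 * Sketch of a least-connections pick (not the library's implementation).
 *
 * @param array<int, int> $connections worker_id => active_connections
 */
function pickLeastConnections(array $connections): ?int
{
    if ($connections === []) {
        return null; // no workers available
    }

    // Deterministic order so that ties resolve to the lowest worker ID (assumption).
    ksort($connections);

    $selected = null;
    $lowest = PHP_INT_MAX;
    foreach ($connections as $workerId => $active) {
        if ($selected === null || $active < $lowest) {
            $lowest = $active;
            $selected = $workerId;
        }
    }

    return $selected;
}
```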

### Round Robin

Best when request processing times are roughly equal. Distributes connections sequentially.

```
use Duyler\WorkerPool\Balancer\RoundRobinBalancer;

$balancer = new RoundRobinBalancer();
```
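
Sequential distribution can be sketched with a cursor that advances on every pick. `RoundRobinSketch` below is a hypothetical stand-in, not the library's `RoundRobinBalancer`; how the real class orders workers or reacts to workers joining and leaving is not covered here.

```php
<?php

/** Hypothetical stand-in that cycles through worker IDs in ascending order. */
final class RoundRobinSketch
{
    private int $cursor = 0;

    /** @param array<int, int> $connections worker_id => active_connections */
    public function selectWorker(array $connections): ?int
    {
        $workerIds = array_keys($connections);
        if ($workerIds === []) {
            return null;
        }
        sort($workerIds);

        // Wrap the cursor so it always points inside the current worker list.
        $index = $this->cursor % count($workerIds);
        $this->cursor = $index + 1;

        return $workerIds[$index];
    }
}
```

The connection counts are ignored on purpose: round robin only needs to know which workers exist.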

### When to Use Which

| Scenario | Recommendation |
| --- | --- |
| Uniform request times | Round Robin |
| Varied request times | Least Connections |
| Need simple fairness | Round Robin |
| Need adaptive distribution | Least Connections |

Architecture
------------

### Shared Socket (SO\_REUSEPORT)

Each worker binds its own socket to the same port using `SO_REUSEPORT`. The kernel distributes incoming connections across workers. No IPC overhead for connection routing.

```
                        Client requests
                              |
                              v
                    +---------+---------+
                    |   Kernel balances  |
                    +---------+---------+
                              |
           +--------+--------+--------+--------+
           |        |        |        |        |
           v        v        v        v        v
        +-----+  +-----+  +-----+  +-----+  +-----+
        | W1  |  | W2  |  | W3  |  | W4  |  | WN  |
        |:8080|  |:8080|  |:8080|  |:8080|  |:8080|
        +-----+  +-----+  +-----+  +-----+  +-----+

        Master process monitors workers (health, respawn)

```

Requirements: `SO_REUSEPORT` support (Linux, Docker, macOS via Docker).

Use `SharedSocketMaster` when you want a simple architecture, when kernel-level load balancing is sufficient, or when you need maximum compatibility.
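
The mechanism itself can be shown with plain ext-sockets calls. The following is a minimal sketch, independent of the library: `openReusePortListener()` is a hypothetical helper, and the code assumes a platform where PHP defines `SO_REUSEPORT`. It opens two listeners on one port inside a single process only to keep the example short; in the worker pool each listener would live in its own worker process.

```php
<?php

/**
 * Opens a TCP listener with SO_REUSEPORT set before bind().
 * Requires ext-sockets and a platform that defines SO_REUSEPORT.
 */
function openReusePortListener(string $host, int $port): Socket
{
    $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
    if ($socket === false) {
        throw new RuntimeException(socket_strerror(socket_last_error()));
    }

    // Every socket that shares the port must set the option before binding.
    socket_set_option($socket, SOL_SOCKET, SO_REUSEPORT, 1);

    if (!socket_bind($socket, $host, $port) || !socket_listen($socket, 128)) {
        $error = socket_strerror(socket_last_error($socket));
        socket_close($socket);
        throw new RuntimeException($error);
    }

    return $socket;
}

// Port 0 lets the OS pick a free port; the second listener then joins it.
$first = openReusePortListener('127.0.0.1', 0);
socket_getsockname($first, $address, $port);
$second = openReusePortListener('127.0.0.1', $port);
```

Without the option, the second `socket_bind()` would fail with "Address already in use".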

### Centralized (FD Passing)

The master process accepts all connections and distributes them to workers over IPC using file descriptor passing (`SCM_RIGHTS`). This enables custom load balancing; sticky sessions are planned (see `enableStickySession`).

```
                        Client requests
                              |
                              v
                    +---------+---------+
                    |   Master accepts   |
                    |   all connections  |
                    +---------+---------+
                              |
                    +---------+---------+
                    |  Connection Queue  |
                    +---------+---------+
                              |
                    +---------+---------+
                    | Load Balancer picks|
                    | a worker           |
                    +---------+---------+
                              |
              +-------+------+------+-------+
              |       |      |      |       |
              v       v      v      v       v
           +----+  +----+  +----+  +----+  +----+
           | W1 |  | W2 |  | W3 |  | W4 |  | WN |
           +----+  +----+  +----+  +----+  +----+
              ^       ^      ^      ^       ^
              |       |      |      |       |
              +-- FD passed via Unix socket -+

```

Requirements: Linux only (`SCM_RIGHTS`, `socket_sendmsg`, `socket_recvmsg`).

Use `CentralizedMaster` when you need custom load balancing, sticky sessions (planned), or a centralized connection queue.
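
The connection queue in the diagram is bounded by `maxQueueSize`. As an illustration only, a bounded FIFO could look like the sketch below. `BoundedConnectionQueue` is a hypothetical class, and rejecting new entries when the queue is full is an assumption; this README does not state how the library handles overflow.

```php
<?php

/** Hypothetical bounded FIFO for accepted connections awaiting a worker. */
final class BoundedConnectionQueue
{
    private SplQueue $queue;

    public function __construct(private int $maxSize)
    {
        $this->queue = new SplQueue();
    }

    /** Returns false when the queue is full (assumed overflow policy). */
    public function enqueue(mixed $connection): bool
    {
        if ($this->queue->count() >= $this->maxSize) {
            return false; // caller decides whether to close the socket or retry
        }
        $this->queue->enqueue($connection);

        return true;
    }

    /** Returns the oldest queued connection, or null when the queue is empty. */
    public function dequeue(): mixed
    {
        return $this->queue->isEmpty() ? null : $this->queue->dequeue();
    }

    public function size(): int
    {
        return $this->queue->count();
    }
}
```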

### Choosing with MasterFactory

`MasterFactory` selects the best architecture automatically based on platform capabilities. It also handles creating the required DI wrappers (`SocketWrapper`, `ForkWrapper`, `SocketMsgWrapper`) so you do not have to wire them manually.

```
use Duyler\WorkerPool\Balancer\LeastConnectionsBalancer;
use Duyler\WorkerPool\Master\MasterFactory;

// Automatically picks CentralizedMaster on Linux (FD passing available),
// SharedSocketMaster on other platforms.
$master = MasterFactory::createRecommended(
    config: $poolConfig,
    serverConfig: $serverConfig,
    eventDrivenWorker: new MyApp(),
);

// Or create with explicit balancer (CentralizedMaster when FD passing supported)
$master = MasterFactory::create(
    config: $poolConfig,
    serverConfig: $serverConfig,
    eventDrivenWorker: new MyApp(),
    balancer: new LeastConnectionsBalancer(),
);

// Check what the factory would pick
echo MasterFactory::recommendedMaster();
// "CentralizedMaster - Centralized queue with custom load balancing" (Linux)
// "SharedSocketMaster - Distributed architecture with kernel load balancing" (other)

// Get comparison of both modes
$comparison = MasterFactory::getComparison();
```

Worker Types
------------

### EventDrivenWorkerInterface

The primary worker type for applications with their own event loop. The `run()` method is called once on worker startup and never returns. The master passes connections to the `Server` instance, and the application polls `hasRequest()` in its loop.

```
use Duyler\HttpServer\ServerInterface;
use Duyler\WorkerPool\Worker\EventDrivenWorkerInterface;

class MyApp implements EventDrivenWorkerInterface
{
    public function run(int $workerId, ServerInterface $server): void
    {
        // Initialize once (database, event bus, etc.).
        // Database and $this->handle() are application-specific placeholders.
        $db = new Database();

        // Application event loop
        while (true) {
            if ($server->hasRequest()) {
                $requestData = $server->getRequest();
                if ($requestData !== null) {
                    $response = $this->handle($requestData->request, $db);
                    $server->respond($requestData->respond($response));
                }
            }
            usleep(1000);
        }
    }
}
```

The `ServerInterface` instance passed to `run()` provides:

- `hasRequest(): bool` -- check for pending requests
- `getRequest(): ?RequestData` -- get next request
- `respond(ResponseData $responseData): void` -- send response
- `hasPendingResponse(): bool` -- check for pending responses
- `enableNotification(): void` -- enable notification socket pair
- `getSocketResource(): mixed` -- get socket for EvIo integration
- `setEventLoopActive(bool $active): void` -- set event loop active flag

### WorkerCallbackInterface

A simpler worker type for direct socket handling. The `handle()` method is called for each incoming connection with the raw client socket.

```
use Duyler\WorkerPool\Worker\WorkerCallbackInterface;

class RawHandler implements WorkerCallbackInterface
{
    public function handle(mixed $clientSocket, array $metadata): void
    {
        // $clientSocket is Socket|resource
        // $metadata contains 'worker_id', 'client_ip'

        socket_write($clientSocket, "HTTP/1.1 200 OK\r\n\r\nOK");
        socket_close($clientSocket);
    }
}
```

### HttpWorkerAdapter

A ready-made adapter that handles HTTP parsing and PSR-7 conversion for callback-style workers. Useful as a starting point or for simple HTTP endpoints. Requires a `SocketWrapperInterface` instance for socket operations.

```
use Duyler\WorkerPool\Socket\SocketWrapper;
use Duyler\WorkerPool\Worker\HttpWorkerAdapter;

$adapter = new HttpWorkerAdapter(new SocketWrapper());
$adapter->handleConnection($clientSocket, ['worker_id' => $workerId]);
// Returns "Hello from Worker Pool!" as a plain text 200 response.
// Override processRequest() for custom logic.
```

IPC System
----------

### UnixSocketChannel

Point-to-point IPC channel over Unix domain sockets with length-prefixed JSON messages. Requires a `SocketWrapperInterface` instance for socket operations.

```
use Duyler\WorkerPool\IPC\UnixSocketChannel;
use Duyler\WorkerPool\IPC\Message;
use Duyler\WorkerPool\IPC\MessageType;
use Duyler\WorkerPool\Socket\SocketWrapper;

$socketWrapper = new SocketWrapper();

// Server side
$channel = new UnixSocketChannel('/tmp/worker.sock', socketWrapper: $socketWrapper, isServer: true);
$channel->connect();
$channel->send(Message::workerReady(workerId: 1));

// Client side
$channel = new UnixSocketChannel('/tmp/worker.sock', socketWrapper: $socketWrapper, isServer: false);
$channel->connect();
$msg = $channel->receive();
// $msg->type === MessageType::WorkerReady
// $msg->data === ['worker_id' => 1]
```
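
Length-prefixed framing is what lets a receiver split a byte stream back into individual JSON messages. The sketch below shows the general technique, assuming a 4-byte big-endian length prefix; the actual wire format used by `UnixSocketChannel` is not specified in this README, and `encodeFrame()` / `decodeFrame()` are hypothetical names.

```php
<?php

/** Encodes a payload as a 4-byte big-endian length followed by the JSON body. */
function encodeFrame(array $payload): string
{
    $json = json_encode($payload, JSON_THROW_ON_ERROR);

    return pack('N', strlen($json)) . $json;
}

/**
 * Extracts one complete frame from the front of $buffer.
 * Returns null and leaves $buffer untouched while the frame is incomplete.
 */
function decodeFrame(string &$buffer, int $maxSize = 1048576): ?array
{
    if (strlen($buffer) < 4) {
        return null; // length prefix not fully received yet
    }

    $length = unpack('N', substr($buffer, 0, 4))[1];
    if ($length > $maxSize) {
        throw new LengthException("Frame of {$length} bytes exceeds the limit");
    }
    if (strlen($buffer) < 4 + $length) {
        return null; // body not fully received yet
    }

    $json = substr($buffer, 4, $length);
    $buffer = substr($buffer, 4 + $length); // keep any following frames

    return json_decode($json, true, 512, JSON_THROW_ON_ERROR);
}
```

The `$maxSize` check mirrors the role of `maxIpcMessageSize`: an oversized length prefix is rejected before any body is buffered.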

### Message

Structured IPC message with type, data, and timestamp.

```
use Duyler\WorkerPool\IPC\Message;
use Duyler\WorkerPool\IPC\MessageType;

// Create messages
$msg = new Message(type: MessageType::Shutdown, data: ['reason' => 'restart']);

// Factory methods
Message::workerReady(workerId: 1);
Message::connectionClosed(connectionId: 42);
Message::workerMetrics(metrics: ['memory' => 64000]);
Message::shutdown();
Message::reload();

// Serialization
$serialized = $msg->serialize(); // JSON string
$restored = Message::unserialize($serialized);
```

### MessageType

```
enum MessageType: string
{
    case ConnectionClosed = 'connection_closed';
    case WorkerReady = 'worker_ready';
    case WorkerMetrics = 'worker_metrics';
    case Shutdown = 'shutdown';
    case Reload = 'reload';
}
```

### FdPasser

Passes file descriptors between processes via `SCM_RIGHTS`. Used internally by `CentralizedMaster` and `ConnectionRouter`. Requires Linux. Takes `SocketWrapperInterface` and `SocketMsgWrapperInterface` via constructor injection.

```
use Duyler\WorkerPool\IPC\FdPasser;
use Duyler\WorkerPool\Socket\SocketMsgWrapper;
use Duyler\WorkerPool\Socket\SocketWrapper;

$fdPasser = new FdPasser(new SocketWrapper(), new SocketMsgWrapper());

// Check platform support
$supported = $fdPasser->isSupported(); // true on Linux with socket_sendmsg

// Send a file descriptor
$fdPasser->sendFd(
    controlSocket: $masterToWorkerSocket,
    fdToSend: $clientSocket,
    metadata: ['client_ip' => '192.168.1.1', 'worker_id' => 2],
);

// Receive a file descriptor
$result = $fdPasser->receiveFd($workerSocket);
// $result === ['fd' => Socket, 'metadata' => ['client_ip' => '...', 'worker_id' => 2]]
// or null if no FD available
```

Signal Handling
---------------

The worker pool handles POSIX signals for graceful lifecycle management.

### Master Process Signals

| Signal | Behavior |
| --- | --- |
| SIGTERM | Triggers graceful shutdown. Sends SIGTERM to all workers, waits for them to exit. |
| SIGINT | Same as SIGTERM. Triggered by Ctrl+C. |
| SIGUSR1 | Reload trigger (via `SignalManager`). Available for graceful reload. |
| SIGUSR2 | Available for custom handlers. |

### Worker Process Signals

Workers inherit signal handling from the master. In event-driven mode, the application is responsible for handling signals within its event loop.

### SignalHandler

Low-level signal registration and dispatch:

```
use Duyler\WorkerPool\Signal\SignalHandler;

$handler = new SignalHandler();
$handler->register(SIGTERM, function (): void {
    // Handle shutdown
});
$handler->dispatch(); // calls pcntl_signal_dispatch()
$handler->unregister(SIGTERM);
```

### SignalManager

Higher-level manager with shutdown/reload state tracking:

```
use Duyler\WorkerPool\Signal\SignalHandler;
use Duyler\WorkerPool\Signal\SignalManager;

$manager = new SignalManager(new SignalHandler());

$manager->setupMasterSignals(
    onShutdown: function (int $signal): void { /* cleanup */ },
    onReload: function (int $signal): void { /* reload config */ },
);

// Check flags
$manager->isShutdownRequested(); // bool
$manager->isReloadRequested();   // bool
```
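
The flag-based pattern behind `isShutdownRequested()` can be illustrated with raw `pcntl` calls. This is a standalone sketch rather than the library's code: the handler only records the request, and the main loop notices it after `pcntl_signal_dispatch()`. It sends SIGTERM to itself so that it terminates on its own; in a real deployment the signal would come from outside the process.

```php
<?php

// Requires ext-pcntl and ext-posix (both listed under Requirements).
$shutdownRequested = false;

pcntl_signal(SIGTERM, function (int $signal) use (&$shutdownRequested): void {
    // Only record the request; the main loop performs the actual cleanup.
    $shutdownRequested = true;
});

// Simulate an external `kill -TERM <pid>`.
posix_kill(posix_getpid(), SIGTERM);

$iterations = 0;
while (!$shutdownRequested) {
    pcntl_signal_dispatch(); // run handlers for signals received since the last call
    $iterations++;
    usleep(1000);
}
```

Keeping the handler this small avoids running cleanup code at an arbitrary point in the loop.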

API Reference
-------------

### MasterInterface

```
interface MasterInterface
{
    public function start(): void;
    public function stop(): void;
    public function isRunning(): bool;
    /** @return array */
    public function getMetrics(): array;
}
```

### SharedSocketMaster

```
final class SharedSocketMaster extends AbstractMaster
{
    public function __construct(
        WorkerPoolConfig $config,
        ServerConfig $serverConfig,
        SocketWrapperInterface $socketWrapper,
        ForkWrapperInterface $forkWrapper,
        ?WorkerCallbackInterface $workerCallback = null,
        ?EventDrivenWorkerInterface $eventDrivenWorker = null,
        ?LoggerInterface $logger = null,
    );
    public function start(): void;
    public function stop(): void;
    public function isRunning(): bool;
    /** @return array */
    public function getMetrics(): array;
}
```

Note: Use `MasterFactory::createRecommended()` to avoid constructing DI wrappers manually.

Metrics returned by `getMetrics()`:

```
[
    'architecture' => 'shared_socket',
    'total_workers' => 4,
    'active_workers' => 4,
    'total_connections' => 128,
    'is_running' => true,
]
```

### CentralizedMaster

```
final class CentralizedMaster extends AbstractMaster
{
    public function __construct(
        WorkerPoolConfig $config,
        BalancerInterface $balancer,
        SocketWrapperInterface $socketWrapper,
        SocketMsgWrapperInterface $socketMsgWrapper,
        ForkWrapperInterface $forkWrapper,
        ?ServerConfig $serverConfig = null,
        ?WorkerCallbackInterface $workerCallback = null,
        ?EventDrivenWorkerInterface $eventDrivenWorker = null,
        ?LoggerInterface $logger = null,
    );
    public function start(): void;
    public function stop(): void;
    public function isRunning(): bool;
    /** @return array */
    public function getMetrics(): array;
    public function getBalancer(): BalancerInterface;
}
```

Note: Use `MasterFactory::create()` to avoid constructing DI wrappers manually.

Metrics returned by `getMetrics()`:

```
[
    'total_workers' => 4,
    'alive_workers' => 4,
    'total_connections' => 128,
    'total_requests' => 1024,
    'queue_size' => 3,
    'is_running' => true,
]
```

### MasterFactory

```
final class MasterFactory
{
    public static function create(
        WorkerPoolConfig $config,
        ServerConfig $serverConfig,
        ?WorkerCallbackInterface $workerCallback = null,
        ?EventDrivenWorkerInterface $eventDrivenWorker = null,
        ?BalancerInterface $balancer = null,
        ?LoggerInterface $logger = null,
        ?SocketWrapperInterface $socketWrapper = null,
        ?SocketMsgWrapperInterface $socketMsgWrapper = null,
        ?ForkWrapperInterface $forkWrapper = null,
    ): MasterInterface;

    public static function createRecommended(
        WorkerPoolConfig $config,
        ServerConfig $serverConfig,
        ?WorkerCallbackInterface $workerCallback = null,
        ?EventDrivenWorkerInterface $eventDrivenWorker = null,
        ?LoggerInterface $logger = null,
        ?SocketWrapperInterface $socketWrapper = null,
        ?SocketMsgWrapperInterface $socketMsgWrapper = null,
        ?ForkWrapperInterface $forkWrapper = null,
    ): MasterInterface;

    public static function recommendedMaster(): string;
    /** @return array */
    public static function getComparison(): array;
}
```

The `socketWrapper`, `socketMsgWrapper`, and `forkWrapper` parameters default to their concrete implementations (`SocketWrapper`, `SocketMsgWrapper`, `ForkWrapper`). Pass custom implementations for testing or when you need to override low-level behavior.

### BalancerInterface

```
interface BalancerInterface
{
    /** @param array $connections worker_id => active_connections */
    public function selectWorker(array $connections): ?int;
    public function onConnectionEstablished(int $workerId): void;
    public function onConnectionClosed(int $workerId): void;
    public function onWorkerRemoved(int $workerId): void;
    public function reset(): void;
}
```

### ProcessInfo

Immutable value object representing a worker process:

```
final readonly class ProcessInfo
{
    public float $startedAt;
    public float $lastActivityAt;

    public function __construct(
        public int $workerId,
        public int $pid,
        public ProcessState $state,
        ForkWrapperInterface $forkWrapper,
        public int $connections = 0,
        public int $totalRequests = 0,
        ?float $startedAt = null,
        ?float $lastActivityAt = null,
        public int $memoryUsage = 0,
    );

    public function withState(ProcessState $state): self;
    public function withConnections(int $connections): self;
    public function withIncrementedRequests(): self;
    public function withMemoryUsage(int $memoryUsage): self;
    public function getUptime(): float;      // seconds since start
    public function getIdleTime(): float;     // seconds since last activity
    public function isAlive(): bool;          // checks via forkWrapper.kill(pid, 0)
    /** @return array */
    public function toArray(): array;
}
```
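
The `with*()` methods follow the usual immutable-object pattern: each call returns a new instance and leaves the original untouched. The sketch below demonstrates the pattern with a hypothetical, trimmed-down `WorkerSnapshot` class; it is not `ProcessInfo` itself and omits the `ForkWrapperInterface` dependency.

```php
<?php

/** Hypothetical, reduced stand-in for an immutable process record. */
final class WorkerSnapshot
{
    public function __construct(
        public readonly int $workerId,
        public readonly int $connections = 0,
        public readonly int $totalRequests = 0,
    ) {
    }

    public function withConnections(int $connections): self
    {
        // Copy every field, replacing only the one that changes.
        return new self($this->workerId, $connections, $this->totalRequests);
    }

    public function withIncrementedRequests(): self
    {
        return new self($this->workerId, $this->connections, $this->totalRequests + 1);
    }
}

$before = new WorkerSnapshot(workerId: 1);
$after = $before->withConnections(3)->withIncrementedRequests();
// $before still reports 0 connections; $after carries the updated values.
```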

### ProcessState

```
enum ProcessState: string
{
    case Starting = 'starting';
    case Ready = 'ready';
    case Busy = 'busy';
    case Stopping = 'stopping';
    case Stopped = 'stopped';
    case Failed = 'failed';
}
```

### BalancerType

```
enum BalancerType: string
{
    case LeastConnections = 'least_connections';
    case RoundRobin = 'round_robin';
    case Weighted = 'weighted';
}
```

### DI Wrappers

The codebase uses constructor injection for all low-level system calls. These wrappers enable unit testing without real sockets, processes, or signals.

#### SocketWrapperInterface

```
interface SocketWrapperInterface
{
    public function create(int $domain, int $type, int $protocol): Socket|false;
    public function bind(Socket $socket, string $address, int $port = 0): bool;
    public function listen(Socket $socket, int $backlog = 0): bool;
    public function accept(Socket $socket): Socket|false;
    public function read(Socket $socket, int $length, int $type = PHP_BINARY_READ): string|false;
    public function write(Socket $socket, string $data, ?int $length = null): int|false;
    public function close(Socket $socket): void;
    public function setNonBlock(Socket $socket): void;
    public function setOption(Socket $socket, int $level, int $name, int|array $value): bool;
    public function getPeerName(Socket $socket, string &$address, ?int &$port = null): bool;
    public function lastError(?Socket $socket = null): int;
    public function strerror(int $errorCode): string;
    public function select(?array &$read, ?array &$write, ?array &$except, int $timeout, int $usec = 0): int|false;
    public function createPair(int $domain, int $type, int $protocol, array &$pair): bool;
    public function connect(Socket $socket, string $address, ?int $port = null): bool;
}
```

#### SocketMsgWrapperInterface

```
interface SocketMsgWrapperInterface
{
    public function sendmsg(Socket $socket, array $message, int $flags = 0): int|false;
    public function recvmsg(Socket $socket, array &$message, int $flags = 0): int|false;
    public function cmsgSpace(int $level, int $type, int $n = 0): ?int;
}
```

#### ForkWrapperInterface

```
interface ForkWrapperInterface
{
    public function fork(): int;
    public function waitpid(int $pid, int &$status, int $options = 0): int;
    public function kill(int $pid, int $signal): bool;
}
```

### SystemInfo

```
use Duyler\WorkerPool\Util\SystemInfo;

final class SystemInfo
{
    public function getCpuCores(int $fallback = 4): int;
    /** @return array */
    public function getOsInfo(): array;
    public function isContainerEnvironment(): bool;
    public function supportsFdPassing(): bool;    // Linux + SCM_RIGHTS
    public function supportsReusePort(): bool;     // SO_REUSEPORT defined
    public static function resetCache(): void;
}
```
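
For intuition about what CPU core detection with a fallback involves, here is a minimal Linux-oriented sketch. `detectCpuCores()` is a hypothetical function; the strategy `SystemInfo::getCpuCores()` actually uses is not described in this README, and this version ignores container CPU limits.

```php
<?php

/** Hypothetical helper: counts logical cores from /proc/cpuinfo, else falls back. */
function detectCpuCores(int $fallback = 4): int
{
    // Linux exposes one "processor : N" line per logical core.
    if (is_readable('/proc/cpuinfo')) {
        $cpuinfo = (string) file_get_contents('/proc/cpuinfo');
        $count = preg_match_all('/^processor\s*:/m', $cpuinfo);
        if (is_int($count) && $count > 0) {
            return $count;
        }
    }

    // Detection failed or the platform is not Linux: use the fallback, never below 1.
    return max(1, $fallback);
}
```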

### Exceptions

```
// Base class for all worker-pool exceptions
abstract class WorkerPoolExceptionBase extends Exception
{
    public function getErrorCode(): string;   // e.g. 'WORKER_POOL_ERROR'
    public function getContext(): array;
}

// General pool errors (fork failure, socket creation, etc.)
final class WorkerPoolException extends WorkerPoolExceptionBase
{
    protected string $errorCode = 'WORKER_POOL_ERROR';
}

// IPC-related errors (socket_sendmsg unavailable, bad message format)
final class IPCException extends WorkerPoolExceptionBase
{
    protected string $errorCode = 'IPC_ERROR';
}
```

Integration with HttpServer
---------------------------

The worker-pool package depends on `duyler/http-server` and integrates tightly with `Duyler\HttpServer\Server`.

### How It Works

1. Master creates a `Server` instance in each worker process
2. Master sets the worker ID via `$server->setWorkerId($workerId)`
3. Master passes an external socket via `$server->setExternalSocketResource($socket)`
4. Master enables notifications via `$server->enableNotification()`
5. In CentralizedMaster, master registers a Fiber that receives FDs and calls `$server->addExternalConnection()`
6. The application's `EventDrivenWorkerInterface::run()` polls `$server->hasRequest()`
7. Responses go back through `$server->respond()`

### ServerConfig for Worker Pool

Pass the same `ServerConfig` to both `WorkerPoolConfig` and the master:

```
use Duyler\HttpServer\Config\ServerConfig;
use Duyler\WorkerPool\Config\WorkerPoolConfig;

$serverConfig = new ServerConfig(
    host: '0.0.0.0',
    port: 8080,
    maxConnections: 1000,
    requestTimeout: 30,
    connectionTimeout: 60,
);

$poolConfig = new WorkerPoolConfig(
    serverConfig: $serverConfig,
    workerCount: 0, // auto-detect
);
```

### Reactive Event Loop (EvIo)

To wake the worker only when there is work, instead of polling with `usleep()`, use the notification socket with the `ev` extension:

```
use Ev;
use EvIo;
use EvSignal;
use Duyler\HttpServer\ServerInterface;
use Duyler\WorkerPool\Worker\EventDrivenWorkerInterface;

final class ReactiveApp implements EventDrivenWorkerInterface
{
    public function run(int $workerId, ServerInterface $server): void
    {
        $server->enableNotification();
        $notifySocket = $server->getSocketResource();

        $io = new EvIo($notifySocket, Ev::READ, function () use ($server): void {
            // Clear notification buffer
            $socket = $server->getSocketResource();
            if ($socket instanceof \Socket) {
                $er = error_reporting(0);
                socket_read($socket, 4096);
                error_reporting($er);
            }

            $server->setEventLoopActive(true);
            try {
                while ($server->hasRequest()) {
                    $requestData = $server->getRequest();
                    if ($requestData === null) break;
                    // ... handle request, call respond()
                }
            } finally {
                $server->setEventLoopActive(false);
            }
        });

        $sigTerm = new EvSignal(SIGTERM, function () use ($server): void {
            $server->stop();
            Ev::stop(Ev::BREAK_ALL);
        });

        Ev::run();
    }
}
```

Note: In `CentralizedMaster` mode, the Unix socket pair detects IPC activity (FD passing) but not HTTP data on passed client sockets. An `EvTimer` fallback in the event bus configuration is recommended for that mode.

Testing
-------

```
make tests      # Run all tests
make coverage   # Run tests with coverage
make psalm      # Run Psalm static analysis
make cs-fix     # Run PHP-CS-Fixer
make rector     # Run Rector refactoring
```

### Test Groups

Tests are organized into five groups by purpose:

| Group | Directory | Description |
| --- | --- | --- |
| Unit | `tests/Unit/` | Isolated tests with mocked DI wrappers |
| Integration | `tests/Integration/` | Cross-component tests with real sockets |
| Functional | `tests/worker-pool/` | End-to-end worker pool scenarios |
| Performance | `tests/Performance/` | Baseline benchmarks for throughput, latency, memory |
| Security | `tests/Security/` | Malformed requests, slowloris, oversized messages, IPC validation |

Run a specific test group:

```
docker-compose run --rm php vendor/bin/phpunit tests/Performance/
docker-compose run --rm php vendor/bin/phpunit tests/Security/
docker-compose run --rm php vendor/bin/phpunit tests/Unit/
docker-compose run --rm php vendor/bin/phpunit tests/Integration/
```

License
-------

MIT License. See [LICENSE.md](LICENSE.md) for details.
