PHPackages                             queuety/queuety - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. queuety/queuety

ActiveWordpress-plugin

queuety/queuety
===============

A job queue and durable workflow engine for WordPress

v0.12.0(1mo ago)00GPL-2.0-or-laterPHPPHP &gt;=8.2CI passing

Since Mar 29Pushed 1mo agoCompare

[ Source](https://github.com/inline0/queuety)[ Packagist](https://packagist.org/packages/queuety/queuety)[ RSS](/packages/queuety-queuety/feed)WikiDiscussions main Synced 1mo ago

READMEChangelog (1)Dependencies (4)Versions (2)Used By (0)

    ![Queuety](./.github/logo-light.svg)

 The WordPress workflow engine

 [![CI](https://github.com/inline0/queuety/actions/workflows/ci.yml/badge.svg)](https://github.com/inline0/queuety/actions/workflows/ci.yml) [![license](https://camo.githubusercontent.com/fc7ef0bd4e7daa962ca0072e865f9970ee5441edb26f5fca76d6e7f0d3805a70/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f6c6963656e73652d47504c2d2d322e302d626c75652e737667)](https://github.com/inline0/queuety/blob/main/LICENSE)

---

What is Queuety?
----------------

[](#what-is-queuety)

Queuety is a WordPress plugin that provides a fast job queue and durable workflow engine. Workers claim jobs directly from MySQL via PDO and process them inside a long-running WP-CLI process.

**The problem:** WordPress has no real background job system. `wp_cron` only fires on page visits. Action Scheduler boots the entire WordPress stack for every batch. An LLM API call that takes 60 seconds gets killed by PHP's 30-second timeout. There's no way to run multi-step processes that survive crashes and resume where they left off.

**Queuety solves this** with two primitives:

1. **Jobs** for fire-and-forget background work
2. **Workflows** for durable multi-step processes with persistent state

Quick Start
-----------

[](#quick-start)

**Prerequisites:**

- PHP 8.2+
- WordPress 6.4+
- MySQL 5.7+ or MariaDB 10.3+
- `pdo_mysql` enabled for the PHP runtime that loads WordPress and WP-CLI

For Composer-managed WordPress installs (for example Bedrock):

```
composer require queuety/queuety
wp plugin activate queuety
```

For a packaged plugin zip:

```
wp plugin install /path/to/queuety.zip --activate
```

For local development from this repository:

```
composer install
wp plugin activate queuety
```

If `pdo_mysql` is missing, the plugin now stays loaded but inert and shows an admin notice instead of fataling during activation or bootstrap.

Out of the box, the plugin also schedules a one-shot worker through WordPress cron every minute, so basic queue processing works without shell access. Dedicated `wp queuety work` processes are still the recommended higher-throughput production mode.

Dispatch a job using the modern dispatch API:

```
use Queuety\Contracts\Job;
use Queuety\Dispatchable;

readonly class SendEmailJob implements Job {
    use Dispatchable;

    public function __construct(
        public string $to,
        public string $subject,
        public string $body,
    ) {}

    public function handle(): void {
        wp_mail( $this->to, $this->subject, $this->body );
    }
}

SendEmailJob::dispatch( 'user@example.com', 'Welcome', 'Hello from Queuety!' );
```

Or use the classic handler name API:

```
use Queuety\Queuety;

Queuety::dispatch( 'send_email', [ 'to' => 'user@example.com' ] );
```

Run a durable workflow with timers and signals:

```
$workflow_id = Queuety::workflow( 'approval_flow' )
    ->then( SubmitRequestHandler::class )
    ->sleep( hours: 24 )
    ->await_approval()
    ->then( ProcessApprovalHandler::class )
    ->on_cancel( CleanupHandler::class )
    ->dispatch( [ 'request_id' => 99 ] );

// Later, from another process:
Queuety::signal( $workflow_id, 'approval', [ 'approved_by' => 'admin@example.com' ] );
```

Build runtime-discovered branch work with compensation:

```
use Queuety\Enums\JoinMode;

Queuety::workflow( 'agent_run' )
    ->then( PlanTasksStep::class )
    ->fan_out(
        items_key: 'tasks',
        handler_class: ExecuteTaskStep::class,
        result_key: 'task_results',
        join_mode: JoinMode::Quorum,
        quorum: 2,
        reducer_class: SummarizeTaskResults::class,
    )
    ->compensate_with( ReleaseTaskLocks::class )
    ->compensate_on_failure()
    ->then( FinalizeRunStep::class )
    ->dispatch( [ 'run_id' => 99 ] );
```

Coordinate one workflow with another:

```
use Queuety\Enums\WaitMode;

$research_id = Queuety::workflow( 'research_run' )
    ->then( PlanResearchStep::class )
    ->fan_out( 'tasks', ExecuteResearchTask::class, 'results' )
    ->dispatch( [ 'brief_id' => 42 ] );

Queuety::workflow( 'editorial_run' )
    ->await_workflow( $research_id, 'research' )
    ->then( DraftBriefStep::class )
    ->dispatch();
```

Hand work off to independent child workflows and join later:

```
use Queuety\Enums\WaitMode;

$agent_task = Queuety::workflow( 'agent_task' )
    ->then( ResearchTopicStep::class )
    ->then( SummarizeTopicStep::class );

Queuety::workflow( 'brief_research' )
    ->then( PlanTopicsStep::class )
    ->spawn_workflows( 'topics', $agent_task, 'child_workflow_ids' )
    ->await_workflows( 'child_workflow_ids', WaitMode::All, 'child_results' )
    ->then( SynthesizeBriefStep::class )
    ->dispatch( [ 'brief_id' => 42 ] );
```

For a fuller planner/executor walkthrough with human review and cross-workflow waits, see the `Agent Orchestration` docs page.

Add workflow guardrails for agent runs:

```
Queuety::workflow( 'agent_research' )
    ->version( 'research.v2' )
    ->idempotency_key( 'brief:42:research' )
    ->max_transitions( 20 )
    ->max_fan_out_items( 12 )
    ->max_state_bytes( 32768 )
    ->then( PlanResearchStep::class )
    ->fan_out( 'tasks', ExecuteResearchTask::class, 'results' )
    ->dispatch( [ 'brief_id' => 42 ] );
```

Dispatch a batch with callbacks:

```
$batch = Queuety::create_batch( [
    new ImportUsersJob( $chunk_1 ),
    new ImportUsersJob( $chunk_2 ),
    new ImportUsersJob( $chunk_3 ),
] )
    ->name( 'Import users' )
    ->then( ImportCompleteHandler::class )
    ->catch( ImportFailedHandler::class )
    ->finally( ImportCleanupHandler::class )
    ->allow_failures()
    ->on_queue( 'imports' )
    ->dispatch();
```

Add middleware to a job:

```
use Queuety\Middleware\RateLimited;
use Queuety\Middleware\ThrottlesExceptions;

readonly class CallExternalApiJob implements Job {
    use Dispatchable;

    public function __construct( public int $record_id ) {}

    public function handle(): void {
        // Call external API...
    }

    public function middleware(): array {
        return [
            new RateLimited( max: 60, window: 60 ),
            new ThrottlesExceptions( max_attempts: 3, decay_minutes: 5 ),
        ];
    }
}
```

Start a worker:

```
wp queuety work
```

Features
--------

[](#features)

- **Fast execution** -- workers use direct MySQL queue access and avoid per-job cron/bootstrap overhead
- **Durable workflows** -- multi-step processes with persistent state that survive PHP timeouts, crashes, and retries
- **Dispatchable jobs** -- self-contained readonly job classes with the `Dispatchable` trait and `Contracts\Job` interface
- **Middleware pipeline** -- onion-style middleware for rate limiting, throttling, uniqueness, and custom logic
- **Batching** -- dispatch groups of jobs with `then`, `catch`, and `finally` callbacks plus progress tracking
- **Job chaining** -- sequential job execution where each job depends on the previous one completing
- **Durable timers** -- `sleep()` steps that survive process restarts and resume at the right time
- **Signals and human gates** -- `wait_for_signal()`, `wait_for_signals()`, `await_approval()`, and `await_input()` pause workflows until external input arrives
- **Workflow dependencies** -- `await_workflow()` and `await_workflows()` coordinate top-level workflows without forcing them into one workflow definition
- **Async workflow handoffs** -- `spawn_workflows()` turns runtime-discovered items into independent top-level workflows that can be awaited later, including named spawned groups
- **Dynamic fan-out** -- `fan_out()` expands runtime-discovered work with `All`, `FirstSuccess`, and `Quorum` join modes
- **Durable artifacts** -- store named workflow outputs outside the main state bag and inspect them later through status, CLI, export, and replay
- **Workflow guardrails** -- `version()`, a deterministic definition hash, `idempotency_key()`, `max_transitions()`, `max_fan_out_items()`, and `max_state_bytes()` make long-running agent workflows easier to inspect and safer to operate
- **Step compensation** -- `compensate_with()` and `compensate_on_failure()` provide saga-style rollback hooks for completed steps
- **Streaming steps** -- `StreamingStep` interface with `ChunkStore` for persisting streamed data chunk by chunk
- **Cache layer** -- pluggable cache with `MemoryCache` and `ApcuCache` backends, auto-detected via `CacheFactory`
- **Heartbeats** -- long-running steps send heartbeats to prevent premature stale-job recovery
- **Workflow cancellation** -- cancel running workflows and trigger registered cleanup handlers
- **Workflow event log** -- full timeline of step transitions with state snapshots and time-travel debugging
- **State pruning** -- automatic removal of old step outputs to keep workflow state lean
- **Schedule overlap policies** -- Allow, Skip, or Buffer for recurring jobs
- **Multi-queue worker priorities** -- process multiple queues with strict priority ordering
- **Parallel steps** -- run steps concurrently and wait for all to complete before advancing
- **Conditional branching** -- skip to named steps based on prior state
- **Sub-workflows** -- spawn child workflows that feed results back to the parent
- **Priority queues** -- 4 levels (Low, Normal, High, Urgent) via type-safe enums
- **Rate limiting** -- per-handler execution limits with sliding window
- **Recurring jobs** -- interval-based (`every('1 hour')`) and cron-based (`cron('0 3 * * *')`) scheduling
- **Job dependencies** -- job B waits for job A to complete before running
- **Unique jobs** -- prevent duplicate dispatches for the same handler and payload
- **Job properties** -- `$tries`, `$timeout`, and `$backoff` declared directly on job classes
- **`failed()` hook** -- called on the job instance when all retries are exhausted
- **Conditional dispatch** -- `dispatch_if()` and `dispatch_unless()` on the `Dispatchable` trait
- **Synchronous dispatch** -- `dispatch_sync()` runs a job immediately without the queue
- **Timeout enforcement** -- kill jobs that exceed max execution time
- **Worker concurrency** -- `--workers=N` forks multiple processes with automatic restart on crash
- **Permanent logging** -- queryable database log of every job and workflow execution
- **Metrics API** -- throughput, latency percentiles, and error rates per handler
- **Webhooks** -- HTTP notifications on job/workflow events
- **Testing utilities** -- `QueueFake` for asserting dispatched jobs and `create_batch()` batches in tests
- **PHP attributes** -- `#[QueuetyHandler('name')]` for auto-registration
- **ThrottlesExceptions** -- back off when external services are down to prevent job storms
- **Debug mode** -- verbose worker logging for development

How It Works
------------

[](#how-it-works)

Workers run inside a long-lived WP-CLI process and claim jobs directly from MySQL via PDO. The queue, workflow state, logs, batches, signals, and streaming chunks all live in MySQL, so worker restarts do not lose orchestration state.

Workflows break long-running work into steps. Each step persists its output to a shared state bag in the database. If PHP dies mid-step, the worker retries that step with all prior state intact. The step boundary is a single MySQL transaction: state update, job completion, and next step enqueue all happen atomically.

```
Workflow: generate_report (3 steps)

Step 0: fetch_data     -> state: {user_data: {...}}
Step 1: call_llm       -> PHP dies -> retry -> state: {user_data: {...}, llm_response: "..."}
Step 2: format_output  -> state: {user_data: {...}, llm_response: "...", report_url: "/reports/42.pdf"}

Workflow: completed

```

Job Handlers
------------

[](#job-handlers)

Classic handler interface:

```
class SendEmailHandler implements Queuety\Handler {
    public function handle(array $payload): void {
        wp_mail($payload['to'], $payload['subject'], $payload['body']);
    }

    public function config(): array {
        return [
            'queue' => 'emails',
            'max_attempts' => 5,
        ];
    }
}

Queuety::register('send_email', SendEmailHandler::class);
```

Modern dispatchable job class:

```
use Queuety\Contracts\Job;
use Queuety\Dispatchable;

readonly class SendEmailJob implements Job {
    use Dispatchable;

    public int $tries = 5;
    public int $timeout = 30;
    public array $backoff = [10, 30, 60];

    public function __construct(
        public string $to,
        public string $subject,
        public string $body,
    ) {}

    public function handle(): void {
        wp_mail($this->to, $this->subject, $this->body);
    }

    public function failed(\Throwable $e): void {
        error_log("Email to {$this->to} failed: {$e->getMessage()}");
    }
}
```

Workflow Steps
--------------

[](#workflow-steps)

```
class CallLLMHandler implements Queuety\Step {
    public function handle(array $state): array {
        $response = $this->callAPI($state['prompt']);
        return ['llm_response' => $response]; // merged into state
    }

    public function config(): array {
        return ['max_attempts' => 5];
    }
}
```

Streaming Steps
---------------

[](#streaming-steps)

For LLM responses or large downloads, streaming steps persist data chunk by chunk so progress survives crashes:

```
use Queuety\Contracts\StreamingStep;

class StreamLLMHandler implements StreamingStep {
    public function stream(array $state, array $existing_chunks = []): \Generator {
        $offset = count($existing_chunks);
        foreach ($this->streamApi($state['prompt'], $offset) as $chunk) {
            yield $chunk; // persisted to DB immediately
        }
    }

    public function on_complete(array $chunks, array $state): array {
        return ['response' => implode('', $chunks)];
    }

    public function config(): array {
        return ['max_attempts' => 3];
    }
}
```

WP-CLI Commands
---------------

[](#wp-cli-commands)

CommandDescription`wp queuety work [--queue=] [--once] [--workers=]`Start a worker (or N workers)`wp queuety work --queue=high,default,low`Process multiple queues with priority ordering`wp queuety flush`Process all pending jobs and exit`wp queuety dispatch  --payload='{}'`Dispatch a job`wp queuety status`Show queue stats`wp queuety list [--queue=] [--status=]`List jobs`wp queuety inspect `Show full job details and log history`wp queuety retry `Retry a job`wp queuety retry-buried`Retry all buried jobs`wp queuety bury `Bury a job`wp queuety delete `Delete a job`wp queuety recover`Recover stale jobs`wp queuety purge [--older-than=]`Purge completed jobs`wp queuety pause `Pause a queue`wp queuety resume `Resume a queue`wp queuety metrics`Show per-handler metrics`wp queuety discover  --namespace=`Auto-discover handlers`wp queuety workflow status `Show workflow progress`wp queuety workflow retry `Retry from failed step`wp queuety workflow pause `Pause a workflow`wp queuety workflow resume `Resume a workflow`wp queuety workflow list [--status=]`List workflows`wp queuety workflow cancel `Cancel a workflow and run cleanup handlers`wp queuety workflow timeline `Show the full event timeline for a workflow`wp queuety workflow state-at  `Show workflow state snapshot at a specific step`wp queuety schedule list`List recurring schedules`wp queuety schedule add  [--every=] [--cron=]`Add a recurring schedule`wp queuety schedule remove `Remove a schedule`wp queuety schedule run`Manually trigger scheduler tick`wp queuety log [--workflow=] [--job=]`Query log entries`wp queuety log purge --older-than=`Prune old logs`wp queuety webhook add  `Register a webhook`wp queuety webhook list`List webhooks`wp queuety webhook remove `Remove a webhookConfiguration
-------------

[](#configuration)

All constants are optional. Define in `wp-config.php`:

ConstantDefaultDescription`QUEUETY_RETENTION_DAYS``7`Auto-purge completed jobs after N days`QUEUETY_LOG_RETENTION_DAYS``0`Auto-purge logs after N days (0 = forever)`QUEUETY_MAX_EXECUTION_TIME``300`Max seconds per job before timeout`QUEUETY_WORKER_SLEEP``1`Seconds to sleep when queue is empty`QUEUETY_WORKER_MAX_JOBS``1000`Max jobs before worker restarts`QUEUETY_WORKER_MAX_MEMORY``128`Max MB before worker restarts`QUEUETY_RETRY_BACKOFF``exponential`Backoff strategy (exponential, linear, fixed)`QUEUETY_STALE_TIMEOUT``600`Seconds before stuck jobs are recovered`QUEUETY_CACHE_TTL``5`Default cache TTL in seconds`QUEUETY_DEBUG``false`Enable verbose worker logging`QUEUETY_TABLE_JOBS``queuety_jobs`Jobs table name`QUEUETY_TABLE_WORKFLOWS``queuety_workflows`Workflows table name`QUEUETY_TABLE_LOGS``queuety_logs`Logs table name`QUEUETY_TABLE_SCHEDULES``queuety_schedules`Schedules table name`QUEUETY_TABLE_SIGNALS``queuety_signals`Signals table name`QUEUETY_TABLE_WORKFLOW_DEPENDENCIES``queuety_workflow_dependencies`Workflow dependency waits table name`QUEUETY_TABLE_WORKFLOW_DISPATCH_KEYS``queuety_workflow_dispatch_keys`Durable workflow idempotency table name`QUEUETY_TABLE_CHUNKS``queuety_chunks`Streaming chunks table name`QUEUETY_TABLE_QUEUE_STATES``queuety_queue_states`Queue states table name`QUEUETY_TABLE_WEBHOOKS``queuety_webhooks`Webhooks table nameDevelopment
-----------

[](#development)

```
# Install dependencies
composer install
npm install

# Check coding standards
composer cs

# Auto-fix coding standards
composer cs:fix

# Run all tests
composer test

# Run specific test suites
composer test:unit
composer test:integration

# Run E2E tests (requires Docker and Node.js)
npm run test:e2e
npm run test:e2e:wp-env
```

License
-------

[](#license)

GPL-2.0-or-later

###  Health Score

34

—

LowBetter than 77% of packages

Maintenance91

Actively maintained with recent releases

Popularity0

Limited adoption so far

Community6

Small or concentrated contributor base

Maturity36

Early-stage or recently created project

 Bus Factor1

Top contributor holds 100% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Unknown

Total

1

Last Release

46d ago

### Community

Maintainers

![](https://www.gravatar.com/avatar/dc81a975082b6843421de211768cfb67d8237bed4c972c9a1eaad46aac14b372?d=identicon)[dnnsjsk](/maintainers/dnnsjsk)

---

Top Contributors

[![dnnsjsk](https://avatars.githubusercontent.com/u/73965001?v=4)](https://github.com/dnnsjsk "dnnsjsk (72 commits)")

###  Code Quality

TestsPHPUnit

Code StylePHP\_CodeSniffer

### Embed Badge

![Health badge](/badges/queuety-queuety/health.svg)

```
[![Health](https://phpackages.com/badges/queuety-queuety/health.svg)](https://phpackages.com/packages/queuety-queuety)
```

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
