PHPackages                             webpatser/torque - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. webpatser/torque

ActiveLibrary[Queues &amp; Workers](/categories/queues)

webpatser/torque
================

Coroutine-based queue worker for Laravel — N jobs per process via PHP Fibers and Redis Streams

v0.4.0(2d ago)14↑650%MITPHPPHP ^8.5

Since Apr 6Pushed 2d agoCompare

[ Source](https://github.com/webpatser/torque)[ Packagist](https://packagist.org/packages/webpatser/torque)[ RSS](/packages/webpatser-torque/feed)WikiDiscussions main Synced today

READMEChangelogDependencies (11)Versions (9)Used By (0)

Torque
======

[](#torque)

**The queue that keeps spinning.** Coroutine-based queue worker for Laravel.

Torque replaces Horizon's 1-job-per-process model with N-jobs-per-process using PHP 8.5 Fibers. When a job waits on I/O, the coroutine scheduler switches to another job. Same hardware, 3-10x throughput for I/O-bound workloads.

```
4 workers x 50 coroutines = 200 concurrent jobs in ~300MB RAM

```

Requirements
------------

[](#requirements)

- PHP 8.5+
- Laravel 12+
- Redis 7+ or Valkey (Redis Streams support)
- [Revolt](https://revolt.run) event loop (installed automatically)

Installation
------------

[](#installation)

```
composer require webpatser/torque
```

Publish the config:

```
php artisan vendor:publish --tag=torque-config
```

Add the queue connection to `config/queue.php`:

```
'torque' => [
    'driver' => 'torque',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 2000,
    'prefix' => 'torque:',
    'redis_uri' => env('TORQUE_REDIS_URI', 'redis://127.0.0.1:6379'),
    'consumer_group' => 'torque',
],
```

Set it as default in `.env`:

```
QUEUE_CONNECTION=torque

```

Usage
-----

[](#usage)

### Starting the worker

[](#starting-the-worker)

```
php artisan torque:start
```

Options:

```
php artisan torque:start --workers=8 --concurrency=100 --queues=emails,notifications
```

### Dispatching jobs

[](#dispatching-jobs)

Standard Laravel dispatching works unchanged:

```
ProcessDocument::dispatch($document);
ProcessDocument::dispatch($document)->onQueue('high');
ProcessDocument::dispatch($document)->delay(now()->addMinutes(5));

// Batches work out of the box
Bus::batch([
    new ProcessDocument($doc1),
    new ProcessDocument($doc2),
    new ProcessDocument($doc3),
])->dispatch();
```

### Async jobs with TorqueJob

[](#async-jobs-with-torquejob)

Regular Laravel jobs work fine — they run synchronously within their coroutine slot. For full async I/O, extend `TorqueJob` and type-hint the pools you need:

```
use Webpatser\Torque\Job\TorqueJob;
use Webpatser\Torque\Pool\MysqlPool;
use Webpatser\Torque\Pool\HttpPool;

class IndexDocument extends TorqueJob
{
    public function __construct(
        private int $documentId,
    ) {}

    public function handle(MysqlPool $db, HttpPool $http): void
    {
        $result = $db->execute('SELECT * FROM documents WHERE id = ?', [$this->documentId]);
        $row = $result->fetchRow();

        $http->post('http://elasticsearch:9200/docs/_doc/' . $this->documentId, json_encode($row));
    }
}
```

### Per-Fiber state isolation

[](#per-fiber-state-isolation)

Use `CoroutineContext` when you need per-job isolated state (e.g., request-scoped data):

```
use Webpatser\Torque\Job\CoroutineContext;

// Inside a job handler
CoroutineContext::set('tenant_id', $this->tenantId);
$tenantId = CoroutineContext::get('tenant_id');
```

State is automatically cleaned up when the Fiber completes (backed by `WeakMap`).

Job Event Streams
-----------------

[](#job-event-streams)

Every job automatically records lifecycle events to a per-job Redis Stream — no code changes needed.

```
$ php artisan torque:tail --job=088066c1-b045-4fb6-bc32-ca15cfdf7d08

📦 11:08:34 queued App\Jobs\ScrapeKvK → scrpr
▶ 11:10:28 started worker=web-01-4879 attempt=1
⚠ 11:10:52 exception attempt=1 No alive nodes. All the 1 nodes seem to be down.
▶ 11:11:34 started worker=web-01-4882 attempt=2
✓ 11:11:34 completed memory=58.5MB
```

### Custom progress events

[](#custom-progress-events)

Add the `Streamable` trait to emit progress from inside your job:

```
use Webpatser\Torque\Stream\Streamable;

class ImportCsv implements ShouldQueue
{
    use Streamable;

    public function handle(): void
    {
        foreach ($this->rows as $i => $row) {
            // process...
            $this->emit("Imported row {$i}", progress: $i / count($this->rows));
        }
    }
}
```

### Reading streams programmatically

[](#reading-streams-programmatically)

```
use Webpatser\Torque\Stream\JobStream;

$stream = app(JobStream::class);

// All events so far
$events = $stream->events($uuid);

// Tail (blocks, yields events as they arrive)
foreach ($stream->tail($uuid) as $event) {
    echo $event['type'] . ': ' . ($event['data']['message'] ?? '');
}

// Check completion
$stream->isFinished($uuid); // true after completed/failed
```

Streams auto-expire after 5 minutes (configurable via `job_streams.ttl`).

CLI Commands
------------

[](#cli-commands)

CommandDescription`torque:start`Start the master + worker processes`torque:stop`Graceful shutdown (SIGTERM). Use `--force` for SIGKILL`torque:status`Show worker metrics, throughput, and queue depths`torque:monitor`Live htop-style terminal dashboard`torque:tail`Tail a job's event stream in real-time`torque:pause`Pause job processing (in-flight jobs complete)`torque:pause continue`Resume processing`torque:supervisor`Generate a Supervisor config fileConfiguration
-------------

[](#configuration)

All options are in `config/torque.php`. Key settings:

SettingDefaultDescription`workers`4Number of worker processes`coroutines_per_worker`50Concurrent job slots per worker`max_jobs_per_worker`10000Restart worker after N jobs (prevents memory leaks)`max_worker_lifetime`3600Restart worker after N seconds`block_for`2000XREADGROUP block timeout (ms)### Autoscaling

[](#autoscaling)

```
'autoscale' => [
    'enabled' => true,
    'min_workers' => 2,
    'max_workers' => 8,
    'scale_up_threshold' => 0.85,   // Scale up when 85% of slots are busy
    'scale_down_threshold' => 0.20, // Scale down when 20% of slots are busy
    'cooldown' => 30,               // Seconds between scaling decisions
],
```

### Connection pools

[](#connection-pools)

```
'pools' => [
    'redis' => ['size' => 30, 'idle_timeout' => 60],
    'mysql' => ['size' => 20, 'idle_timeout' => 60],
    'http'  => ['size' => 15, 'idle_timeout' => 30],
],
```

Dashboard
---------

[](#dashboard)

Torque includes a Livewire 4 + Flux UI Pro dashboard at `/torque` (configurable).

Features:

- Real-time metrics (throughput, latency, concurrent jobs, memory)
- Worker table with coroutine slot usage bars
- Stream/queue overview with pending and delayed counts
- Failed jobs list with retry and delete actions
- Kibana-style configurable poll interval (1s to 1m, or paused)

### Authorization

[](#authorization)

The dashboard is **denied by default**. Define the `viewTorque` gate in your `AuthServiceProvider`:

```
Gate::define('viewTorque', fn (User $user) => $user->isAdmin());
```

### Dashboard middleware

[](#dashboard-middleware)

Default: `['web', 'auth']`. Override in config:

```
'dashboard' => [
    'enabled' => true,
    'path' => 'torque',
    'middleware' => ['web', 'auth', 'can:admin'],
],
```

Failed jobs
-----------

[](#failed-jobs)

Jobs that exhaust all retries are moved to a dead-letter Redis Stream. You can:

- View them in the dashboard
- Retry or delete via dashboard or programmatically
- Listen for the `JobPermanentlyFailed` event for custom notifications

```
use Webpatser\Torque\Events\JobPermanentlyFailed;

Event::listen(JobPermanentlyFailed::class, function ($event) {
    // $event->jobName, $event->queue, $event->exceptionMessage, etc.
    Notification::route('slack', '#alerts')->notify(new YourNotification($event));
});
```

Architecture
------------

[](#architecture)

```
Master Process (torque:start)
├── Worker 1 (Revolt event loop)
│   ├── 50 Fibers (concurrent jobs)
│   ├── Redis Pool
│   ├── MySQL Pool
│   └── HTTP Pool
├── Worker 2
│   └── ...
├── Worker N
│   └── ...
└── AutoScaler (optional)

Redis Streams
├── torque:default (XREADGROUP consumer groups)
├── torque:default:delayed (sorted set)
├── torque:stream:dead-letter
├── torque:worker:* (per-worker stats with heartbeat TTL)
└── torque:job:* (per-job event streams, auto-expiring)

```

### How it works

[](#how-it-works)

1. **Master** spawns N worker processes via `pcntl_exec()` (`php artisan torque:worker`)
2. Each **worker** runs a Revolt event loop with M Fiber slots
3. Each Fiber loops: `XREADGROUP COUNT 1 BLOCK` → process → repeat. When the job does async I/O, the Fiber suspends and another job runs
4. **Work-stealing**: idle Fibers claim stale messages from dead consumers via `XAUTOCLAIM` (per-queue `retry_after` as idle threshold)
5. On completion: `XACK` + `XDEL`. On failure: retry with exponential backoff or dead-letter
6. Each Fiber has its own dedicated Redis connection for blocking reads

### Queue backend: Redis Streams

[](#queue-backend-redis-streams)

Redis Streams (not LISTs like Horizon) provide:

- **Consumer groups**: multiple workers, no duplicate processing
- **Acknowledgment**: `XACK` after success, unacked jobs auto-reclaimed via `XAUTOCLAIM`
- **Backpressure**: `XREADGROUP COUNT {slots} BLOCK {ms}` pulls exactly as many jobs as available slots
- **Pending Entries List**: Redis tracks assigned-but-unacked jobs natively

### Compatibility

[](#compatibility)

FeatureHorizonTorqueQueue backendRedis LISTRedis StreamsConcurrency1 job/processN jobs/process (Fibers)I/O modelBlocking (PDO, curl)Non-blocking (AMPHP)PHP extensionsNoneNoneEloquent in jobsFull supportSync fallback (blocking)Laravel Queue contractFullFullJob batchesYesYesDelayed jobsRedis sorted setRedis sorted setDashboardBlade + pollingLivewire 4 + Flux UIAutoscalingBalancing strategiesSlot-pressure basedProduction deployment
---------------------

[](#production-deployment)

Generate a Supervisor config:

```
php artisan torque:supervisor --workers=4 --user=forge
```

This creates `storage/torque-supervisor.conf`. Copy it to your Supervisor config directory:

```
sudo cp storage/torque-supervisor.conf /etc/supervisor/conf.d/torque.conf
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start torque
```

Dependencies
------------

[](#dependencies)

**Required** (installed automatically):

- `revolt/event-loop` — Fiber scheduler
- `amphp/amp` — async/await primitives
- `amphp/redis` — Non-blocking Redis client
- `amphp/sync` — LocalSemaphore for pool management

**Optional** (install when needed):

- `amphp/mysql` — Async MySQL for `MysqlPool`
- `amphp/http-client` — Async HTTP for `HttpPool`

License
-------

[](#license)

MIT

###  Health Score

42

—

FairBetter than 89% of packages

Maintenance100

Actively maintained with recent releases

Popularity7

Limited adoption so far

Community6

Small or concentrated contributor base

Maturity46

Maturing project, gaining track record

 Bus Factor1

Top contributor holds 100% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~0 days

Total

8

Last Release

2d ago

### Community

Maintainers

![](https://www.gravatar.com/avatar/e442a1d15a5b64438f3b471acfded80951afb1bed23641cfd80c5254099eab9d?d=identicon)[webpatser](/maintainers/webpatser)

---

Top Contributors

[![webpatser](https://avatars.githubusercontent.com/u/25720?v=4)](https://github.com/webpatser "webpatser (11 commits)")

---

Tags

asynclaravelqueueamphpCoroutinesFibersredis-streams

###  Code Quality

TestsPest

### Embed Badge

![Health badge](/badges/webpatser-torque/health.svg)

```
[![Health](https://phpackages.com/badges/webpatser-torque/health.svg)](https://phpackages.com/packages/webpatser-torque)
```

###  Alternatives

[amphp/byte-stream

A stream abstraction to make working with non-blocking I/O simple.

392116.2M104](/packages/amphp-byte-stream)[amphp/file

Non-blocking access to the filesystem based on Amp and Revolt.

1103.3M94](/packages/amphp-file)[amphp/redis

Efficient asynchronous communication with Redis servers, enabling scalable and responsive data storage and retrieval.

165634.7k44](/packages/amphp-redis)[amphp/cluster

Building multi-core network applications with PHP.

6224.8k1](/packages/amphp-cluster)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
