PHPackages                             danil-kashin/file-queue - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. danil-kashin/file-queue

ActiveLibrary[Queues &amp; Workers](/categories/queues)

danil-kashin/file-queue
=======================

A simple, file-based FIFO queue that keeps messages in order across restarts.

v1.2.0(1mo ago)050BSD-3-ClausePHPPHP &gt;=8.1

Since Mar 30Pushed 1mo agoCompare

[ Source](https://github.com/KashinDanil/FileQueue)[ Packagist](https://packagist.org/packages/danil-kashin/file-queue)[ RSS](/packages/danil-kashin-file-queue/feed)WikiDiscussions main Synced 3w ago

READMEChangelogDependencies (6)Versions (5)Used By (0)

FileQueue
=========

[](#filequeue)

A simple, file-based FIFO queue for PHP. Messages are persisted to disk using an append-only log with exclusive file locking, so the queue survives process restarts and is safe to use across multiple processes.

Requirements
------------

[](#requirements)

- PHP 8.1+
- Extension: `json`

Installation
------------

[](#installation)

```
composer require danil-kashin/file-queue
```

Usage
-----

[](#usage)

### 1. Create a worker

[](#1-create-a-worker)

Extend `FileQueueWorker` and implement `processMessage()`. Return `true` on success, `false` on failure. Failed messages are dequeued and not retried automatically.

The worker scans a directory for queue files and processes **one message per tick**, rotating across discovered queues. Each queue may contribute up to **10 messages per cycle** (a full pass over all queues) before the worker moves on, which ensures equal resource distribution across queues and avoids the noisy-neighbor problem. The worker handles `SIGTERM`/`SIGINT` for graceful shutdown.

```
use DanilKashin\FileQueue\Queue\QueueMessage;
use DanilKashin\FileQueue\Workers\FileQueueWorker;

class OrderWorker extends FileQueueWorker
{
    protected function processMessage(QueueMessage $message): bool
    {
        return handleOrder($message->payload);
    }
}
```

For each processed message the worker prints `+` (success) or `-` (failure) to stdout.

#### Tuning

[](#tuning)

- Override `getMessagesPerQueueLimit()` to change the per-queue per-cycle limit (default: 10).
- Pass `maxTicks` as the second constructor argument to stop the worker after a fixed number of ticks — useful for tests, supervised one-shot runs, or external schedulers that prefer to recycle the process periodically.

```
class OrderWorker extends FileQueueWorker
{
    protected function getMessagesPerQueueLimit(): int
    {
        return 50;
    }

    protected function processMessage(QueueMessage $message): bool
    {
        return handleOrder($message->payload);
    }
}

$worker = new OrderWorker(queuesDir: '/var/queues', maxTicks: 1000);
$worker->run();
```

### 2. Run the worker

[](#2-run-the-worker)

The worker is a long-running process — run it in the background so your application keeps responding. Progress is written to stdout (`+` / `-` per message and `.` per tick); errors go to stderr.

```
vendor/bin/run_worker "App\Workers\OrderWorker" --queuesDir=/var/queues > worker.log 2>&1 &
```

### 3. Enqueue messages

[](#3-enqueue-messages)

Messages can be enqueued from anywhere in your application as long as they share the same `baseDir`. No running worker is required at enqueue time; messages will be picked up on the next tick.

```
use DanilKashin\FileQueue\Queue\FileQueue;
use DanilKashin\FileQueue\Queue\QueueMessage;

$queue = new FileQueue(queueName: 'orders', baseDir: '/var/queues');
$queue->enqueue(new QueueMessage(['order_id' => 42, 'status' => 'pending']));
```

Architecture
------------

[](#architecture)

 ```
flowchart LR
    App["Your App"] -->|"enqueue(QueueMessage)"| FileQueue
    FileQueue -->|"dequeue()"| FileQueueWorker
    FileQueueWorker -->|"processMessage(QueueMessage)"| YourWorker["Your Worker"]
```

      Loading If you need to supply queues from a custom source (e.g. a database-backed list of queue names), extend `QueueWorker` directly and implement both `getQueues()` and `processMessage()`.

Queue API
---------

[](#queue-api)

```
$queue = new FileQueue(queueName: 'orders', baseDir: '/var/queues');

// Write
$queue->enqueue(new QueueMessage(['order_id' => 42]));

// Read
$message = $queue->dequeue(); // QueueMessage|null
if ($message !== null) {
    $payload = $message->payload; // ['order_id' => 42]
}

// Inspect
$queue->isEmpty(); // bool
$queue->size();    // int — counts remaining (unconsumed) messages

// Cleanup
$queue->compact();
```

### Compaction

[](#compaction)

Over time the data file grows as messages are appended and consumed. `compact()` rewrites the file to contain only unread messages, reclaiming disk space.

No need to call it regularly if your `FileQueueWorker` runs continuously and the queues are emptied regularly. When the last message is dequeued, all associated files are removed immediately, so no manual cleanup is needed.

How it works
------------

[](#how-it-works)

Each queue is backed by three files:

FilePurpose`{name}.queue.data`Append-only binary log of framed records`{name}.queue.pointer`Read offset — tracks the next unread message`{name}.queue.lock`Exclusive lock file — guards all mutationsRecords are size-prefixed with a 4-byte big-endian length header followed by the JSON payload. Reads advance the pointer without touching the data file. `compact()` rewrites the data file to drop consumed records. All mutations are guarded by an exclusive lock.

Exceptions
----------

[](#exceptions)

ClassWhen thrown`QueueException`I/O failure (unreadable file, write error, …)`CorruptedQueueException`Truncated or structurally invalid record`CorruptedQueueException` extends `QueueException`, so catching `QueueException` covers both.

###  Health Score

41

—

FairBetter than 87% of packages

Maintenance90

Actively maintained with recent releases

Popularity11

Limited adoption so far

Community6

Small or concentrated contributor base

Maturity46

Maturing project, gaining track record

 Bus Factor1

Top contributor holds 100% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~13 days

Total

4

Last Release

50d ago

### Community

Maintainers

![](https://www.gravatar.com/avatar/808cc658e54eb99caa9573723ab97993e9fcd90d2b6cd0940a1ba7eb8950f8cf?d=identicon)[KashinDanil](/maintainers/KashinDanil)

---

Top Contributors

[![KashinDanil](https://avatars.githubusercontent.com/u/51259240?v=4)](https://github.com/KashinDanil "KashinDanil (4 commits)")

###  Code Quality

TestsPHPUnit

### Embed Badge

![Health badge](/badges/danil-kashin-file-queue/health.svg)

```
[![Health](https://phpackages.com/badges/danil-kashin-file-queue/health.svg)](https://phpackages.com/packages/danil-kashin-file-queue)
```

###  Alternatives

[league/geotools

Geo-related tools PHP 7.3+ library

1.4k5.5M30](/packages/league-geotools)[illuminate/bus

The Illuminate Bus package.

6045.5M507](/packages/illuminate-bus)[uecode/qpush-bundle

Asynchronous processing for Symfony using Push Queues

1672.5M2](/packages/uecode-qpush-bundle)[mayconbordin/l5-stomp-queue

Stomp Queue Driver for Laravel 5

121.1k](/packages/mayconbordin-l5-stomp-queue)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
