thesis/pgmq
===========

A non-blocking PHP client for Postgres Message Queue (PGMQ).

Version 0.1.5 · MIT · PHP ^8.3 · [Source](https://github.com/thesis-php/pgmq) · [Packagist](https://packagist.org/packages/thesis/pgmq) · [Issues](https://github.com/thesis-php/pgmq/issues)

Non-blocking PHP client for [pgmq](https://github.com/pgmq/pgmq). To install the Postgres extension itself, see its [installation guide](https://github.com/pgmq/pgmq/blob/main/INSTALLATION.md).

Installation
------------

```
composer require thesis/pgmq
```

Why is almost all the API functional?
-------------------------------------

A database-backed queue is usually chosen for its exactly-once semantics, so every operation, whether sending messages or running business logic together with message acknowledgments, must be able to run inside a transaction. A transaction object is short-lived: it cannot be used after `rollback()` or `commit()`, so it cannot be injected as a long-lived dependency. For this reason, the API is built on functions that take `Amp\Postgres\PostgresLink` as their first parameter, which can be either a transaction or a plain connection. Only the consumer requires `Amp\Postgres\PostgresConnection`, because it opens its own transactions to read and acknowledge messages atomically.
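
As an illustration of why a `PostgresLink` parameter matters, here is a minimal sketch that writes a business row and publishes a message in one transaction. The `orders` table and the `placeOrder()` helper are hypothetical; `beginTransaction()`, `execute()`, `commit()`, `rollback()` and `isActive()` come from `amphp/postgres`, and the `Pgmq` calls mirror the examples in this README.

```php
<?php

declare(strict_types=1);

use Amp\Postgres;
use Thesis\Pgmq;

/**
 * Hypothetical helper: stores an order and publishes an event atomically.
 */
function placeOrder(Postgres\PostgresConnection $pg, int $orderId): void
{
    $tx = $pg->beginTransaction();

    try {
        // Business write; the `orders` table is an assumption made for this sketch.
        $tx->execute('insert into orders (id) values (?)', [$orderId]);

        // The transaction is passed wherever a PostgresLink is expected,
        // so the message is sent as part of the same transaction.
        $queue = Pgmq\createQueue($tx, 'events');
        $queue->send(new Pgmq\SendMessage(
            json_encode(['id' => $orderId], JSON_THROW_ON_ERROR),
            '{"x-header": "x-value"}',
        ));

        $tx->commit();
    } catch (\Throwable $e) {
        // Undo both the row and the message if anything failed before commit.
        if ($tx->isActive()) {
            $tx->rollback();
        }

        throw $e;
    }
}
```

If anything throws before `commit()`, neither the row nor the message becomes visible to other sessions. Calling `createQueue()` inside the transaction simply follows the pattern used by the other examples here.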

Contents
--------

- [Create queue](#create-queue)
- [Create unlogged queue](#create-unlogged-queue)
- [Create partitioned queue](#create-partitioned-queue)
- [List queues](#list-queues)
- [List queue metrics](#list-queue-metrics)
- [List queue metadata](#list-queue-metadata)
- [Drop queue](#drop-queue)
- [Purge queue](#purge-queue)
- [Send message](#send-message)
- [Send message with relative delay](#send-message-with-relative-delay)
- [Send message with absolute delay](#send-message-with-absolute-delay)
- [Send batch](#send-batch)
- [Send batch with relative delay](#send-batch-with-relative-delay)
- [Send batch with absolute delay](#send-batch-with-absolute-delay)
- [Read message](#read-message)
- [Read batch](#read-batch)
- [Pop message](#pop-message)
- [Read batch with poll](#read-batch-with-poll)
- [Set visibility timeout](#set-visibility-timeout)
- [Archive message](#archive-message)
- [Archive batch](#archive-batch)
- [Delete message](#delete-message)
- [Delete batch](#delete-batch)
- [Enable notify insert](#enable-notify-insert)
- [Disable notify insert](#disable-notify-insert)
- [Consume messages](#consume-messages)

### Create queue

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
```

### Create unlogged queue

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createUnloggedQueue($pg, 'events');
```

### Create partitioned queue

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createPartitionedQueue(
    pg: $pg,
    queue: 'events',
    partitionInterval: 10000,
    retentionInterval: 100000,
);
```

### List queues

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\listQueues($pg) as $queue) {
    $md = $queue->metadata();
    var_dump($md);
}
```

### List queue metrics

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\metrics($pg) as $metrics) {
    var_dump($metrics);
}
```

### List queue metadata

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\listQueueMetadata($pg) as $md) {
    var_dump($md);
}
```

### Drop queue

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$queue->drop();
```

### Purge queue

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
var_dump($queue->purge());
```

### Send message

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'));
```

### Send message with relative delay

```
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    TimeSpan::fromSeconds(5),
);
```

### Send message with absolute delay

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    new \DateTimeImmutable('+5 seconds'),
);
```

### Send batch

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch([
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
]);
```

### Send batch with relative delay

```
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
    [
        new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
        new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
    ],
    TimeSpan::fromSeconds(5),
);
```

### Send batch with absolute delay

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
    [
        new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
        new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
    ],
    new \DateTimeImmutable('+5 seconds'),
);
```

### Read message

```
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read(TimeSpan::fromSeconds(20));
```

### Read batch

```
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = $queue->readBatch(10, TimeSpan::fromSeconds(20));
```

### Pop message

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->pop();
```

### Read batch with poll

```
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = $queue->readPoll(
    batch: 10,
    maxPoll: TimeSpan::fromSeconds(5),
    pollInterval: TimeSpan::fromMilliseconds(250),
);
```

### Set visibility timeout

```
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    // handle the message

    $queue->setVisibilityTimeout($message->id, TimeSpan::fromSeconds(10));
}
```

### Archive message

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    $queue->archive($message->id);
}
```

### Archive batch

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];

if ($messages !== []) {
    // Collect the ids of all messages that were read and archive them in one call.
    $queue->archiveBatch(array_map(
        static fn(Pgmq\Message $message): int => $message->id,
        $messages,
    ));
}
```

### Delete message

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    $queue->delete($message->id);
}
```

### Delete batch

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];

if ($messages !== []) {
    // Collect the ids of all messages that were read and delete them in one call.
    $queue->deleteBatch(array_map(
        static fn(Pgmq\Message $message): int => $message->id,
        $messages,
    ));
}
```

### Enable notify insert

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$channel = $queue->enableNotifyInsert(); // returns the Postgres channel to listen on
```
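
The returned channel can then be used with the LISTEN support in `amphp/postgres`. The sketch below assumes the channel is returned as a string; `awaitFirstInsert()` is a hypothetical helper, and the notification payload format is not specified here.

```php
<?php

declare(strict_types=1);

use Amp\Postgres;

/**
 * Hypothetical helper: suspends the current fiber until the first notification
 * arrives on the given channel, then stops listening.
 */
function awaitFirstInsert(Postgres\PostgresConnection $pg, string $channel): ?Postgres\PostgresNotification
{
    // listen() is part of amphp/postgres and returns an iterable listener.
    $listener = $pg->listen($channel);

    try {
        foreach ($listener as $notification) {
            // The first notification is enough for this sketch.
            return $notification;
        }
    } finally {
        // Always release the LISTEN registration.
        $listener->unlisten();
    }

    // The listener was closed before any notification arrived.
    return null;
}
```

A caller could pass the value returned by `enableNotifyInsert()` as `$channel` and read the queue once the helper returns.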

### Disable notify insert

```
use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$queue->disableNotifyInsert();
```

### Consume messages

This functionality is not a standard feature of the **pgmq** extension, but is provided by the library as an add-on for reliable and correct processing of message batches from the queue, with the ability to `ack`, `nack` (with delay) and archive (`term`) messages from the queue.

1. First of all, create the extension if it doesn't exist yet:

```
use Thesis\Pgmq;

Pgmq\createExtension($pg);
```

2. Then create a queue:

```
use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
```

3. Next, create the consumer object:

```
use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);
```

4. Now we can proceed to configure the queue consumer handler:

```
use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        var_dump($messages);
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);
```

Through `Pgmq\ConsumeConfig` you can configure:

- the `batch` size of received messages;
- the message visibility timeout;
- monitoring of queue inserts via the LISTEN/NOTIFY mechanism;
- the polling interval.

At least one of the settings `listenForInserts` or `pollTimeout` must be specified.
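
A configuration that satisfies this rule might look like the sketch below. The parameter names `listenForInserts` and `pollTimeout` are taken from the text above, but their types (a boolean flag and a `TimeSpan`) are assumptions, and `eventsConsumeConfig()` is a hypothetical helper. Check the `ConsumeConfig` constructor for the actual signature.

```php
<?php

declare(strict_types=1);

use Thesis\Pgmq;
use Thesis\Time\TimeSpan;

/**
 * Hypothetical helper: builds a consumer configuration for the `events` queue.
 */
function eventsConsumeConfig(): Pgmq\ConsumeConfig
{
    return new Pgmq\ConsumeConfig(
        queue: 'events',
        // Assumed to be a boolean flag that enables LISTEN/NOTIFY wake-ups.
        listenForInserts: true,
        // Assumed to accept a TimeSpan used as the polling interval.
        pollTimeout: TimeSpan::fromSeconds(1),
    );
}
```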

Through the `Pgmq\ConsumeController`, you can:

- ack messages, causing them to be deleted from the queue;
- nack messages with a delay, setting a visibility timeout for them;
- terminate messages that can no longer be retried, which archives them;
- stop the consumer.

Because messages are received and acked or nacked within the same transaction, run your own database queries through the `ConsumeController::$tx` object to preserve exactly-once semantics for message processing.

```
use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        $ctrl->tx->execute('...some business logic');
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);
```

Using `ConsumeContext`, you can gracefully stop the consumer, waiting for the current batch to finish processing.

```
use Thesis\Pgmq;
use function Amp\trapSignal;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        $ctrl->tx->execute('...some business logic');
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);

trapSignal([\SIGINT, \SIGTERM]); // suspends until one of the signals is received

$context->stop();
$context->awaitCompletion();
```

Or stop all current consumers using `$consumer->stop()`:

```
use Thesis\Pgmq;
use function Amp\trapSignal;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(...);

trapSignal([\SIGINT, \SIGTERM]); // suspends until one of the signals is received

$consumer->stop();
$context->awaitCompletion();
```

License
-------

The MIT License (MIT). Please see [License File](LICENSE) for more information.

### Community

Maintainer: [Valentin Udaltsov](https://github.com/vudaltsov)

Top contributors: [kafkiansky](https://github.com/kafkiansky) (32 commits), [klimick](https://github.com/klimick) (8 commits)

Tags: async, php8, postgresql, queue
