PHPackages                             chocofamilyme/pubsub - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Utility &amp; Helpers](/categories/utility)
4. /
5. chocofamilyme/pubsub

ActiveLibrary[Utility &amp; Helpers](/categories/utility)

chocofamilyme/pubsub
====================

Библиотека для реализаций паттерна Pub/Sub

4.0.0(4y ago)421.5k↓41.2%11BSD-3-ClausePHPPHP &gt;= 7.1

Since Sep 10Pushed 4y ago6 watchersCompare

[ Source](https://github.com/chocofamilyme/pubsub)[ Packagist](https://packagist.org/packages/chocofamilyme/pubsub)[ Docs](https://github.com/chocofamilyme/pubsub)[ RSS](/packages/chocofamilyme-pubsub/feed)WikiDiscussions master Synced 1mo ago

READMEChangelog (10)Dependencies (3)Versions (67)Used By (1)

Библиотека для реализации паттерна pub/sub для фреймворка Phalcon
=================================================================

[](#библиотека-для-реализации-паттерна-pubsub-для-фреймворка-phalcon)

Библиотека реализует событийную архитектуру приложений (Event-Driven Architecture). Работает с фреймворком Phalcon 3.x, но при желании можно легко адаптировать под другие фреймворки.

Рабочий пример можно посмотреть вот здесь:

### Возможности

[](#возможности)

- Транзакционное сохранение моделей ORM и публикация события
- Публикация событий без транзакции
- Подписка на события
- Повторная отправка события в ту же очередь при необходимости
- Сохранение в общую очередь всех не обработанных и истекших сообщений. Из этой очереди потом можно сохранить куда-то в БД и обработать индивидуально

### Требования

[](#требования)

- Phalcon 3.x+
- PHP 7.0+

### Установка

[](#установка)

```
composer require chocofamilyme/pubsub

```

### Настройка

[](#настройка)

На данный момент библиотека работает только с RabbitMQ, при желаении можно добавить другие.

#### Настройка конфигов

[](#настройка-конфигов)

```
return [
    'eventsource' => [
        'default' => 'rabbitmq',

        'drivers' => [
            'rabbitmq' => [
                'adapter'    => 'RabbitMQ',
                'hosts' => [
                    [
                        'host'     => 'host',
                        'port'     => 5672,
                        'user'     => 'user',
                        'password' => 'password',
                        'vhost'    => '/',
                    ],
                ],

                // Не объязательные параметры
                'heartbeat'          => 60,
                'read_write_timeout' => 60,
                'connection_timeout' => 60,
                'wait_timeout'       => 0,
            ],
        ],
    ],
];
```

Полный список смотрите -

#### Добавляем брокер в DI контейнер

[](#добавляем-брокер-в-di-контейнер)

```
$di = \Phalcon\Di::getDefault();
$config      = $di->get('config')->eventsource;
$config      = $config->drivers[$config->default];

$serviceName = $di->get('config')->domain;
$cache       = $di->get('cache');

$di->setShared('eventsource',
    function () use ($config, $serviceName, $cache) {
        $adapter = $config->adapter;
        $config  = array_merge($config->toArray(), ['app_id' => $serviceName]);
        $class   = 'Chocofamily\PubSub\Provider\\'.$adapter;

        $repeater = new Repeater($cache);
        return $class::getInstance($config, $repeater);
    }
);
```

Здесь `$cache` объект реализующий интерефейс `Phalcon\Cache\BackendInterface`. Кэш используется для подсчета количества повторной обработки определенного сообщения.

#### Таблица параметров с настройками

[](#таблица-параметров-с-настройками)

КлючЗначениеОписаниеconnectionПо умолчанию PhpAmqpLib\\Connection\\AMQPLazyConnection::class[php-amqplib](https://github.com/php-amqplib/php-amqplib/tree/master/PhpAmqpLib/Connection)connection\_timeoutПо умолчанию 3.0 (сек)Максимальное время на соединение с сервером Rabbitmqread\_write\_timeoutПо умолчанию 3.0 (сек)Максимальное время на получениеheartbeatПо умолчанию 60 (сек)[RabbitMQ Doc](https://www.rabbitmq.com/heartbeats.html)keepaliveПо умолчанию false[RabbitMQ Doc](https://www.rabbitmq.com/heartbeats.html#tcp-keepalives)contextПо умолчанию null[RabbitMQ Doc](https://www.rabbitmq.com)prefetch\_countПо умолчанию 1[RabbitMQ Doc](https://www.rabbitmq.com/consumer-prefetch.html)no\_ackПо умолчанию false[RabbitMQ Doc](https://www.rabbitmq.com/nack.html)durableПо умолчанию true[RabbitMQ Doc](https://www.rabbitmq.com/queues.html)exclusiveПо умолчанию false[RabbitMQ Doc](https://www.rabbitmq.com/queues.html)queueПо умолчанию \[\][RabbitMQ Doc](https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf)basic\_consume\_exclusiveПо умолчанию false[RabbitMQ Doc](https://www.rabbitmq.com/consumers.html)wait\_allowed\_methodsПо умолчанию null[php-amqplib](https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Channel/AbstractChannel.php)wait\_non\_blockingПо умолчанию true[php-amqplib](https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Channel/AbstractChannel.php)wait\_timeoutПо умолчанию 0Максимальное время ожидания до получения первого сообщенияexchange\_typeПо умолчанию topic[RabbitMQ Doc](https://www.rabbitmq.com/tutorials/amqp-concepts.html))app\_idПо умолчанию ''Индификатор приложения которое создает сообщениеsleepПо умолчанию 0.1 секундыЕсли wait\_non\_blocking равно true, то если нету задачи ждать время sleep### Использование

[](#использование)

#### Публикация

[](#публикация)

Публиковать сообщения можно используя класс `Chocofamily\PubSub\Publisher`. Минимальный рабочий пример:

```
$publisher = new Publisher($di->getShared('eventsource'));

$payload = [
	'event_id' => 11995,
	'name' => 'docx',
	'age' => 25
];

$routeKey = 'order.created';

$publisher->send($payload, $routeKey);
```

Для RabbitMQ переменная `$routeKey` должна состоять минимум из двух частей разделенных точкой `.`. Пример `order.created`. Имя Exchange будет содержать первый блок, т.е. `order`. После этого если зайдете в админку rabbitmq должен создаться exchange с именем `order`. Обновленно: начиная с версии 2.\* можно указать `exchange`, которому привяжется маршрут `$routeKey, пример:

```
$publisher = new Publisher($di->getShared('eventsource'));

$payload = [
	'event_id' => 11995,
	'name' => 'docx',
	'age' => 25
];

$exchange = 'order';
$routeKey = 'order.created';

$publisher->send($payload, $routeKey, $exchange);
```

#### Подписка на событие

[](#подписка-на-событие)

Для подписки на события используется класс `Chocofamily\PubSub\Subscriber`. Минимальный рабочий пример:

```
$params = [
    'queue_name' => 'restapi_orderx',
];

$taskName = 'your_task_name';

$subscriber = new Subscriber($di->getShared('eventsource'), 'order.created.*', $params, $taskName);

$subscriber->subscribe(function ($headers, $body) {
    echo print_r($headers, 1). PHP_EOL;
    echo print_r($body, 1). PHP_EOL;
});
```

Обновленно: начиная с версии 2.\* можно указать `exchange` и связать с ним маршрут. Теперь можно указать массив маршрутов. Пример:

```
$params = [
    'queue_name' => 'restapi_orderx',
];

$taskName = 'your_task_name';

$routeKeys = [
    'order.created',
    'order.payed',
];

$exchange = 'order';

$subscriber = new Subscriber($di->getShared('eventsource'), $routeKeys, $params, $taskName, $exchange);

$subscriber->subscribe(function ($headers, $body) {
    echo print_r($headers, 1). PHP_EOL;
    echo print_r($body, 1). PHP_EOL;
});
```

Чтобы обратно отправить сообщение в очередь необходимо в кэлбэк функции кинуть исключение `Chocofamily\PubSub\Exceptions\RetryException`. Сообщение может максимум 5 раз обработаться повторно, после этого он попадает в очередь мертвых сообщений (exchange = DLX).

В подписчик можно передавать следующие настройки:

```
durable: bool — сохранять на диск данные
queue: array — настройки самой очереди
prefetch_count: int — количество предзагрузки сообщений
no_ack: — требуется ли подтверждение сообщений
app_id — уникальный ID приложения. Можно использовать для идентификации откуда собите пошло изначально

```

#### Публикация используя транзакции БД

[](#публикация-используя-транзакции-бд)

Этот способ необходим для атомарности сохранения сущности в БД и публикования события. Следующая картинка хорошо иллюстрирует как это работает: [![alt text](https://camo.githubusercontent.com/a071cb893f6e3c8b5656be683b285ae6d4f4a38528751d5e16019e0c127e8ee3/68747470733a2f2f696d6167652e6962622e636f2f6e767a6e78392f72696368617264736f6e5f6d6963726f73657276696365735f70617274355f6c6f63616c5f7472616e73616374696f6e5f65313434393136353835323333322e6a7067)](https://camo.githubusercontent.com/a071cb893f6e3c8b5656be683b285ae6d4f4a38528751d5e16019e0c127e8ee3/68747470733a2f2f696d6167652e6962622e636f2f6e767a6e78392f72696368617264736f6e5f6d6963726f73657276696365735f70617274355f6c6f63616c5f7472616e73616374696f6e5f65313434393136353835323333322e6a7067)

Для этого необходимо создать таблицу events:

```
create table events
(
	id serial not null
		constraint events_pkey
			primary key,
	type smallint not null,
	model_id int not null,
	model_type varchar(100) not null,
	exchange   varchar(100) not null,
	routing_key varchar(100) not null,
	payload json not null,
	status smallint not null,
	created_at timestamp default now() not null,
	updated_at timestamp
);
```

Пример использования:

```
use Chocofamily\PubSub\Services\EventPrepare;

...

$order = new Order([
    'user_id' => 11166541,
    'status'  => 0,
    'total'   => 5852,
]);

$eventSource = $di->get('eventsource');

$event = new EventPrepare($order, new OrderSerialize(['name' => 'docx']), 1);
$event->up($eventSource, 'order.created.-5');

```

Модель **Order** должна реализовывать итерфейс ModelInterface.

Обновленно: начиная с версии 2.\* можно указать `exchange` и связпть с ним маршрут. Привер:

```
use Chocofamily\PubSub\Services\EventPrepare;

...

$order = new Order([
    'user_id' => 11166541,
    'status'  => 0,
    'total'   => 5852,
]);

$eventSource = $di->get('eventsource');

$routeKey = 'order.created.-5';
$exchange = 'order';

$event = new EventPrepare($order, new OrderSerialize(['name' => 'docx']), 1);
$event->up($eventSource, $routeKey, $exchange);

```

Метод `up` работает так

- db transaction start
- order-&gt;save();
- eventModel-&gt;save()
- db transaction commit
- event publish

#### Повторная отправка события

[](#повторная-отправка-события)

Для повторной отправке событие используется класс `Chocofamily\PubSub\Services\EventRepeater`. Рабочий пример:

```
use Chocofamily\PubSub\Services\EventRepeater;

...

$dateStart = \DateTime::createFromFormat('Y-m-d', '2018-01-01');

$eventDataProvider = new Chocofamily\PubSub\Provider\Event($di->get('eventsource'), $dateStart);

try {
    $event = new EventRepeater($eventDataProvider);
    $event->retry();
} catch (\Exception $e) {
    $message = sprintf('%d %s in %s:%s', $e->getCode(), $e->getMessage(), $e->getFile(), $e->getLine());
    $di->get('logger')->error($message);
}
```

#### Очистка журнала событий

[](#очистка-журнала-событий)

Для очистки событий используется класс `Chocofamily\PubSub\Services\EventCleaner` с методом `clean`.

Рабочий пример:

```
use Chocofamily\PubSub\Services\EventCleaner;

...

try {
    $event = new EventCleaner($di->get('modelsManager'));
    $event->clean();
} catch (ModelException $e) {
    $message = sprintf('%d %s in %s:%s', $e->getCode(), $e->getMessage(), $e->getFile(), $e->getLine());
    $di->get('logger')->error($message);
}
```

По умолчанию удаляетя событие больше 1 месяца. Если передать дату как второй параметр в конструкторе, то будет удалятся все событие до указонной даты:

```
use Chocofamily\PubSub\Services\EventCleaner;

...

$dateTime  = new \DateTime();
$dateTime = $dateTime->modify('-1 day');

try {
    $event = new EventCleaner($di->get('modelsManager'), $dateTime);
    $event->clean();
} catch (ModelException $e) {
    $message = sprintf('%d %s in %s:%s', $e->getCode(), $e->getMessage(), $e->getFile(), $e->getLine());
    $di->get('logger')->error($message);
}
```

@todo

- Написать интерфейс для транзакций и убрать зависимость от фреймворка

###  Health Score

38

—

LowBetter than 85% of packages

Maintenance20

Infrequent updates — may be unmaintained

Popularity30

Limited adoption so far

Community19

Small or concentrated contributor base

Maturity71

Established project with proven stability

 Bus Factor2

2 contributors hold 50%+ of commits

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~24 days

Recently: every ~165 days

Total

49

Last Release

1642d ago

Major Versions

1.1.4 → 2.0.02019-04-03

1.1.5 → 2.0.52019-05-29

1.1.6 → 2.0.72019-07-19

2.2.1 → 3.0.02019-11-11

3.5.0 → 4.0.02021-11-19

PHP version history (2 changes)0.0.2PHP &gt;= 7.0.0

4.0.0PHP &gt;= 7.1

### Community

Maintainers

![](https://avatars.githubusercontent.com/u/42729426?v=4)[Chocofamily](/maintainers/chocofamilyme)[@chocofamilyme](https://github.com/chocofamilyme)

---

Top Contributors

[![docxplusgmoon](https://avatars.githubusercontent.com/u/3988870?v=4)](https://github.com/docxplusgmoon "docxplusgmoon (34 commits)")[![Vadim89](https://avatars.githubusercontent.com/u/3715929?v=4)](https://github.com/Vadim89 "Vadim89 (29 commits)")[![ludovicose](https://avatars.githubusercontent.com/u/8954176?v=4)](https://github.com/ludovicose "ludovicose (3 commits)")[![aziza-kamet](https://avatars.githubusercontent.com/u/19660649?v=4)](https://github.com/aziza-kamet "aziza-kamet (2 commits)")[![igor875126](https://avatars.githubusercontent.com/u/11134711?v=4)](https://github.com/igor875126 "igor875126 (2 commits)")[![sepaker](https://avatars.githubusercontent.com/u/8132455?v=4)](https://github.com/sepaker "sepaker (1 commits)")

---

Tags

eventsphalconpubsubMicroserviceeventsource

###  Code Quality

TestsCodeception

### Embed Badge

![Health badge](/badges/chocofamilyme-pubsub/health.svg)

```
[![Health](https://phpackages.com/badges/chocofamilyme-pubsub/health.svg)](https://phpackages.com/packages/chocofamilyme-pubsub)
```

###  Alternatives

[doctrine/event-manager

The Doctrine Event Manager is a simple PHP event system that was built to be used with the various Doctrine projects.

6.1k501.1M115](/packages/doctrine-event-manager)[psr/event-dispatcher

Standard interfaces for event handling.

2.3k618.8M865](/packages/psr-event-dispatcher)[laminas/laminas-eventmanager

Trigger and listen to events within a PHP application

1.0k69.8M225](/packages/laminas-laminas-eventmanager)[simshaun/recurr

PHP library for working with recurrence rules

1.6k15.7M40](/packages/simshaun-recurr)[phalcon/zephir

Zephir is a compiled high level language aimed to the creation of C-extensions for PHP

3.4k414.1k18](/packages/phalcon-zephir)[hhxsv5/php-sse

A simple and efficient library implemented HTML5's server-sent events by PHP, is used to real-time push events from server to client, and easier than Websocket, instead of AJAX request.

452178.0k3](/packages/hhxsv5-php-sse)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
