PHPackages                             lelikptz/async-consumer - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. lelikptz/async-consumer

ActiveLibrary[Queues &amp; Workers](/categories/queues)

lelikptz/async-consumer
=======================

Async consumer based on Fibers

v0.1.1(2y ago)04MITPHPPHP &gt;=8.1

Since Oct 19Pushed 2y ago1 watchersCompare

[ Source](https://github.com/lelikptz/async-consumer)[ Packagist](https://packagist.org/packages/lelikptz/async-consumer)[ RSS](/packages/lelikptz-async-consumer/feed)WikiDiscussions main Synced 1mo ago

READMEChangelog (1)Dependencies (7)Versions (3)Used By (0)

Async consumer
==============

[](#async-consumer)

Асинхронный consumer реализованный с помощью Fiber. Для работы необходимо имплементировать TaskInterface. Реализация должна возвращать статус неблокирующей операции, которую можно распараллелить.

В [Task.php](src%2FTask%2FHttp%2FTask.php) пример имплементации TaskInterface где неблокирующей операцией является http запрос через guzzle.

Пример использования Http\\Task:
--------------------------------

[](#пример-использования-httptask)

Имплементируем фабрику для создания реквеста:

```
final class Factory implements RequestFactoryInterface
{
    public function __construct(private readonly LoggerInterface $logger)
    {
    }

    public function create(): RequestInterface
    {
        $this->logger->info('Some logic for creating request');

        return new Request('GET', 'https://www.google.com');
    }
}
```

Имплементируем handler для обработки респонса и ошибки:

```
final class Handler implements ResponseHandlerInterface
{
    public function __construct(private readonly LoggerInterface $logger)
    {
    }

    public function onSuccess(ResponseInterface $response): void
    {
        $this->logger->info(
            sprintf(
                "Response body: %s; response code: %s",
                $response->getBody()->getContents(),
                $response->getStatusCode()
            )
        );
        $this->logger->info('Some logic with response');
        $this->logger->info('Finish');
    }

    public function onException(RequestException $exception): void
    {
        $this->logger->error($exception->getMessage());
    }
}
```

Провайдер задач собирает необходимую таску и возвращает её в консьюмер по мере готовности:

```
final class Provider implements ProviderInterface
{
    public function __construct(private readonly LoggerInterface $logger)
    {
    }

    public function get(): array
    {
        return [
            new Task(new Factory($this->logger), new Handler($this->logger)),
        ];
    }
}
```

Собираем консьюмер и запускаем как демон например через супервизор.

$pollTimeoutInMicroseconds - дэлэй между опросами провайдера

```
$logger = new ConsoleLogger(new ConsoleOutput(OutputInterface::VERBOSITY_DEBUG));
(new AsyncConsumer(new Provider($logger), new FiberExecutor(), $pollTimeoutInMicroseconds, $logger))->consume();
```

Пример использования rabbitmq как провайдера задач:
---------------------------------------------------

[](#пример-использования-rabbitmq-как-провайдера-задач)

Для использования [AMPQProvider.php](src%2FProvider%2FAMPQProvider.php) имплементируем TransformerInterface для создания TaskInterface из сообщения AMQPMessage:

```
final class Transformer implements TransformerInterface
{
    public function __construct(private readonly LoggerInterface $logger)
    {
    }

    public function transform(AMQPMessage $message): TaskInterface
    {
        return new Task(new Factory($this->logger), new Handler($this->logger));
    }
}
```

Собираем и запускаем:

$maxBatchSize - максимальный размер батча, который будем собирать из rabbitmq и по факту количество распараллеленных задач

$maxBatchCollectTimeInSeconds - время, которое ждём пока батч собирается из rabbitmq, если оно вышло запускам обработку с тем, что есть

$pollTimeoutInMicroseconds - дэлэй между опросами провайдера

```
$connection = new AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$provider = new AMPQProvider($connection, 'provider', new Transformer($logger));
$logger = new ConsoleLogger(new ConsoleOutput(OutputInterface::VERBOSITY_DEBUG));
$batch = new BatchProvider($provider, 10, 5, $pollTimeoutInMicroseconds);

(new AsyncConsumer($batch, new FiberExecutor(), $pollTimeoutInMicroseconds, $logger))->consume();
```

###  Health Score

20

—

LowBetter than 14% of packages

Maintenance20

Infrequent updates — may be unmaintained

Popularity3

Limited adoption so far

Community7

Small or concentrated contributor base

Maturity42

Maturing project, gaining track record

 Bus Factor1

Top contributor holds 100% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~0 days

Total

2

Last Release

939d ago

### Community

Maintainers

![](https://www.gravatar.com/avatar/993834eb81e6198771a3ae7a0465b84c6fe3b6bf682365086a4c0cb6c83f006f?d=identicon)[lelikptz](/maintainers/lelikptz)

---

Top Contributors

[![lelikptz](https://avatars.githubusercontent.com/u/5891066?v=4)](https://github.com/lelikptz "lelikptz (13 commits)")

---

Tags

asyncconsumerfaberphpasyncconsumerfiber

###  Code Quality

Static AnalysisPsalm

Code StylePHP CS Fixer

Type Coverage Yes

### Embed Badge

![Health badge](/badges/lelikptz-async-consumer/health.svg)

```
[![Health](https://phpackages.com/badges/lelikptz-async-consumer/health.svg)](https://phpackages.com/packages/lelikptz-async-consumer)
```

###  Alternatives

[cerbero/lazy-json-pages

Framework-agnostic package to load items from any paginated JSON API into a Laravel lazy collection via async HTTP requests.

19697.4k](/packages/cerbero-lazy-json-pages)[easyswoole/wechat

Coroutine safety WeChat library

8140.8k4](/packages/easyswoole-wechat)[aedart/athenaeum

Athenaeum is a mono repository; a collection of various PHP packages

245.2k](/packages/aedart-athenaeum)[amphp/cluster

Building multi-core network applications with PHP.

6224.8k1](/packages/amphp-cluster)[orisai/scheduler

Cron job scheduler - with locks, parallelism and more

4037.1k4](/packages/orisai-scheduler)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
