iq2i/messenger-import-bundle
============================

A Symfony bundle that tracks the completion of asynchronous imports dispatched via symfony/messenger

[Source](https://github.com/IQ2i/messenger-import-bundle) | [Packagist](https://packagist.org/packages/iq2i/messenger-import-bundle)

Messenger Import Bundle
=======================

A Symfony bundle that tracks the completion of asynchronous imports dispatched via `symfony/messenger`.

When importing large files, dispatching messages asynchronously speeds up processing but makes it hard to know when all messages have been handled. This bundle solves that problem by listening to Messenger worker events and detecting when every message in a batch has been processed, whether successfully or not.

Requirements
------------

- PHP 8.1+
- Symfony 6.4 / 7.x / 8.x
- `symfony/messenger`
- Doctrine ORM (for the provided traits)

Installation
------------

```
composer require iq2i/messenger-import-bundle
```

Register the bundle in `config/bundles.php`:

```
return [
    // ...
    IQ2i\MessengerImportBundle\MessengerImportBundle::class => ['all' => true],
];
```

How it works
------------

1. Before dispatching messages, you create an `ImportBatch` entity that stores the total number of messages to process.
2. Each message carries the batch ID via `BatchAwareMessageInterface`.
3. The bundle's subscriber listens to `WorkerMessageHandledEvent` and `WorkerMessageFailedEvent`. Each time a message is handled, or fails with no retry left, it decrements the batch counter.
4. When the counter reaches zero, an `ImportBatchCompletedEvent` is dispatched. You listen to this event to send a notification, trigger a follow-up action, etc.
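
The counting flow in steps 3 and 4 can be sketched in plain PHP. This is only an in-memory illustration: `InMemoryBatch` and `BatchTracker` are hypothetical names, not classes from the bundle, and the real subscriber works against Messenger events and a persisted entity.

```php
<?php

declare(strict_types=1);

/** Hypothetical in-memory stand-in for the batch entity (not the bundle's class). */
final class InMemoryBatch
{
    private int $remaining;

    public function __construct(
        public readonly string $id,
        public readonly int $total,
    ) {
        $this->remaining = $total;
    }

    public function getRemaining(): int
    {
        return $this->remaining;
    }

    /** Returns true only for the call that brings the counter to zero. */
    public function decrement(): bool
    {
        if ($this->remaining <= 0) {
            return false; // never go below zero
        }

        --$this->remaining;

        return 0 === $this->remaining;
    }
}

/** Hypothetical tracker mimicking what the subscriber does after each worker event. */
final class BatchTracker
{
    public function __construct(
        private readonly InMemoryBatch $batch,
        private readonly \Closure $onCompleted,
    ) {}

    public function onHandled(): void
    {
        $this->countProcessed();
    }

    public function onFailed(bool $willRetry): void
    {
        if ($willRetry) {
            return; // the message will be attempted again, so it is not counted yet
        }

        $this->countProcessed();
    }

    private function countProcessed(): void
    {
        if ($this->batch->decrement()) {
            ($this->onCompleted)($this->batch);
        }
    }
}

$completedIds = [];
$batch = new InMemoryBatch('batch-1', 3);
$tracker = new BatchTracker($batch, function (InMemoryBatch $b) use (&$completedIds): void {
    $completedIds[] = $b->id;
});

$tracker->onHandled();     // remaining: 2
$tracker->onFailed(true);  // retry pending, remaining stays 2
$tracker->onFailed(false); // permanently failed, remaining: 1
$tracker->onHandled();     // remaining: 0, completion callback fires once
```

The completion callback plays the role of `ImportBatchCompletedEvent`: it fires exactly once, on the call that moves the counter to zero.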

Setup
-----

### 1. Create the batch entity

Create an entity that implements `ImportBatchInterface` and uses `ImportBatchTrait`:

```
// src/Entity/ImportBatch.php

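// The App\ namespace assumes the default Symfony project layout.
use App\Repository\ImportBatchRepository;
use Doctrine\ORM\Mapping as ORM;
use IQ2i\MessengerImportBundle\Model\ImportBatchInterface;
use IQ2i\MessengerImportBundle\Model\ImportBatchTrait;
// Assumption: Symfony's Doctrine bridge generator; import your own generator if you use another UUID library.
use Symfony\Bridge\Doctrine\IdGenerator\UuidGenerator;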

#[ORM\Entity(repositoryClass: ImportBatchRepository::class)]
class ImportBatch implements ImportBatchInterface
{
    use ImportBatchTrait;

    #[ORM\Id]
    #[ORM\GeneratedValue(strategy: 'CUSTOM')]
    #[ORM\CustomIdGenerator(class: UuidGenerator::class)]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    public function getId(): string
    {
        return $this->id;
    }
}
```

### 2. Create the batch repository

Create a repository that implements `ImportBatchRepositoryInterface` and uses `ImportBatchRepositoryTrait`:

```
// src/Repository/ImportBatchRepository.php

// The App\ namespace assumes the default Symfony project layout.
use App\Entity\ImportBatch;
use Doctrine\Bundle\DoctrineBundle\Repository\ServiceEntityRepository;
use Doctrine\Persistence\ManagerRegistry;
use IQ2i\MessengerImportBundle\Model\ImportBatchRepositoryInterface;
use IQ2i\MessengerImportBundle\Model\ImportBatchRepositoryTrait;

class ImportBatchRepository extends ServiceEntityRepository implements ImportBatchRepositoryInterface
{
    use ImportBatchRepositoryTrait;

    public function __construct(ManagerRegistry $registry)
    {
        parent::__construct($registry, ImportBatch::class);
    }
}
```

### 3. Implement `BatchAwareMessageInterface` on your messages

```
// src/Message/ImportProductMessage.php

use IQ2i\MessengerImportBundle\Message\BatchAwareMessageInterface;

class ImportProductMessage implements BatchAwareMessageInterface
{
    public function __construct(
        private readonly array $row,
        private readonly ?string $batchId = null,
    ) {}

    public function getRow(): array
    {
        return $this->row;
    }

    public function getBatchId(): ?string
    {
        return $this->batchId;
    }
}
```

### 4. Dispatch your messages

Initialize the batch with the total number of messages, then attach the batch ID to each message:

```
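// src/Service/ProductImporter.php

// The App\ namespaces assume the default Symfony project layout.
use App\Entity\ImportBatch;
use App\Message\ImportProductMessage;
use App\Repository\ImportBatchRepository;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\MessageBusInterface;

class ProductImporter
{
    public function __construct(
        private readonly ImportBatchRepository $batchRepository,
        private readonly MessageBusInterface $bus,
        private readonly EntityManagerInterface $em,
    ) {}

    public function import(array $rows): void
    {
        // Assumption: with no rows, no worker event would fire, so the batch could never complete.
        if ([] === $rows) {
            return;
        }

        $batch = new ImportBatch();
        $batch->initialize(count($rows));

        // Persist the batch before dispatching so that workers can find it.
        $this->em->persist($batch);
        $this->em->flush();

        foreach ($rows as $row) {
            $this->bus->dispatch(new ImportProductMessage($row, $batch->getId()));
        }
    }
}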
```
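
Because the batch must be initialized with the total before anything is dispatched, a large file has to be counted first. The following is a minimal sketch, assuming a comma-separated file whose first line is a header; `countCsvRows()` and `readCsvRows()` are hypothetical helpers, not part of the bundle. Counting goes through the same generator that later feeds the dispatch loop, so the total matches the number of messages.

```php
<?php

declare(strict_types=1);

/**
 * Streams the data rows of a CSV file as associative arrays keyed by the header.
 * Blank lines are skipped. Assumes every row has as many fields as the header.
 *
 * @return \Generator<int, array<string, string|null>>
 */
function readCsvRows(string $path): \Generator
{
    $handle = fopen($path, 'r');
    if (false === $handle) {
        throw new \RuntimeException(sprintf('Cannot open "%s".', $path));
    }

    try {
        $header = fgetcsv($handle, null, ',', '"', '\\');
        if (false === $header || [null] === $header) {
            return; // empty file: nothing to yield
        }

        while (false !== ($row = fgetcsv($handle, null, ',', '"', '\\'))) {
            if ([null] === $row) {
                continue; // fgetcsv() reports a blank line as [null]
            }

            yield array_combine($header, $row);
        }
    } finally {
        fclose($handle);
    }
}

/** Counts the data rows without loading the whole file into memory. */
function countCsvRows(string $path): int
{
    return iterator_count(readCsvRows($path));
}
```

In `ProductImporter`, this would mean calling `$batch->initialize(countCsvRows($path))` and then looping over `readCsvRows($path)` to dispatch one message per row. The file is read twice, which trades some I/O for constant memory use.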

### 5. Listen to the completion event

```
// src/EventSubscriber/ImportCompletedSubscriber.php

use IQ2i\MessengerImportBundle\Event\ImportBatchCompletedEvent;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

class ImportCompletedSubscriber implements EventSubscriberInterface
{
    public static function getSubscribedEvents(): array
    {
        return [
            ImportBatchCompletedEvent::class => 'onImportCompleted',
        ];
    }

    public function onImportCompleted(ImportBatchCompletedEvent $event): void
    {
        // Send a notification, trigger a report, clean up temporary files...
    }
}
```

API reference
-------------

### `BatchAwareMessageInterface`

Implement this interface on any message that belongs to an import batch.

| Method | Description |
| --- | --- |
| `getBatchId(): ?string` | Returns the batch ID, or `null` if the message is not part of a batch |

### `ImportBatchInterface`

| Method | Description |
| --- | --- |
| `initialize(int $total): void` | Sets the total message count and marks the batch as started |
| `getTotal(): int` | Total number of messages in the batch |
| `getRemaining(): int` | Number of messages not yet processed |
| `getCreatedAt(): \DateTimeImmutable` | When the batch was initialized |
| `getCompletedAt(): ?\DateTimeImmutable` | When the batch completed, `null` if still in progress |
| `isComplete(): bool` | Returns `true` when all messages have been processed |
| `markComplete(): void` | Sets `completedAt` (idempotent; safe to call multiple times) |

### `ImportBatchCompletedEvent`

Dispatched when the last message in a batch has been handled (or permanently failed).

| Method | Description |
| --- | --- |
| `getBatchId(): string` | ID of the completed batch |
| `getTotal(): int` | Total number of messages that were processed |

Notes
-----

- **Failed messages** are counted as processed only when all retry attempts are exhausted (`willRetry() === false`). A message that will be retried does not decrement the counter.
- **`completedAt`** is set atomically inside `ImportBatchRepositoryTrait::decrement()` the first time `remaining` reaches zero. It is safe in concurrent worker environments.
- The `decrement()` operation uses a single `UPDATE ... WHERE remaining > 0` query to prevent the counter from going below zero under concurrent load.
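
The guarded update described in these notes can be illustrated with plain PDO. This is a sketch under assumptions, not the trait's actual query: the `import_batch` table, its column names, and the function names are hypothetical, and the completion timestamp is set here in a second guarded statement rather than inside the decrement itself.

```php
<?php

declare(strict_types=1);

/** Decrements the counter only while it is positive; returns true if a row changed. */
function decrementRemaining(\PDO $pdo, string $batchId): bool
{
    $stmt = $pdo->prepare(
        'UPDATE import_batch SET remaining = remaining - 1 WHERE id = :id AND remaining > 0'
    );
    $stmt->execute(['id' => $batchId]);

    return 1 === $stmt->rowCount();
}

/** Sets completed_at once; returns true only for the caller that actually set it. */
function markCompleteIfDone(\PDO $pdo, string $batchId, string $now): bool
{
    $stmt = $pdo->prepare(
        'UPDATE import_batch SET completed_at = :now
         WHERE id = :id AND remaining = 0 AND completed_at IS NULL'
    );
    $stmt->execute(['now' => $now, 'id' => $batchId]);

    return 1 === $stmt->rowCount();
}

/**
 * Runs three "message processed" notifications against a batch of two,
 * using an in-memory SQLite database (requires the pdo_sqlite extension).
 *
 * @return list<array{bool, bool}> [decremented, completedNow] per notification
 */
function demoGuardedDecrement(): array
{
    $pdo = new \PDO('sqlite::memory:');
    $pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);
    $pdo->exec(
        'CREATE TABLE import_batch (id TEXT PRIMARY KEY, remaining INTEGER NOT NULL, completed_at TEXT)'
    );
    $pdo->exec("INSERT INTO import_batch (id, remaining) VALUES ('batch-1', 2)");

    $results = [];
    for ($i = 0; $i < 3; ++$i) {
        $results[] = [
            decrementRemaining($pdo, 'batch-1'),
            markCompleteIfDone($pdo, 'batch-1', '2024-01-01 00:00:00'),
        ];
    }

    return $results;
}
```

Each statement re-checks its condition in the `WHERE` clause, so the database rather than application code decides which worker performs the last decrement and which one records completion. The third notification in the demo changes nothing, because the counter is already at zero.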

License
-------

MIT — see [LICENSE](LICENSE).

### Community

Maintainer: [LoicSapone](/maintainers/LoicSapone) ([loicsapone](https://github.com/loicsapone) on GitHub)

