PHPackages                             wapplersystems/messenger - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. wapplersystems/messenger

ActiveTypo3-cms-extension[Queues &amp; Workers](/categories/queues)

wapplersystems/messenger
========================

Default symfony messenger commands

93224[2 issues](https://github.com/WapplerSystems/messenger/issues)PHP

Since Jan 13Pushed 3y ago2 watchersCompare

[ Source](https://github.com/WapplerSystems/messenger)[ Packagist](https://packagist.org/packages/wapplersystems/messenger)[ RSS](/packages/wapplersystems-messenger/feed)WikiDiscussions main Synced 5d ago

READMEChangelogDependenciesVersions (1)Used By (0)

TYPO3 symfony messenger adapter
===============================

[](#typo3-symfony-messenger-adapter)

Integrates Symfony Messenger into TYPO3

Available commands
------------------

[](#available-commands)

This extension makes the following commands available:

```
 messenger
  messenger:stop-workers                      Stop workers after their current message
  messenger:setup-transports                  Prepare the required infrastructure for the transport
  messenger:consume                           Consume messages
  messenger:failed:retry                      Retry one or more messages from the failure transport
  messenger:failed:show                       Show one or more messages from the failure transport
  messenger:failed:remove                     Remove given messages from the failure transport

```

Integration guide
-----------------

[](#integration-guide)

Here is an example which uses a sql table.

Require the `wapplersystems/messenger` package in your composer.json.

Create these files in your own extension:

Configuration/Services.php

```
namespace TYPO3\CMS\Core;

use Symfony\Component\DependencyInjection\Compiler\PassConfig;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Loader\Configurator\ContainerConfigurator;
use WapplerSystems\Messenger\DependencyInjection\MessengerExtensionConfigPass;

return static function (ContainerConfigurator $container, ContainerBuilder $containerBuilder) {

    $containerBuilder->addCompilerPass(new MessengerExtensionConfigPass(''),PassConfig::TYPE_BEFORE_OPTIMIZATION, 91);

};

```

Configuration/Services.yaml

```
  Vendor\YourExt\Service\FoobarService:
    public: true
    arguments:
      $bus: '@messenger.bus.default'

```

ext\_tables.sql

```
CREATE TABLE tx_yourext_foobar
(
	uid            int(11)                  NOT NULL auto_increment,
	failed         tinyint(4)   DEFAULT '0' NOT NULL,
	handled        tinyint(4)   DEFAULT '0' NOT NULL,
	envelope       text,
	delivered_at   int(11)      DEFAULT NULL,

	PRIMARY KEY (uid)
);

```

Configuration/Messenger.yaml

```
transports:
  foobar:
    dsn: 'foobar-transport://'
    failure_transport: false
    options: { }
routing:
  'Vendor\YourExt\Job\Foobar':
    senders:
      - messenger.transport.foobar

```

FoobarTransportFactory.php

```
namespace Vendor\YourExt\Messenger\Transport\FoobarTransportFactory;

use Vendor\YourExt\Messenger\FoobarTransport;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use TYPO3\CMS\Core\Utility\GeneralUtility;

class FoobarTransportFactory implements TransportFactoryInterface
{
    public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
    {
        return GeneralUtility::makeInstance(FoobarTransport::class);
    }

    public function supports(string $dsn, array $options): bool
    {
        return str_starts_with($dsn, 'foobar-transport://');
    }
}

```

Foobar.php

```
class Foobar
{

}

```

FoobarHandler.php

```

use Vendor\YourExt\Job\Foobar;
use Vendor\YourExt\Service\ReportsService;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use TYPO3\CMS\Core\Utility\GeneralUtility;

class FoobarHandler implements MessageHandlerInterface {
{
    public function __invoke(Foobar $foobar)
    {

    }
}

```

FoobarService.php

```
use Symfony\Component\Messenger\MessageBusInterface;
use Vendor\YourExt\Job\Foobar;

class FoobarService
{
    public function __construct(MessageBusInterface $bus)
    {
        $this->bus = $bus;
    }

    public function createJob() {
        $foobar = new Foobar();
        $this->bus->dispatch($foobar);
    }
}

```

ReportJobTransport.php

```
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use TYPO3\CMS\Core\Database\Connection;
use TYPO3\CMS\Core\Database\ConnectionPool;
use TYPO3\CMS\Core\Utility\GeneralUtility;

class FoobarTransport implements TransportInterface {

    public function __construct(SerializerInterface $serializer = null)
    {
        $this->serializer = $serializer ?? new PhpSerializer();
    }

    public function get(): iterable
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable('tx_yourext_foobar');
        $row = $queryBuilder->select('*')
            ->from('tx_yourext_foobar')
            ->where($queryBuilder->expr()->eq('handled',0))
            ->setMaxResults(1)
            ->execute()
            ->fetchAssociative();

        if ($row === false) {
            return [];
        }

        $envelope = $this->serializer->decode([
            'body' => $row['envelope'],
        ]);

        return [$envelope->with(new TransportMessageIdStamp($row['uid']))];
    }

    public function ack(Envelope $envelope): void
    {
        $stamp = $envelope->last(TransportMessageIdStamp::class);
        if (!$stamp instanceof TransportMessageIdStamp) {
            throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
        }

        GeneralUtility::makeInstance(ConnectionPool::class)
            ->getConnectionForTable('tx_yourext_foobar')
            ->update(
                'tx_yourext_foobar',
                ['handled' => 1, 'delivered_at' => time()],
                ['uid' => (int)$stamp->getId()],
                [Connection::PARAM_INT]
            );

    }

    public function reject(Envelope $envelope): void
    {
        $stamp = $envelope->last(TransportMessageIdStamp::class);
        if (!$stamp instanceof TransportMessageIdStamp) {
            throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
        }

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable('tx_yourext_foobar');
        $queryBuilder->delete('tx_yourext_foobar')
            ->where($queryBuilder->expr()->eq('uid',$queryBuilder->createNamedParameter($stamp->getId(),\PDO::PARAM_INT)))
            ->executeQuery();

    }

    public function send(Envelope $envelope): Envelope
    {
        $encodedMessage = $this->serializer->encode($envelope);

        GeneralUtility::makeInstance(ConnectionPool::class)->getConnectionForTable('tx_yourext_foobar')
            ->insert(
                'tx_yourext_foobar',
                ['envelope' => $encodedMessage['body'], 'delivered_at' => null],
                ['handled' => 0]
            );
        $uid = (int)GeneralUtility::makeInstance(ConnectionPool::class)->getConnectionForTable('tx_yourext_foobar')->lastInsertId('tx_yourext_foobar');

        return $envelope->with(new TransportMessageIdStamp($uid));
    }

}

```

###  Health Score

19

—

LowBetter than 10% of packages

Maintenance16

Infrequent updates — may be unmaintained

Popularity20

Limited adoption so far

Community13

Small or concentrated contributor base

Maturity23

Early-stage or recently created project

 Bus Factor2

2 contributors hold 50%+ of commits

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

### Community

Maintainers

![](https://www.gravatar.com/avatar/a944bb90af783b13d74049f2d8adcff598f4e7cb0aad9d7040a6af0bb8c23984?d=identicon)[svewap](/maintainers/svewap)

---

Top Contributors

[![svewap](https://avatars.githubusercontent.com/u/1734738?v=4)](https://github.com/svewap "svewap (9 commits)")[![sabbelasichon](https://avatars.githubusercontent.com/u/13050560?v=4)](https://github.com/sabbelasichon "sabbelasichon (6 commits)")[![thommyhh](https://avatars.githubusercontent.com/u/13288620?v=4)](https://github.com/thommyhh "thommyhh (6 commits)")

---

Tags

messengerqueuesymfonytypo3-cms-extension

### Embed Badge

![Health badge](/badges/wapplersystems-messenger/health.svg)

```
[![Health](https://phpackages.com/badges/wapplersystems-messenger/health.svg)](https://phpackages.com/packages/wapplersystems-messenger)
```

###  Alternatives

[league/geotools

Geo-related tools PHP 7.3+ library

1.4k5.3M26](/packages/league-geotools)[amphp/parser

A generator parser to make streaming parsers simple.

14952.8M16](/packages/amphp-parser)[amphp/serialization

Serialization tools for IPC and data storage in PHP.

13451.1M18](/packages/amphp-serialization)[enqueue/enqueue

Message Queue Library

19820.0M56](/packages/enqueue-enqueue)[deliciousbrains/wp-background-processing

WP Background Processing can be used to fire off non-blocking asynchronous requests or as a background processing tool, allowing you to queue tasks.

1.1k409.8k6](/packages/deliciousbrains-wp-background-processing)[react/async

Async utilities and fibers for ReactPHP

2238.8M171](/packages/react-async)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
