PHPackages                             serrvius/amqp-rpc-extender - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Queues &amp; Workers](/categories/queues)
4. /
5. serrvius/amqp-rpc-extender

ActiveSymfony-messenger-bridge-extender[Queues &amp; Workers](/categories/queues)

serrvius/amqp-rpc-extender
==========================

The extension for symfony/amqp-messenger to implement RPC calls

v1.4.9(2y ago)31.4k[2 issues](https://github.com/serrvius/amqp-rpc-extender/issues)MITPHPPHP &gt;=8.1

Since Nov 27Pushed 2y ago1 watchersCompare

[ Source](https://github.com/serrvius/amqp-rpc-extender)[ Packagist](https://packagist.org/packages/serrvius/amqp-rpc-extender)[ RSS](/packages/serrvius-amqp-rpc-extender/feed)WikiDiscussions main Synced 1mo ago

READMEChangelog (5)Dependencies (4)Versions (29)Used By (0)

About
=====

[](#about)

The extension of **symfony/amqp-messenger** that allow to implement CQRS/gRPC (call and wait response) in RabbitMQ Broker, idea has to used it in microservices architecture

What it does
------------

[](#what-it-does)

This bundle use `ampq://` default transporter of symfony. The CompilerPass of bundle connect those layers of default amqp-messenger to control the messages that have specific stamp. The queues type is recommended to be used `direct`

Requirements
------------

[](#requirements)

```
 * php: >=8.1
 * ext-amqp
 * symfony: 6.1.*

```

Installation
------------

[](#installation)

```
composer require serrvius/amqp-rpc-extender
```

Usage
-----

[](#usage)

```
# GATEWAY -  messenger.yaml
framework:
  framework:
  messenger:
    serializer:
      default_serializer: messenger.transport.rpc.symfony_serializer
    default_bus: command.bus
    buses:
      command.bus: ~
      query.bus: ~

    transports:
      users_commands:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        options:
          exchange:
            name: user_command
            type: direct
          queues:
            user_create:
              binding_keys: [ user_create ]
            user_update:
              binding_keys: [ user_update ]
            user_delete:
              binding_keys: [ user_delete ]
      queries:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        options:
          exchange:
            name: queries
            type: direct
          queues:
            user_queries:
              binding_keys: [ user_queries ]
```

```
# Microservice (let say user)-  messenger.yaml
framework:
  messenger:
    serializer:
      default_serializer: messenger.transport.rpc.symfony_serializer
      symfony_serializer:
        format: json
        context: { }
    default_bus: command.bus
    buses:
      command.bus: ~
      query.bus: ~
    transports:
      user_commands:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        options:
          exchange: { name: user_command,  type: direct }
          queues:
            user_create:
              binding_keys: [ user_create, user_update, user_delete ]
      queries:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        options:
          exchange:
            name: queries
            type: direct
          queues:
            user_queries:
              binding_keys: [ user_queries ]
```

We need to replace de default symfony messenger serializer `messenger.transport.symfony_serializer` with the one that provide the extension `messenger.transport.rpc.symfony_serializer`, that is needed to encode and decode the messages in the right way.

---

#### Gateway side

[](#gateway-side)

All call initiator messages (from the gateway) classes need to implement the interface:

```
Serrvius\AmqpRpcExtender\Interfaces\AmqpRpcRequestInterface

```

the abstract method `amqpRpcStamp` should return the instance of

```
Serrvius\AmqpRpcExtender\Interfaces\AmqpRpcStampInterface

```

For command messages (classic async work - don't need the callback) the method: `amqpRpcStamp` should return: `Serrvius\AmqpRpcExtender\Stamp\AmqpRpcCommandStamp`

For query messages (need response from microservice) the method: `amqpRpcStamp` should return: `Serrvius\AmqpRpcExtender\Stamp\ Serrvius\AmqpRpcExtender\Stamp\AmqpRpcQueryStamp`

Examples of gateway messages:
-----------------------------

[](#examples-of-gateway-messages)

Query message:

```
