micromus/kafka-bus-messages
===========================

Messages Builder for Kafka Bus

v1.0.0 · MIT · PHP ^8.2 · [Source](https://github.com/micromus/kafka-bus-messages) · [Packagist](https://packagist.org/packages/micromus/kafka-bus-messages)


Kafka Bus Messages for PHP
==========================

[![Latest Version on Packagist](https://camo.githubusercontent.com/7547c80ab7f510ad9967dc6442af9e8194e6c5c9dededa76266698d46eb1b298/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f762f6d6963726f6d75732f6b61666b612d6275732d6d657373616765732e7376673f7374796c653d666c61742d737175617265)](https://packagist.org/packages/micromus/kafka-bus-messages)[![GitHub Tests Action Status](https://camo.githubusercontent.com/9b765c154934fdce54c87e2aadbe1c527fb374f250b8b2c633b7106335f28313/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f616374696f6e732f776f726b666c6f772f7374617475732f6d6963726f6d75732f6b61666b612d6275732f72756e2d74657374732e796d6c3f6272616e63683d312e78266c6162656c3d7465737473267374796c653d666c61742d737175617265)](https://github.com/micromus/kafka-bus/actions?query=workflow%3Arun-tests+branch%3A1.x)[![GitHub Code Style](https://camo.githubusercontent.com/a11fb804dbd4983589708fb21a60938f928ca87daf0cde8eda7bdcfe789e7d70/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f616374696f6e732f776f726b666c6f772f7374617475732f6d6963726f6d75732f6b61666b612d6275732f7068702d636f64652d7374796c652e796d6c3f6272616e63683d312e78266c6162656c3d636f64652d7374796c65267374796c653d666c61742d737175617265)](https://github.com/micromus/kafka-bus/actions?query=workflow%3Acode-style+branch%3A1.x)[![GitHub PHPStan](https://camo.githubusercontent.com/500221057232f65927ab01b0371660baa1f4961fba3d77a2add65a352a7b82af/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f616374696f6e732f776f726b666c6f772f7374617475732f6d6963726f6d75732f6b61666b612d6275732f7068707374616e2e796d6c3f6272616e63683d312e78266c6162656c3d7068707374616e267374796c653d666c61742d737175617265)](https://github.com/micromus/kafka-bus/actions?query=workflow%3Aphpstan+branch%3A1.x)

A PHP library for structuring, serializing, and deserializing Kafka messages. It provides typed message payloads with automatic casting, domain event messages, and factory helpers for both production and testing.

Installation
------------

```
composer require micromus/kafka-bus-messages
```

Core Concepts
-------------

- **`Payload`** — a flexible key-value container that supports typed attribute casting (dates, integers, floats, nested payloads, collections).
- **`JsonMessage`** — a `Payload` that serializes itself directly to a JSON Kafka message.
- **`DomainMessage`** — a structured message that wraps an attributes object with a domain event type (`create`, `update`, `delete`) and a list of dirty (changed) fields.
- **Casters** — classes that convert raw values on read (`cast`) and convert them back for serialization (`rollback`).
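
The `cast`/`rollback` pair can be pictured with a small standalone sketch. The names `SketchCaster` and `SketchDateTimeCaster` are hypothetical and are not the library's classes; only the two method names come from the description above, and their signatures are assumed. The format used here is PHP's built-in `DATE_ATOM`, chosen purely for illustration.

```php
<?php

// Hypothetical contract for illustration; the library's real caster contract may differ.
interface SketchCaster
{
    // Raw value -> typed value (applied when an attribute is read).
    public function cast(mixed $raw): mixed;

    // Typed value -> raw value (applied when the payload is serialized).
    public function rollback(mixed $typed): mixed;
}

final class SketchDateTimeCaster implements SketchCaster
{
    public function __construct(private string $format = DATE_ATOM) {}

    public function cast(mixed $raw): DateTimeImmutable
    {
        $date = DateTimeImmutable::createFromFormat($this->format, (string) $raw);
        if ($date === false) {
            throw new InvalidArgumentException('Unparseable date value');
        }

        return $date;
    }

    public function rollback(mixed $typed): string
    {
        return $typed->format($this->format);
    }
}

$caster = new SketchDateTimeCaster();
$typed  = $caster->cast('2025-09-02T10:15:00+00:00'); // DateTimeImmutable instance
$raw    = $caster->rollback($typed);                   // back to the original string
```

Keeping both directions in one class means a value read from a message and written back out should serialize to the same representation it arrived in.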

Usage
-----

### 1. Defining a Domain Message

Extend `DomainMessage` and list casters in `definitionCasters()` to type your fields automatically.

```
use Micromus\KafkaBusMessages\DomainMessage;
use Micromus\KafkaBusMessages\Data\Casters\PayloadCaster;
use Micromus\KafkaBusMessages\Data\Casters\CollectionCaster;
use Micromus\KafkaBusMessages\Data\Casters\IntegerCaster;

// CategoryPayload and AttributePayload are Payload subclasses defined elsewhere.

/**
 * @property int                $id
 * @property string             $name
 * @property CategoryPayload    $category
 * @property AttributePayload[] $attributes
 */
class ProductMessage extends DomainMessage
{
    public function getKey(): ?string
    {
        return (string) $this->id;
    }

    protected function definitionCasters(): array
    {
        return [
            'id'         => new IntegerCaster(),
            'category'   => new PayloadCaster(CategoryPayload::class),
            'attributes' => new CollectionCaster(new PayloadCaster(AttributePayload::class)),
        ];
    }
}

// Create from a raw array (e.g. decoded JSON)
$product = ProductMessage::from([
    'id'   => '42',
    'name' => 'Laptop',
    'category' => ['id' => 1, 'name' => 'Electronics'],
    'attributes' => [
        ['id' => 10, 'name' => 'Color', 'value' => 'Silver'],
    ],
]);

echo $product->id;                   // 42 (the string '42' was cast to int)
echo $product->category->name;       // Electronics
echo $product->attributes[0]->value; // Silver
```

### 2. Available Casters

| Caster | Description |
| --- | --- |
| `IntegerCaster` | Casts value to `int` |
| `FloatCaster` | Casts value to `float` |
| `DateTimeCaster` | Parses/formats `DateTimeInterface` with a configurable format |
| `PayloadCaster` | Hydrates a nested `Payload` subclass from an array |
| `CollectionCaster` | Applies another caster to each item in an array |
| `NullableCaster` | Wraps any caster to allow `null` values |

```
use Micromus\KafkaBusMessages\Data\Casters\DateTimeCaster;
use Micromus\KafkaBusMessages\Data\Casters\NullableCaster;
use Micromus\KafkaBusMessages\Data\Casters\FloatCaster;

protected function definitionCasters(): array
{
    return [
        'published_at' => new DateTimeCaster('Y-m-d\TH:i:s.uP'), // default format
        'deleted_at'   => new NullableCaster(new DateTimeCaster()),
        'price'        => new FloatCaster(),
    ];
}
```
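
Wrapping casters such as `NullableCaster` and `CollectionCaster` follow a decorator pattern: they take another caster and delegate to it. The sketch below shows that idea with hypothetical stand-in classes (`ComposableCaster`, `IntSketchCaster`, `NullableSketchCaster`, `CollectionSketchCaster`); it is not the library's code, and the real classes may behave differently in edge cases.

```php
<?php

// Hypothetical stand-ins for illustration only; not the library's classes.
interface ComposableCaster
{
    public function cast(mixed $raw): mixed;
}

final class IntSketchCaster implements ComposableCaster
{
    public function cast(mixed $raw): int
    {
        return (int) $raw;
    }
}

// Decorator: passes null through untouched and delegates everything else.
final class NullableSketchCaster implements ComposableCaster
{
    public function __construct(private ComposableCaster $inner) {}

    public function cast(mixed $raw): mixed
    {
        return $raw === null ? null : $this->inner->cast($raw);
    }
}

// Applies the inner caster to every item of an array, preserving keys.
final class CollectionSketchCaster implements ComposableCaster
{
    public function __construct(private ComposableCaster $inner) {}

    public function cast(mixed $raw): array
    {
        return array_map(fn (mixed $item): mixed => $this->inner->cast($item), $raw);
    }
}

$ids   = (new CollectionSketchCaster(new IntSketchCaster()))->cast(['1', '2', '3']);
$maybe = (new NullableSketchCaster(new IntSketchCaster()))->cast(null);
```

Because each wrapper only depends on the shared interface, wrappers can be nested in any order, for example a nullable collection of integers.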

### 3. Sending a JSON Message

`JsonMessage` extends `Payload` and implements `ProducerMessageInterface`, so it can be published directly to Kafka.

```
use Micromus\KafkaBusMessages\JsonMessage;

$message = new JsonMessage([
    'order_id' => 123,
    'status'   => 'shipped',
]);

// Produces: {"order_id":123,"status":"shipped"}
$message->toPayload();
```
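
For comparison, the JSON string shown in the comment above is what PHP's built-in `json_encode` yields for the same array. This sketch uses only core PHP functions; whether `JsonMessage` uses these exact flags internally is an assumption.

```php
<?php

$attributes = ['order_id' => 123, 'status' => 'shipped'];

// JSON_THROW_ON_ERROR raises JsonException instead of returning false on failure.
$json = json_encode($attributes, JSON_THROW_ON_ERROR);

// Decode back into an associative array, as a consumer might.
$decoded = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
```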

---

### 4. Sending a Domain Message

`DomainMessage` wraps an attributes object with a domain event type and a list of changed fields.

```
use Micromus\KafkaBusMessages\DomainMessage;
use Micromus\KafkaBusMessages\DomainEventEnum;

$attributes = [
    'id'   => 42,
    'name' => 'Laptop Pro',
    'category'   => ['id' => 1, 'name' => 'Electronics'],
    'attributes' => [],
];

// The event type is one of: create, update, delete
$message = new ProductMessage(
    attributes: $attributes,
    event: DomainEventEnum::Update,
    dirty: ['name'],
);

// Produces JSON:
// {
//   "event": "update",
//   "attributes": { "id": 42, "name": "Laptop Pro", ... },
//   "dirty": ["name"]
// }
$message->toPayload();

// The Kafka partition key comes from getKey() on the attributes object
$message->getKey(); // "42"

// Send to bus
$bus->publish($message);
```

### 5. Consuming a Domain Message

Use `DomainMessageFactory` to deserialize an incoming Kafka message into a typed `DomainMessage`.

```
use Micromus\KafkaBusMessages\Factories\DomainMessageFactory;

class ProductConsumer
{
    #[MessageFactory(new DomainMessageFactory(ProductMessage::class))]
    public function __invoke(ProductMessage $message)
    {
        echo $message->event->value; // "update"
        echo $message->name;         // "Laptop Pro"
    }
}
```
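
To make the envelope structure concrete, here is a rough standalone sketch of decoding the JSON shape shown in section 4 and reacting to the `dirty` list. The enum `SketchDomainEvent` is a hypothetical stand-in, not the library's `DomainEventEnum`, and this is not how `DomainMessageFactory` is necessarily implemented.

```php
<?php

// Hypothetical enum mirroring the three event types named in this README.
enum SketchDomainEvent: string
{
    case Create = 'create';
    case Update = 'update';
    case Delete = 'delete';
}

$body = '{"event":"update","attributes":{"id":42,"name":"Laptop Pro"},"dirty":["name"]}';

$envelope = json_decode($body, true, 512, JSON_THROW_ON_ERROR);

// from() throws ValueError when the event string is not a known case.
$event      = SketchDomainEvent::from($envelope['event']);
$attributes = $envelope['attributes'];
$dirty      = $envelope['dirty'] ?? [];

// React only when a field of interest actually changed.
$nameChanged = $event === SketchDomainEvent::Update && in_array('name', $dirty, true);
```

Checking `dirty` before acting lets a consumer skip work, such as reindexing, when an update did not touch the fields it cares about.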

### 6. Testing Helpers

The library ships with factory base classes to generate realistic test data via [Faker](https://github.com/FakerPHP/Faker).

**Define a test factory:**

```
use Micromus\KafkaBusMessages\Testing\DomainMessageTestFactory;

/**
 * @extends DomainMessageTestFactory
 */
final class ProductTestFactory extends DomainMessageTestFactory
{
    protected string $messageClass = ProductMessage::class;

    public function definition(): array
    {
        return [
            'id'         => $this->faker->numberBetween(1, 9999),
            'name'       => $this->faker->sentence(),
            'category'   => CategoryPayloadTestFactory::new()->makeArray(),
            'attributes' => [
                AttributePayloadTestFactory::new()->makeArray(),
            ],
        ];
    }
}
```

**Use it in tests:**

```
// Build a typed DomainMessage with default fake data
$message = ProductTestFactory::new()->message();

// Override specific fields
$message = ProductTestFactory::new()
    ->withEvent(DomainEventEnum::Delete)
    ->withDirty(['name', 'category'])
    ->message(['name' => 'Laptop Pro']);

// Build a raw RdKafka\Message for lower-level consumer tests
$rdKafkaMessage = ProductTestFactory::new()->make();

// Build just the raw array
$array = ProductTestFactory::new()->makeArray();
```
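
Overriding specific fields, as in `message(['name' => 'Laptop Pro'])`, amounts to laying explicit values over the generated defaults. A minimal sketch of that merge follows; `sketchMakeArray` is a hypothetical helper, and it is an assumption that the library's factories merge in this way.

```php
<?php

// Hypothetical helper: values in $overrides replace same-named keys in $definition.
function sketchMakeArray(array $definition, array $overrides = []): array
{
    return array_replace($definition, $overrides);
}

$defaults = ['id' => 7, 'name' => 'Generated name'];
$result   = sketchMakeArray($defaults, ['name' => 'Laptop Pro']);
```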

For payload-only factories, extend `PayloadTestFactory`:

```
use Micromus\KafkaBusMessages\Testing\PayloadTestFactory;

/**
 * @extends PayloadTestFactory
 */
final class CategoryPayloadTestFactory extends PayloadTestFactory
{
    protected string $payloadClass = CategoryPayload::class;

    public function definition(): array
    {
        return [
            'id'   => $this->faker->numberBetween(1, 9999),
            'name' => $this->faker->word(),
        ];
    }
}

$category = CategoryPayloadTestFactory::new()->payload();
```

Testing
-------

```
composer test
```

Changelog
---------

Please see [CHANGELOG](CHANGELOG.md) for more information on what has changed recently.

Contributing
------------

Please see [CONTRIBUTING](CONTRIBUTING.md) for details.

Security Vulnerabilities
------------------------

Please review [our security policy](../../security/policy) on how to report security vulnerabilities.

Credits
-------

- [Kirill Popkov](https://github.com/popkovkirill)
- [All Contributors](../../contributors)

License
-------

The MIT License (MIT). Please see [License File](LICENSE.md) for more information.

