Abandoned → [kafka-bus/messages](/?search=kafka-bus%2Fmessages) · Library · [Queues & Workers](/categories/queues)

micromus/kafka-bus-domain
=========================

Messages Builder for Kafka Bus

v1.0.0 (1mo ago) · [1 PRs](https://github.com/micromus/kafka-bus-messages/pulls) · MIT · PHP ^8.2 · CI passing

Kafka Bus Messages for PHP
==========================

[![Latest Version on Packagist](https://camo.githubusercontent.com/7547c80ab7f510ad9967dc6442af9e8194e6c5c9dededa76266698d46eb1b298/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f762f6d6963726f6d75732f6b61666b612d6275732d6d657373616765732e7376673f7374796c653d666c61742d737175617265)](https://packagist.org/packages/micromus/kafka-bus-messages)[![GitHub Tests Action Status](https://camo.githubusercontent.com/9b765c154934fdce54c87e2aadbe1c527fb374f250b8b2c633b7106335f28313/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f616374696f6e732f776f726b666c6f772f7374617475732f6d6963726f6d75732f6b61666b612d6275732f72756e2d74657374732e796d6c3f6272616e63683d312e78266c6162656c3d7465737473267374796c653d666c61742d737175617265)](https://github.com/micromus/kafka-bus/actions?query=workflow%3Arun-tests+branch%3A1.x)[![GitHub Code Style](https://camo.githubusercontent.com/a11fb804dbd4983589708fb21a60938f928ca87daf0cde8eda7bdcfe789e7d70/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f616374696f6e732f776f726b666c6f772f7374617475732f6d6963726f6d75732f6b61666b612d6275732f7068702d636f64652d7374796c652e796d6c3f6272616e63683d312e78266c6162656c3d636f64652d7374796c65267374796c653d666c61742d737175617265)](https://github.com/micromus/kafka-bus/actions?query=workflow%3Acode-style+branch%3A1.x)[![GitHub PHPStan](https://camo.githubusercontent.com/500221057232f65927ab01b0371660baa1f4961fba3d77a2add65a352a7b82af/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f616374696f6e732f776f726b666c6f772f7374617475732f6d6963726f6d75732f6b61666b612d6275732f7068707374616e2e796d6c3f6272616e63683d312e78266c6162656c3d7068707374616e267374796c653d666c61742d737175617265)](https://github.com/micromus/kafka-bus/actions?query=workflow%3Aphpstan+branch%3A1.x)

A PHP library for structuring, serializing, and deserializing Kafka messages. It provides typed message payloads with automatic casting, domain event messages, and factory helpers for both production and testing.

Installation
------------

```
composer require micromus/kafka-bus-messages
```

Core Concepts
-------------

- **`Payload`** — a flexible key-value container that supports typed attribute casting (dates, integers, floats, nested payloads, collections).
- **`JsonMessage`** — a `Payload` that serializes itself directly to a JSON Kafka message.
- **`DomainMessage`** — a structured message that wraps an attributes object with a domain event type (`create`, `update`, `delete`) and a list of dirty (changed) fields.
- **Casters** — classes that convert raw values on read (`cast`) and convert them back for serialization (`rollback`).
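
To make the `cast`/`rollback` pairing concrete, here is a minimal standalone sketch. It does not use the library's classes: the `SketchCaster` interface, `SketchDateCaster`, and their signatures are hypothetical names chosen for illustration, and the library's real caster contract may differ.

```php
<?php

declare(strict_types=1);

// Hypothetical interface for illustration only; not the library's actual contract.
interface SketchCaster
{
    // Raw value -> typed value (applied on read).
    public function cast(mixed $value): mixed;

    // Typed value -> raw value (applied before serialization).
    public function rollback(mixed $value): mixed;
}

final class SketchDateCaster implements SketchCaster
{
    public function __construct(private string $format = 'Y-m-d') {}

    public function cast(mixed $value): mixed
    {
        // The leading "!" resets unspecified fields (time of day) to zero.
        $date = DateTimeImmutable::createFromFormat('!' . $this->format, (string) $value);
        if ($date === false) {
            throw new InvalidArgumentException("Cannot parse date: {$value}");
        }

        return $date;
    }

    public function rollback(mixed $value): mixed
    {
        if (!$value instanceof DateTimeInterface) {
            throw new InvalidArgumentException('Expected a DateTimeInterface instance');
        }

        return $value->format($this->format);
    }
}

$caster = new SketchDateCaster();
$typed  = $caster->cast('2025-09-02');   // DateTimeImmutable instance
$raw    = $caster->rollback($typed);     // back to the string '2025-09-02'
```

The point of the pairing is that `rollback(cast($x))` returns the original wire representation, so a payload can be read as typed values and still serialize back to the same JSON.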

Usage
-----

### 1. Defining a Domain Message

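Extend `DomainMessage` and define casters in `definitionCasters()` to type your fields automatically. Nested objects such as `CategoryPayload` are `Payload` subclasses hydrated through `PayloadCaster`.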

```
use Micromus\KafkaBusMessages\DomainMessage;
use Micromus\KafkaBusMessages\Data\Casters\PayloadCaster;
use Micromus\KafkaBusMessages\Data\Casters\CollectionCaster;
use Micromus\KafkaBusMessages\Data\Casters\IntegerCaster;

/**
 * @property int                $id
 * @property string             $name
 * @property CategoryPayload    $category
 * @property AttributePayload[] $attributes
 */
class ProductMessage extends DomainMessage
{
    public function getKey(): ?string
    {
        return (string) $this->id;
    }

    protected function definitionCasters(): array
    {
        return [
            'id'         => new IntegerCaster(),
            'category'   => new PayloadCaster(CategoryPayload::class),
            'attributes' => new CollectionCaster(new PayloadCaster(AttributePayload::class)),
        ];
    }
}

// Create from a raw array (e.g. decoded JSON)
$product = ProductMessage::from([
    'id'         => '42',
    'name'       => 'Laptop',
    'category'   => ['id' => 1, 'name' => 'Electronics'],
    'attributes' => [
        ['id' => 10, 'name' => 'Color', 'value' => 'Silver'],
    ],
]);

echo $product->id;               // int(42)
echo $product->category->name;   // string("Electronics")
echo $product->attributes[0]->value; // string("Silver")
```

### 2. Available Casters

| Caster | Description |
| --- | --- |
| `IntegerCaster` | Casts value to `int` |
| `FloatCaster` | Casts value to `float` |
| `DateTimeCaster` | Parses/formats `DateTimeInterface` with a configurable format |
| `PayloadCaster` | Hydrates a nested `Payload` subclass from an array |
| `CollectionCaster` | Applies another caster to each item in an array |
| `NullableCaster` | Wraps any caster to allow `null` values |

```
use Micromus\KafkaBusMessages\Data\Casters\DateTimeCaster;
use Micromus\KafkaBusMessages\Data\Casters\NullableCaster;
use Micromus\KafkaBusMessages\Data\Casters\FloatCaster;

protected function definitionCasters(): array
{
    return [
        'published_at' => new DateTimeCaster('Y-m-d\TH:i:s.uP'), // default format
        'deleted_at'   => new NullableCaster(new DateTimeCaster()),
        'price'        => new FloatCaster(),
    ];
}
```
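
Because `NullableCaster` and `CollectionCaster` wrap other casters, they compose by nesting. The sketch below shows that decorator idea in isolation. All `Demo*` classes are hypothetical stand-ins written for this example, and the library's own classes may be implemented differently.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins; not the library's classes.
interface DemoCaster
{
    public function cast(mixed $value): mixed;
}

final class DemoFloatCaster implements DemoCaster
{
    public function cast(mixed $value): mixed
    {
        return (float) $value;
    }
}

final class DemoNullableCaster implements DemoCaster
{
    public function __construct(private DemoCaster $inner) {}

    public function cast(mixed $value): mixed
    {
        // Pass null through untouched; delegate everything else.
        return $value === null ? null : $this->inner->cast($value);
    }
}

final class DemoCollectionCaster implements DemoCaster
{
    public function __construct(private DemoCaster $itemCaster) {}

    public function cast(mixed $value): mixed
    {
        // Apply the item caster to every element, preserving keys.
        return array_map(
            fn (mixed $item): mixed => $this->itemCaster->cast($item),
            (array) $value,
        );
    }
}

// A list of nullable floats: collection -> nullable -> float.
$caster = new DemoCollectionCaster(new DemoNullableCaster(new DemoFloatCaster()));
$prices = $caster->cast(['9.99', null, '120']);
// $prices is [9.99, null, 120.0]
```

Each wrapper only handles its own concern (null handling, iteration) and delegates the actual conversion, which is why any caster can be made nullable or applied to a collection without a dedicated class per combination.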

### 3. Sending a JSON Message

`JsonMessage` extends `Payload` and implements `ProducerMessageInterface`, so it can be published directly to Kafka.

```
use Micromus\KafkaBusMessages\JsonMessage;

$message = new JsonMessage([
    'order_id' => 123,
    'status'   => 'shipped',
]);

// Produces: {"order_id":123,"status":"shipped"}
$message->toPayload();
```

---

### 4. Sending a Domain Message

`DomainMessage` wraps an attributes object with a domain event type and a list of changed fields.

```
use Micromus\KafkaBusMessages\DomainMessage;
use Micromus\KafkaBusMessages\DomainEventEnum;

$attributes = [
    'id'         => 42,
    'name'       => 'Laptop Pro',
    'category'   => ['id' => 1, 'name' => 'Electronics'],
    'attributes' => [],
];

// create / update / delete
$message = new ProductMessage(
    attributes: $attributes,
    event: DomainEventEnum::Update,
    dirty: ['name'],
);

// Produces JSON:
// {
//   "event": "update",
//   "attributes": { "id": 42, "name": "Laptop Pro", ... },
//   "dirty": ["name"]
// }
$message->toPayload();

// The Kafka partition key comes from getKey(), defined on ProductMessage in section 1
$message->getKey(); // "42"

// Send to bus
$bus->publish($message);
```
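
For readers who want to see the envelope shape without the library, the following standalone sketch builds the same three-field JSON by hand. The `SketchDomainEvent` enum and `buildEnvelope()` function are hypothetical; only the field names (`event`, `attributes`, `dirty`) and the event values are taken from the text above.

```php
<?php

declare(strict_types=1);

// Hypothetical mirror of the event enum; the string values come from this README.
enum SketchDomainEvent: string
{
    case Create = 'create';
    case Update = 'update';
    case Delete = 'delete';
}

/**
 * Hypothetical helper that assembles the envelope described above.
 *
 * @param array<string, mixed> $attributes
 * @param list<string>         $dirty
 */
function buildEnvelope(SketchDomainEvent $event, array $attributes, array $dirty): string
{
    return json_encode([
        'event'      => $event->value,
        'attributes' => $attributes,
        'dirty'      => $dirty,
    ], JSON_THROW_ON_ERROR);
}

$json = buildEnvelope(
    SketchDomainEvent::Update,
    ['id' => 42, 'name' => 'Laptop Pro'],
    ['name'],
);

echo $json, PHP_EOL;
// {"event":"update","attributes":{"id":42,"name":"Laptop Pro"},"dirty":["name"]}
```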

### 5. Consuming a Domain Message

Use `DomainMessageFactory` to deserialize an incoming Kafka message into a typed `DomainMessage`.

```
use Micromus\KafkaBusMessages\Factories\DomainMessageFactory;

// Note: the import for the MessageFactory attribute is not shown in this example.
class ProductConsumer
{
    #[MessageFactory(new DomainMessageFactory(ProductMessage::class))]
    public function __invoke(ProductMessage $message): void
    {
        echo $message->event->value; // "update"
        echo $message->name;         // "Laptop Pro"
    }
}
```
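
On the consuming side, the `dirty` list lets a handler react only to the fields that changed. As a rough illustration of that idea on the raw JSON, independent of `DomainMessageFactory`, the hypothetical helper `changedAttributes()` below decodes an envelope and keeps only the dirty attributes. It is a sketch under the assumption that the wire format matches the JSON shape shown in section 4.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of the library: decode an envelope and
 * return only the attributes whose names appear in the "dirty" list.
 *
 * @return array<string, mixed>
 */
function changedAttributes(string $json): array
{
    $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

    // array_flip turns ['name'] into ['name' => 0] so it can be used as a key filter.
    return array_intersect_key(
        $data['attributes'] ?? [],
        array_flip($data['dirty'] ?? []),
    );
}

$json    = '{"event":"update","attributes":{"id":42,"name":"Laptop Pro"},"dirty":["name"]}';
$changed = changedAttributes($json);
// $changed is ['name' => 'Laptop Pro']
```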

### 6. Testing Helpers

The library ships with factory base classes to generate realistic test data via [Faker](https://github.com/FakerPHP/Faker).

**Define a test factory:**

```
use Micromus\KafkaBusMessages\Testing\DomainMessageTestFactory;

/**
 * @extends DomainMessageTestFactory
 */
final class ProductMessageTestFactory extends DomainMessageTestFactory
{
    protected string $messageClass = ProductMessage::class;

    public function definition(): array
    {
        return [
            'id'         => $this->faker->numberBetween(1, 9999),
            'name'       => $this->faker->sentence(),
            'category'   => CategoryPayloadTestFactory::new()->makeArray(),
            'attributes' => [
                AttributePayloadTestFactory::new()->makeArray(),
            ],
        ];
    }
}
```

**Use it in tests:**

```
// Build a typed DomainMessage with default fake data
$message = ProductMessageTestFactory::new()->message();

// Override specific fields
$message = ProductMessageTestFactory::new()
    ->withEvent(DomainEventEnum::Delete)
    ->withDirty(['name', 'category'])
    ->message(['name' => 'Laptop Pro']);

// Build a raw RdKafka\Message for lower-level consumer tests
$rdKafkaMessage = ProductMessageTestFactory::new()->make();

// Build just the raw array
$array = ProductMessageTestFactory::new()->makeArray();
```
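
The override in `message(['name' => 'Laptop Pro'])` can be pictured as caller-supplied values layered over the factory's generated defaults. A minimal sketch, assuming a plain key-wise merge: `mergeDefinition()` is a hypothetical helper, and the library's actual merge strategy (for example, how nested arrays are handled) is not confirmed here.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: overlay explicit overrides on generated defaults.
 *
 * @param array<string, mixed> $defaults
 * @param array<string, mixed> $overrides
 * @return array<string, mixed>
 */
function mergeDefinition(array $defaults, array $overrides = []): array
{
    // Values from $overrides win on matching keys; new keys are appended.
    return array_replace($defaults, $overrides);
}

$attributes = mergeDefinition(
    ['id' => 7, 'name' => 'Generated name'],
    ['name' => 'Laptop Pro'],
);
// $attributes is ['id' => 7, 'name' => 'Laptop Pro']
```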

For payload-only factories, extend `PayloadTestFactory`:

```
use Micromus\KafkaBusMessages\Testing\PayloadTestFactory;

/**
 * @extends PayloadTestFactory
 */
final class CategoryPayloadTestFactory extends PayloadTestFactory
{
    protected string $payloadClass = CategoryPayload::class;

    public function definition(): array
    {
        return [
            'id'   => $this->faker->numberBetween(1, 9999),
            'name' => $this->faker->word(),
        ];
    }
}

$category = CategoryPayloadTestFactory::new()->payload();
```

Testing
-------

```
composer test
```

Changelog
---------

Please see [CHANGELOG](CHANGELOG.md) for more information on what has changed recently.

Contributing
------------

Please see [CONTRIBUTING](CONTRIBUTING.md) for details.

Security Vulnerabilities
------------------------

Please review [our security policy](../../security/policy) on how to report security vulnerabilities.

Credits
-------

- [Kirill Popkov](https://github.com/popkovkirill)
- [All Contributors](../../contributors)

License
-------

The MIT License (MIT). Please see [License File](LICENSE.md) for more information.

