kafka-bus/laravel-bridge
========================

This is my package laravel-kafka-bus

v1.0.0 · MIT · PHP ^8.2 · CI passing

Kafka Bus for Laravel
=====================

[![Latest Version on Packagist](https://camo.githubusercontent.com/825e066d88c98a19957c0ceab5403aa049afd0ee8d5ed095e9b4b5529de92a54/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f762f6b61666b612d6275732f6c61726176656c2d6272696467652e7376673f7374796c653d666c61742d737175617265)](https://packagist.org/packages/kafka-bus/laravel-bridge)[![GitHub Tests Action Status](https://camo.githubusercontent.com/6802f204c285626dd659aed219babcf0b8c9e7a38206f4926f59f68c947a0781/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f616374696f6e732f776f726b666c6f772f7374617475732f6b61666b612d6275732f6c61726176656c2d6272696467652f72756e2d74657374732e796d6c3f6272616e63683d312e78266c6162656c3d7465737473267374796c653d666c61742d737175617265)](https://github.com/kafka-bus/laravel-bridge/actions?query=workflow%3Arun-tests+branch%3A1.x)[![GitHub Code Style](https://camo.githubusercontent.com/06987f1adc8b99989add966c2a6760b27d3b86b8ebc2e765a145f56264f72323/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f616374696f6e732f776f726b666c6f772f7374617475732f6b61666b612d6275732f6c61726176656c2d6272696467652f7068702d636f64652d7374796c652e796d6c3f6272616e63683d312e78266c6162656c3d636f64652d7374796c65267374796c653d666c61742d737175617265)](https://github.com/kafka-bus/laravel-bridge/actions?query=workflow%3Acode-style+branch%3A1.x)[![GitHub PHPStan](https://camo.githubusercontent.com/9ff672609d205f813fd8eae189ac646a88c766ed0dcb2155938b0d7ac99795dc/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f616374696f6e732f776f726b666c6f772f7374617475732f6b61666b612d6275732f6c61726176656c2d6272696467652f7068707374616e2e796d6c3f6272616e63683d312e78266c6162656c3d7068707374616e267374796c653d666c61742d737175617265)](https://github.com/kafka-bus/laravel-bridge/actions?query=workflow%3Aphpstan+branch%3A1.x)[![Total 
Downloads](https://camo.githubusercontent.com/87af2348a8775d4a6308634493e19ab115a78658e791e55e2296341b0c66eb66/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f64742f6b61666b612d6275732f6c61726176656c2d6272696467652e7376673f7374796c653d666c61742d737175617265)](https://packagist.org/packages/kafka-bus/laravel-bridge)

Laravel integration for [`kafka-bus`](https://github.com/kafka-bus/kafka-bus) — a configuration-driven Apache Kafka client built on top of `ext-rdkafka`. The package wires producers, consumer workers, topic routing, and middleware into the framework, and ships an optional **Commiter** component for idempotent message handling backed by the database.

Requirements
------------

- PHP `^8.2`
- Laravel `^10.0 || ^11.0 || ^12.0`
- `ext-rdkafka`

Installation
------------

Install the package via Composer:

```
composer require kafka-bus/laravel-bridge
```

Publish the main configuration file:

```
php artisan vendor:publish --tag=kafka-bus
```

To use the Commiter component (idempotency and commit tracking), additionally publish its configuration and migrations:

```
php artisan vendor:publish --tag=kafka-bus-commiter
php artisan migrate
```

Configuration
-------------

The main configuration lives in `config/kafka-bus.php` and is split into four sections:

- `connections` — Kafka broker connections and driver-specific options.
- `topics` — logical topic keys mapped to physical Kafka topic names.
- `consumers` — workers, topic-to-handler bindings, middleware, and consumer options.
- `producers` — message-to-topic routes, middleware, and producer options.

### Connections

Each connection is defined by a `driver` and a set of `options` that are passed straight to `librdkafka`. The top-level `default` key selects the active connection by name.

```
'default' => env('KAFKA_CONNECTION', 'kafka'),

'connections' => [
    'kafka' => [
        'driver' => 'kafka',
        'options' => [
            'metadata.broker.list' => env('KAFKA_BROKER_LIST', 'localhost:9092'),
            'security.protocol'    => env('KAFKA_SECURITY_PROTOCOL', 'SASL_PLAINTEXT'),
            'sasl.mechanisms'      => env('KAFKA_SASL_MECHANISMS', 'PLAIN'),
            'sasl.username'        => env('KAFKA_SASL_USERNAME'),
            'sasl.password'        => env('KAFKA_SASL_PASSWORD'),
            'debug'                => env('KAFKA_DEBUG', false),
        ],
    ],

    'testing' => [
        'driver'  => 'null',
        'options' => [],
    ],
],
```

The `null` driver is useful for tests — calls to the bus succeed without touching a real broker.

### Topics

Topic names usually depend on the environment. The bus prepends `topic_prefix` to every physical topic name, and the `topics` map binds a short logical key to that physical name.

```
'topic_prefix' => env('KAFKA_PREFIX', env('APP_ENV', 'local').'.'),

'topics' => [
    'products' => 'fact.products.1',
    'orders'   => 'fact.orders.1',
],
```

With `APP_ENV=production`, `products` resolves to `production.fact.products.1`.
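
The resolution rule can be sketched in a few lines of plain PHP. This is only an illustration of "prefix plus physical name": `resolveTopicName()` is a hypothetical helper, not part of the package API, and the config array is a trimmed copy of the example above.

```php
<?php

// Hypothetical helper (not the package's API): mirrors the documented
// rule that the prefix is prepended to the mapped physical topic name.
function resolveTopicName(array $config, string $topicKey): string
{
    if (! array_key_exists($topicKey, $config['topics'])) {
        throw new InvalidArgumentException("Unknown topic key [{$topicKey}].");
    }

    return $config['topic_prefix'].$config['topics'][$topicKey];
}

$config = [
    // The value the default expression yields for APP_ENV=production
    // when KAFKA_PREFIX is not set.
    'topic_prefix' => 'production.',
    'topics' => [
        'products' => 'fact.products.1',
        'orders'   => 'fact.orders.1',
    ],
];

echo resolveTopicName($config, 'products'), PHP_EOL; // production.fact.products.1
```

Setting `KAFKA_PREFIX` explicitly replaces the whole prefix, including the trailing dot, so include the separator in the value if you want one.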

### Producers

A producer route binds a message class to a logical topic key. The shortest form maps the class directly to a topic key:

```
'producers' => [
    'middleware' => [
        // KafkaBus\Commiter\Middleware\ProducerIdempotencyMiddleware::class,
    ],

    'routes' => [
        App\Kafka\Messages\ProductMessage::class => 'products',
    ],

    'flush_timeout' => 5000,
    'flush_retries' => 5,

    'additional_options' => [
        'compression.codec' => env('KAFKA_PRODUCER_COMPRESSION_CODEC', 'snappy'),
    ],
],
```

The verbose form lets you override timeouts, append per-route middleware, and pass driver options:

```
'routes' => [
    App\Kafka\Messages\ProductMessage::class => [
        'topic_key'          => 'products',
        'middleware'         => [App\Kafka\Middleware\AuditTrailMiddleware::class],
        'additional_options' => [],
        'flush_timeout'      => 5000,
        'flush_retries'      => 5,
    ],
],
```
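
One way to think about the two forms is that the short form is shorthand for a verbose entry whose remaining keys come from the producer-level defaults. The sketch below illustrates that reading; `normalizeRoute()` is a hypothetical function, and the assumption that route-level values win over producer-level ones is inferred from the word "override" above rather than taken from the package source.

```php
<?php

/**
 * Hypothetical normaliser (not the package's API): expands the short route
 * form into the verbose one and fills unset keys from producer defaults.
 */
function normalizeRoute(string|array $route, array $defaults): array
{
    if (is_string($route)) {
        $route = ['topic_key' => $route];
    }

    // Array union keeps keys already present on the route (left operand).
    return $route + [
        'middleware'         => [],
        'additional_options' => [],
        'flush_timeout'      => $defaults['flush_timeout'],
        'flush_retries'      => $defaults['flush_retries'],
    ];
}

$defaults = ['flush_timeout' => 5000, 'flush_retries' => 5];

$short   = normalizeRoute('products', $defaults);
$verbose = normalizeRoute(['topic_key' => 'products', 'flush_timeout' => 1000], $defaults);

echo $short['flush_timeout'], ' ', $verbose['flush_timeout'], PHP_EOL; // 5000 1000
```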

Publish a message through the bus:

```
use KafkaBus\Core\Interfaces\Bus\BusInterface;

public function execute(BusInterface $bus): void
{
    $bus->publish(new \App\Kafka\Messages\ProductMessage(/* ... */));
}
```

Or via the `KafkaBus` facade:

```
use KafkaBus\Laravel\Facades\KafkaBus;

KafkaBus::publish(new \App\Kafka\Messages\ProductMessage(/* ... */));
```

### Consumers

Workers are the units that the `kafka:consume` artisan command runs. Each worker subscribes to one or more topics and dispatches incoming messages to handler classes.

```
'consumers' => [
    'middleware' => [
        // KafkaBus\Commiter\Middleware\ConsumerCommiterMiddleware::class,
    ],

    'workers' => [
        // Multi-topic worker with per-worker overrides
        'default' => [
            'middleware'      => [],
            'auto_commit'     => false,
            'consume_timeout' => 20_000,
            'topics' => [
                'products' => App\Kafka\Consumers\ProductsTopicConsumer::class,
                'orders'   => [
                    'handler'    => App\Kafka\Consumers\OrdersTopicConsumer::class,
                    'middleware' => [App\Kafka\Middleware\TenantContextMiddleware::class],
                ],
            ],
        ],

        // Single-topic worker, worker name == topic key
        'products' => App\Kafka\Consumers\ProductsTopicConsumer::class,

        // Single-topic worker with overrides, worker name == topic key
        'orders' => [
            'middleware' => [],
            'handler'    => App\Kafka\Consumers\OrdersTopicConsumer::class,
        ],

        // Single-topic worker where worker name != topic key
        'products-secondary' => [
            'topic_key'  => 'products',
            'middleware' => [],
            'handler'    => App\Kafka\Consumers\ProductsTopicConsumer::class,
        ],
    ],

    'auto_commit'     => env('KAFKA_CONSUMER_AUTO_COMMIT', false),
    'consume_timeout' => 5_000,

    'additional_options' => [
        'group.id'              => env('KAFKA_CONSUMER_GROUP_ID', env('APP_NAME')),
        'max.poll.interval.ms'  => env('KAFKA_MAX_POLL_INTERVAL_MS', 300_000),
        'session.timeout.ms'    => env('KAFKA_SESSION_TIMEOUT_MS', 45_000),
        'heartbeat.interval.ms' => env('KAFKA_HEARTBEAT_INTERVAL_MS', 3_000),
        'auto.offset.reset'     => 'beginning',
    ],
],
```

Each worker resolves its options in two steps: `additional_options`, `auto_commit`, `consume_timeout`, and middleware are first read from the worker entry, then merged with the global `consumers.*` defaults.
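
The exact merge semantics are not spelled out here, so the following is a sketch under stated assumptions rather than the package's implementation: worker-level scalars take precedence over global ones, `additional_options` are merged key by key, and middleware lists are concatenated with the global stack first. `resolveWorkerOptions()` is a hypothetical name.

```php
<?php

/**
 * Hypothetical merge (assumed semantics, see the paragraph above).
 */
function resolveWorkerOptions(array $global, array $worker): array
{
    return [
        // Scalars: the worker value wins when present.
        'auto_commit'     => $worker['auto_commit'] ?? $global['auto_commit'],
        'consume_timeout' => $worker['consume_timeout'] ?? $global['consume_timeout'],
        // Driver options: merged key by key, worker keys override global keys.
        'additional_options' => array_merge(
            $global['additional_options'] ?? [],
            $worker['additional_options'] ?? [],
        ),
        // Middleware: global stack first, then worker-specific entries.
        'middleware' => array_merge(
            $global['middleware'] ?? [],
            $worker['middleware'] ?? [],
        ),
    ];
}

$global = [
    'middleware'         => [],
    'auto_commit'        => false,
    'consume_timeout'    => 5_000,
    'additional_options' => ['auto.offset.reset' => 'beginning'],
];

$worker = ['consume_timeout' => 20_000];

$resolved = resolveWorkerOptions($global, $worker);

echo $resolved['consume_timeout'], PHP_EOL; // 20000
```

Run `kafka:worker:list` to see what the package actually resolved for each worker.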

Run a worker:

```
php artisan kafka:consume default
```

Artisan commands
----------------

| Command | Description |
| --- | --- |
| `kafka:consume {workerName}` | Start a long-running consumer for the given worker. |
| `kafka:worker:list` | Show registered workers, their topic keys, resolved topic names, handlers, consumer middleware, and route middleware. |
| `kafka:route:list` | Show registered producer routes (message class → topic) and middleware. |
| `kafka:offset:show {workerName}` | Show current / min / max offsets for every partition of every topic the worker subscribes to. |
| `kafka:offset:set {workerName} {topicKey} {offset} {--partition=}` | Set the committed offset for a topic. `offset` accepts `earliest`, `latest`, or a numeric value. Omit `--partition` to apply to all partitions of the topic. |

### Inspecting workers and routes

```
php artisan kafka:worker:list
```

```
+----------+-----------+----------------------------+-------------------------------------------+-------------------------+--------------------------+
| Worker   | Topic key | Topic name                 | Handler                                   | Consumer Middleware     | Route Middleware         |
+----------+-----------+----------------------------+-------------------------------------------+-------------------------+--------------------------+
| default  | products  | production.fact.products.1 | App\Kafka\Consumers\ProductsTopicConsumer |                         |                          |
| default  | orders    | production.fact.orders.1   | App\Kafka\Consumers\OrdersTopicConsumer   | App\...\AuditMiddleware | App\...\TenantMiddleware |
| products | products  | production.fact.products.1 | App\Kafka\Consumers\ProductsTopicConsumer |                         |                          |
+----------+-----------+----------------------------+-------------------------------------------+-------------------------+--------------------------+
```

```
php artisan kafka:route:list
```

```
+----------------------------------------+-----------+----------------------------+------------+
| Message                                | Topic key | Topic name                 | Middleware |
+----------------------------------------+-----------+----------------------------+------------+
| App\Kafka\Messages\ProductMessage      | products  | production.fact.products.1 | 0          |
| App\Kafka\Messages\OrderMessage        | orders    | production.fact.orders.1   | 1          |
+----------------------------------------+-----------+----------------------------+------------+

```

### Inspecting and resetting offsets

```
php artisan kafka:offset:show default
```

```
+-----------+----------------------------+-----------+---------+-----+-----+
| Topic key | Topic name                 | Partition | Current | Min | Max |
+-----------+----------------------------+-----------+---------+-----+-----+
| products  | production.fact.products.1 | 0         | 142     | 0   | 200 |
| products  | production.fact.products.1 | 1         | 90      | 0   | 150 |
+-----------+----------------------------+-----------+---------+-----+-----+

```

Reset all partitions of a topic to the earliest available offset:

```
php artisan kafka:offset:set default products earliest
```

Move a single partition to an explicit numeric offset:

```
php artisan kafka:offset:set default products 150 --partition=0
```

Jump every partition to the high-water mark (skip backlog):

```
php artisan kafka:offset:set default products latest
```

The command prints the old and new offset of every affected partition, for example after an `earliest` reset:

```
+-----------+----------------------------+-----------+-----+-----+
| Topic key | Topic name                 | Partition | Old | New |
+-----------+----------------------------+-----------+-----+-----+
| products  | production.fact.products.1 | 0         | 142 | 0   |
| products  | production.fact.products.1 | 1         | 90  | 0   |
+-----------+----------------------------+-----------+-----+-----+

```

> The worker must not be running while you reset its offsets — otherwise the active consumer group will overwrite the new position on its next commit.
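
The three accepted forms of the `offset` argument can be illustrated with a small resolver. This is a sketch, not the command's implementation: `resolveOffset()` is a hypothetical function, `$min` and `$max` stand for the Min and Max columns printed by `kafka:offset:show`, and rejecting out-of-range numeric values (rather than clamping them) is an assumption.

```php
<?php

/**
 * Hypothetical resolver for the `offset` argument (not the package's code).
 */
function resolveOffset(string $offset, int $min, int $max): int
{
    if ($offset === 'earliest') {
        return $min; // low watermark
    }

    if ($offset === 'latest') {
        return $max; // high watermark
    }

    if (! ctype_digit($offset)) {
        throw new InvalidArgumentException(
            "Offset must be earliest, latest, or a non-negative integer, got [{$offset}]."
        );
    }

    $value = (int) $offset;

    // Assumption: values outside the available range are rejected.
    if ($value < $min || $value > $max) {
        throw new OutOfRangeException("Offset {$value} is outside [{$min}, {$max}].");
    }

    return $value;
}

echo resolveOffset('earliest', 0, 200), PHP_EOL; // 0
echo resolveOffset('150', 0, 200), PHP_EOL;      // 150
echo resolveOffset('latest', 0, 200), PHP_EOL;   // 200
```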

Commiter
--------

The Commiter component (powered by `micromus/kafka-bus-commiter`) provides:

- **Consumer idempotency** — every incoming message is tracked in the `kafka_bus_commits` table; duplicates are skipped, retries are counted, and a configurable max-attempt threshold can stop poison messages.
- **Producer idempotency keys** — outgoing messages implementing `HasIdempotency` automatically receive an `x-idempotency-key` header, which the consumer side uses as the dedup key.

It is registered automatically by `CommiterServiceProvider` (loaded via package auto-discovery).

### Configuration

```
// config/kafka-bus-commiter.php
return [
    'connection' => env('KAFKA_COMMITER_CONNECTION'),
    'table'      => 'kafka_bus_commits',
    'repository' => env('KAFKA_COMMITER_REPOSITORY', 'idempotency'),

    'repositories' => [
        'idempotency' => \KafkaBus\Commiter\Repositories\IdempotencyMessageRepository::class,
        'native'      => \KafkaBus\Commiter\Repositories\NativeMessageRepository::class,
    ],
];
```

- `connection` — Laravel database connection name; `null` uses the default connection.
- `table` — name of the commits table created by the published migration.
- `repository` — strategy for deriving the dedup key:
    - `idempotency` — reads the `x-idempotency-key` header combined with the topic name, falling back to the raw Kafka message id if the header is missing.
    - `native` — uses the raw Kafka message id only.
- `repositories` — registry of repository implementations; add your own class here and reference it via `KAFKA_COMMITER_REPOSITORY`.
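
The difference between the two strategies can be sketched as a single key-derivation function. Everything below is illustrative: `dedupKey()` and `rawMessageId()` are hypothetical names, the "raw message id" is modelled as `topic:partition:offset`, and the `topic:header` key format is likewise an assumption, since the actual formats are not documented here.

```php
<?php

/**
 * Hypothetical stand-in for the "raw Kafka message id"
 * (assumed here to be topic:partition:offset).
 */
function rawMessageId(string $topic, int $partition, int $offset): string
{
    return "{$topic}:{$partition}:{$offset}";
}

/**
 * Hypothetical key derivation for the `idempotency` and `native` strategies.
 */
function dedupKey(string $strategy, string $topic, int $partition, int $offset, array $headers): string
{
    $raw = rawMessageId($topic, $partition, $offset);

    return match ($strategy) {
        'native'      => $raw,
        // Header combined with the topic name, falling back to the raw id.
        'idempotency' => isset($headers['x-idempotency-key'])
            ? "{$topic}:{$headers['x-idempotency-key']}"
            : $raw,
        default => throw new InvalidArgumentException("Unknown repository [{$strategy}]."),
    };
}

$topic = 'production.fact.products.1';

echo dedupKey('idempotency', $topic, 0, 142, ['x-idempotency-key' => 'abc-123']), PHP_EOL;
echo dedupKey('idempotency', $topic, 0, 142, []), PHP_EOL;
echo dedupKey('native', $topic, 0, 142, ['x-idempotency-key' => 'abc-123']), PHP_EOL;
```

With the `idempotency` strategy, a message that is re-published with the same key is recognised as a duplicate even though it lands on a new offset; the `native` strategy only catches redelivery of the same record.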

### Enabling the consumer middleware

Add `ConsumerCommiterMiddleware` to the consumer middleware stack — either globally for every worker, or only for specific workers/topics:

```
'consumers' => [
    'middleware' => [
        \KafkaBus\Commiter\Middleware\ConsumerCommiterMiddleware::class,
    ],

    'workers' => [
        'orders' => [
            'middleware' => [
                \KafkaBus\Commiter\Middleware\ConsumerCommiterMiddleware::class,
            ],
            'handler' => App\Kafka\Consumers\OrdersTopicConsumer::class,
        ],
    ],
],
```

For each message the middleware:

1. Resolves a dedup key via the configured repository.
2. Skips the message and logs a warning if the key was already committed.
3. Skips the message and logs an error if the per-key attempt count exceeds `maxAttempt` (when configured).
4. Otherwise runs the pipeline: on success it commits the key; on failure it increments the attempt counter and re-throws the exception.
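
The steps above can be sketched with an in-memory store in place of the `kafka_bus_commits` table. This is a simplified model, not the middleware's source: `InMemoryCommits` and `handleOnce()` are hypothetical names, logging is omitted, and the return strings exist only to make the outcome visible.

```php
<?php

/** Hypothetical in-memory stand-in for the commits table. */
final class InMemoryCommits
{
    /** @var array<string, bool> */
    public array $committed = [];

    /** @var array<string, int> */
    public array $attempts = [];
}

/**
 * Returns 'skipped-duplicate', 'skipped-max-attempts', or 'committed'.
 * Re-throws whatever the handler throws, after counting the attempt.
 */
function handleOnce(InMemoryCommits $store, string $key, callable $handler, ?int $maxAttempt = null): string
{
    if ($store->committed[$key] ?? false) {
        return 'skipped-duplicate'; // step 2
    }

    if ($maxAttempt !== null && ($store->attempts[$key] ?? 0) > $maxAttempt) {
        return 'skipped-max-attempts'; // step 3
    }

    try {
        $handler(); // step 4: run the rest of the pipeline
    } catch (Throwable $e) {
        $store->attempts[$key] = ($store->attempts[$key] ?? 0) + 1;

        throw $e;
    }

    $store->committed[$key] = true;

    return 'committed';
}

$store = new InMemoryCommits();

echo handleOnce($store, 'products:abc-123', fn () => null), PHP_EOL; // committed
echo handleOnce($store, 'products:abc-123', fn () => null), PHP_EOL; // skipped-duplicate
```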

### Producing idempotent messages

Implement `HasIdempotency` on the producer message and enable `ProducerIdempotencyMiddleware`:

```
use KafkaBus\Commiter\Interfaces\HasIdempotency;
use KafkaBus\Core\Messages\ProducerMessage;

final class ProductMessage extends ProducerMessage implements HasIdempotency
{
    public function __construct(private string $productId) {}

    public function getIdempotencyKey(): string
    {
        return $this->productId;
    }
}
```

```
'producers' => [
    'middleware' => [
        \KafkaBus\Commiter\Middleware\ProducerIdempotencyMiddleware::class,
    ],
],
```

The middleware adds the `x-idempotency-key` header to every outgoing message; consumers running `ConsumerCommiterMiddleware` with the `idempotency` repository will use it as the dedup key.

Testing
-------

```
composer test
```

### KafkaBus::fake()

The `KafkaBus` facade ships a first-class fake that works exactly like `Event::fake()` or `Mail::fake()`. Call `KafkaBus::fake()` at the start of a test to replace the real `BusInterface` binding with an in-memory `FakeBus`. From that point on every call to the facade is forwarded to the fake — `publish()` calls are intercepted and stored, consumer pipelines can be triggered directly, and the full set of assertion methods becomes available.

```
use KafkaBus\Laravel\Facades\KafkaBus;

KafkaBus::fake();
```

#### Asserting producer messages

```
use App\Kafka\Messages\ProductMessage;
use KafkaBus\Laravel\Facades\KafkaBus;

it('publishes a product message', function () {
    KafkaBus::fake();

    app(CreateProductAction::class)->execute(productId: 1);

    KafkaBus::assertPublished(ProductMessage::class);
});
```

Assert with a callback to inspect the serialised `ProducerMessage` (after the full producer pipeline, including middleware). The callback receives a `Micromus\KafkaBus\Producers\Messages\ProducerMessage` instance:

```
KafkaBus::assertPublished(
    ProductMessage::class,
    fn($msg) => str_contains($msg->payload, '"id":1')
        && isset($msg->headers['x-idempotency-key'])
);
```

Other available assertions:

```
// Assert published exactly N times
KafkaBus::assertPublishedTimes(ProductMessage::class, 2);

// Assert a specific message was NOT published
KafkaBus::assertNotPublished(ProductMessage::class);

// Assert no messages were published at all
KafkaBus::assertNothingPublished();
```

Retrieve the published messages directly for custom assertions:

```
// Returns a list of serialised messages, including payload, headers, and topic
$messages = KafkaBus::getPublished(ProductMessage::class);
$all      = KafkaBus::allPublished();
```

#### Dispatching and asserting consumer messages

`addMessage()` queues an `RdKafka\Message` into the fake connection. Once queued, call `listen()` to run the full consumer path — `ConnectionFaker` → `ConsumerFaker` → `ConsumerStream` → consumer middleware → route middleware → handler → commit — without touching a real broker.

```
use KafkaBus\Core\Testing\Consumers\MessageFactory;
use KafkaBus\Laravel\Facades\KafkaBus;

it('handles a product message', function () {
    KafkaBus::fake();

    $message = MessageFactory::for()
        ->withTopicKey('products')
        ->withHeaders(['x-idempotency-key' => 'abc-123'])
        ->make('{"id":1,"name":"Widget"}');

    KafkaBus::addMessage($message);
    KafkaBus::listen('products');

    // Assert side effects produced by the handler
    expect(Product::find(1))->not->toBeNull();
});
```

Queue multiple messages before triggering `listen()`:

```
$factory = MessageFactory::for()->withTopicKey('products');

KafkaBus::addMessage($factory->make('{"id":1}'));
KafkaBus::addMessage($factory->make('{"id":2}'));

KafkaBus::listen('products');
```

#### Asserting committed messages

After `listen()` each successfully processed message is committed into the fake connection. Use the commit assertions to verify that your handler ran and the offset was acknowledged:

```
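KafkaBus::addMessage(
    MessageFactory::for()
        ->withTopicKey('products')
        // Set the header so the header assertions below have something to match
        ->withHeaders(['x-idempotency-key' => 'abc-123'])
        ->make('{"id":1}')
);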

KafkaBus::listen('products');

// Assert at least one message was committed on the topic
KafkaBus::assertCommitted('products');

// Assert with a condition on the ConsumerMessageInterface
KafkaBus::assertCommitted(
    'products',
    fn($msg) => $msg->payload() === '{"id":1}'
        && $msg->headers()['x-idempotency-key'] === 'abc-123'
);

// Assert exact count
KafkaBus::assertCommittedTimes('products', 2);

// Assert nothing was committed (e.g. before listen() is called)
KafkaBus::assertNothingCommitted();
```

Retrieve committed messages directly for custom assertions:

```
$committed = KafkaBus::getCommitted('products'); // list of committed messages

expect($committed[0]->payload())->toBe('{"id":1}');
expect($committed[0]->headers())->toHaveKey('x-idempotency-key');
```

Changelog
---------

Please see [CHANGELOG](CHANGELOG.md) for more information on what has changed recently.

Contributing
------------

Please see [CONTRIBUTING](CONTRIBUTING.md) for details.

Security Vulnerabilities
------------------------

Please review [our security policy](../../security/policy) on how to report security vulnerabilities.

Credits
-------

- [Kirill Popkov](https://github.com/popkovkirill)
- [All Contributors](../../contributors)

License
-------

The MIT License (MIT). Please see [License File](LICENSE.md) for more information.

