roi-up-agency/php-kafka-schema-registry
=======================================

Library for consuming from and producing to Apache Kafka (< 2.1) with Schema Registry, using PHP traits. It builds on the Nicofuma/poc-php-kafka library.

Latest version 1.8 (released 5y ago), MIT license, requires PHP >=7.1.0

php-kafka-schema-registry
=========================

[![release](https://camo.githubusercontent.com/ab313bd6bebedaadfaf47cd72115d7671a3da01ca9a94556e6f93d8ef79f11ea/687474703a2f2f6769746875622d72656c656173652d76657273696f6e2e6865726f6b756170702e636f6d2f6769746875622f726f692d75702d6167656e63792f7068702d6b61666b612d736368656d612d72656769737472792f72656c656173652e7376673f7374796c653d666c6174)](https://github.com/roi-up-agency/php-kafka-schema-registry/releases/latest)

Installation
------------

```

composer require roi-up-agency/php-kafka-schema-registry 1.0.*

```

.env Params
-----------

All .env params can also be set directly in the consumer class.

```
#Schema registry url (8081 is the Schema Registry default port; 2181 is ZooKeeper's)
SCHEMA_REGISTRY_URL=http://xx.xx.xx.xx:8081

#List of brokers
KAFKA_BROKERS=xx.xx.xx.xx:9092,xx.xx.xx.xx:9093...

#Consumer group id or null
KAFKA_CONSUMER_GROUP_ID=myConsumerGroupId

```
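As a rough sketch of how these three values might be normalised before they are handed to the traits, the helper below splits the broker list and treats an empty or literal `null` group id as "no consumer group". The function name `readKafkaEnv` and the returned array keys are hypothetical and not part of this library.

```php
<?php
// Hypothetical helper (not part of the library): normalises the .env values
// described above into PHP-friendly types.
function readKafkaEnv(array $env): array
{
    $brokers = isset($env['KAFKA_BROKERS']) ? $env['KAFKA_BROKERS'] : '';
    // Split the comma-separated broker list, trim whitespace, drop empty entries.
    $brokerList = array_values(
        array_filter(array_map('trim', explode(',', $brokers)), 'strlen')
    );

    $groupId = isset($env['KAFKA_CONSUMER_GROUP_ID']) ? $env['KAFKA_CONSUMER_GROUP_ID'] : null;
    // Treat an empty value or the literal string "null" as "no consumer group".
    if ($groupId === '' || $groupId === 'null') {
        $groupId = null;
    }

    $registryUrl = isset($env['SCHEMA_REGISTRY_URL'])
        ? rtrim($env['SCHEMA_REGISTRY_URL'], '/') // strip a trailing slash
        : null;

    return [
        'schemaRegistryUrl' => $registryUrl,
        'brokers'           => $brokerList,
        'groupId'           => $groupId,
    ];
}
```

Calling `readKafkaEnv($_ENV)` (or passing the result of `getenv()`) gives one place to check that the values are present before a consumer or producer is prepared.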

### Notes

- The examples use Laravel commands, but you can use the traits in any PHP class
- This library uses **subject and version** to encode / decode records
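To make "subject and version" concrete: the Confluent Schema Registry REST API exposes a schema at `/subjects/{subject}/versions/{version}`. The sketch below only builds that URL. The function name `schemaVersionUrl` is hypothetical, and whether this library calls this exact endpoint internally is an assumption.

```php
<?php
// Hypothetical helper: builds the Schema Registry REST URL for one
// subject/version pair. It does not perform any HTTP request.
function schemaVersionUrl(string $registryUrl, string $subject, int $version): string
{
    return rtrim($registryUrl, '/')
        . '/subjects/' . rawurlencode($subject)
        . '/versions/' . $version;
}

echo schemaVersionUrl('http://localhost:8081', 'topic-value', 1), PHP_EOL;
```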

Consumption Usage
-----------------

```
class MyConsumerCommand
{
    use \Kafka\SchemaRegistry\Traits\ConsumerTrait;
    ...

    public function handle()
    {
        //Prepare basic configs such as the broker list or RebalanceCb
        $this->prepare(
            $schemaRegistryUrl /* Mandatory if .env SCHEMA_REGISTRY_URL is not configured */,
            $brokerList /* Mandatory if .env KAFKA_BROKERS is not configured */
        );

        //Call this if you want to set a consumer group id for your app
        $this->setConsumerGroup();

        //Before calling the listen method, you can set custom config params via setConf or setTopicConf

        //The listen method initializes the Avro consumer and subscribes it to the desired topics

        //You can handle your messages with a callback class
        $this->listen(
            ['topic_xxx'] /* List of topics you want to subscribe your consumer to */,
            CallbackClass::class /* CallbackClass will be used to handle your messages. This class must implement Kafka\SchemaRegistry\Interfaces\ConsumerCallbackInterface */
        );

        //Or by passing, as the second param, a function that receives the message
        $this->listen(
            ['topic_xxx'],
            function($message){
              //A helper exists to handle the returned message with some getters and setters
              //Getters return an object if the key or value is an array

              $kafkaMessage = new \Kafka\SchemaRegistry\Helpers\KafkaMessage($message);
              dd($kafkaMessage->getKey(), $kafkaMessage->getValue(), $kafkaMessage->getTopic());
              /*
              For this value
              $item = [
                "time" => time(),
                "site" => "www.hola.es",
                "ip"   => "192.168.2.".mt_rand(0, 255)
              ];
              $kafkaMessage->getValue()->time;
              */
            }
        );
    }

    ...
}

```
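The comments above say the getters return an object when the key or value is an array, so a field is read as `$kafkaMessage->getValue()->time`. One common way to get that behaviour in plain PHP is a JSON round trip, sketched below. The function name `arrayToObject` is hypothetical, and whether `KafkaMessage` converts values this way is an assumption.

```php
<?php
// Hypothetical illustration: turns an associative array into a stdClass
// (nested arrays included) and leaves non-array values untouched.
function arrayToObject($value)
{
    if (!is_array($value)) {
        return $value;
    }
    // json_encode emits a JSON object for associative arrays, and
    // json_decode turns JSON objects into stdClass instances by default.
    return json_decode(json_encode($value));
}

$value = arrayToObject(['time' => 1700000000, 'site' => 'www.hola.es']);
echo $value->site, PHP_EOL;
```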

### Consumer default config

##### Consumer

- "metadata.broker.list" (ConsumerConfParam::METADATA\_BROKER\_LIST) = broker list given

##### Topic

- "auto.offset.reset" (TopicConfParam::AUTO\_OFFSET\_RESET) = 'smallest'

#### Important:

Your CallbackClass must implement Kafka\\SchemaRegistry\\Interfaces\\ConsumerCallbackInterface, which ships with this package.

Production Usage
----------------

```
class MyProducerCommand
{
    use \Kafka\SchemaRegistry\Traits\ProducerTrait;
    ...

    public function handle()
    {

        //Set the schema and version which we're going to use
        //Note that by default the naming strategy is topic-value for the value schema and topic-key for the key schema
        //The library adds this suffix before producing
        $this->setSchema('topic', 1);
        $this->setKeySchema('topic', 1);

        //Prepare basic configs such as the broker list or compression type
        $this->prepare(
            $schemaRegistryUrl /* Mandatory if .env SCHEMA_REGISTRY_URL is not configured */,
            $brokerList /* Mandatory if .env KAFKA_BROKERS is not configured */
        );

        //Example of schema generated via https://github.com/Landoop/schema-registry-ui
        /*{
          "type": "record",
          "name": "visits",
          "namespace": "com.landoop",
          "doc": "This is a sample Avro schema to get you started. Please edit",
          "fields": [
            {
              "name": "time",
              "type": "int"
            },
            {
              "name": "site",
              "type": "string"
            },
            {
              "name": "ip",
              "type": "string"
            }
          ]
        }*/
        //You can use an object or an array to generate your items
        $item = new \stdClass();
        $item->time = time();
        $item->site = "www.example.com";
        $item->ip = "192.168.2.".mt_rand(0, 255);

        /*$item = [
          "time" => time(),
          "site" => "www.hola.es",
          "ip"   => "192.168.2.".mt_rand(0, 255)
        ];*/

        $data = [$item];

        //Schema for visits-key
        /*{
          "type": "record",
          "name": "visits_key",
          "namespace": "com.landoop",
          "doc": "This is a sample Avro schema to get you started. Please edit",
          "fields": [
            {
              "name": "key",
              "type": "string"
            }
          ]
        }*/
        $key  = ["key" => "2"];

        //Produce data
        $this->produce('topic', $data /* Array of items */, $key /* Or null */);

    }

    ...
}

```
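Two details of the example above can be sketched in plain PHP: the `-value` / `-key` suffix appended to the topic name, and the requirement that each item carries the fields named in the schema. Both helpers below (`subjectFor`, `missingFields`) are hypothetical and only illustrate the idea. The second one checks field presence only and is not a substitute for real Avro validation.

```php
<?php
// Hypothetical helpers for illustration; the library may implement this differently.

// Builds the subject name from a topic, following the
// "<topic>-value" / "<topic>-key" convention described above.
function subjectFor(string $topic, bool $isKey = false): string
{
    return $topic . ($isKey ? '-key' : '-value');
}

// Returns the names of schema fields that are absent from the item.
// Presence check only: field types are not inspected.
function missingFields(string $schemaJson, $item): array
{
    $schema = json_decode($schemaJson, true);
    $record = (array) $item; // accepts both stdClass objects and arrays
    $missing = [];
    foreach ($schema['fields'] as $field) {
        if (!array_key_exists($field['name'], $record)) {
            $missing[] = $field['name'];
        }
    }
    return $missing;
}

$schema = '{"type":"record","name":"visits","fields":['
    . '{"name":"time","type":"int"},'
    . '{"name":"site","type":"string"},'
    . '{"name":"ip","type":"string"}]}';

$item = new stdClass();
$item->time = time();
$item->site = 'www.example.com';

// The item has no "ip" property, so that name is reported.
print_r(missingFields($schema, $item));
```

Running a check like this before `produce` can surface a missing field early, rather than as an encoding error.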

### Producer default config

##### Producer

- "compression.type" (ProducerConfParam::COMPRESSION\_TYPE) = 'snappy'
- "linger.ms" (ProducerConfParam::LINGER\_MS) = '20'
- "broker.version.fallback" (ProducerConfParam::BROKER\_VERSION\_FALLBACK) = '2.0.1'
- "queue.buffering.max.kbytes" (ProducerConfParam::QUEUE\_BUFFERING\_MAX\_KBYTES) = (string)32\*1024, i.e. '32768'

##### Topic

None
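The producer defaults listed above can be pictured as a plain array that user overrides are merged into. The sketch below uses the librdkafka property names from that list. The function name `producerConfig` is hypothetical, and casting every value to a string is an assumption based on librdkafka properties being set as strings.

```php
<?php
// Hypothetical sketch: merges the README's producer defaults with overrides.
function producerConfig(array $overrides = []): array
{
    $defaults = [
        'compression.type'           => 'snappy',
        'linger.ms'                  => '20',
        'broker.version.fallback'    => '2.0.1',
        'queue.buffering.max.kbytes' => (string) (32 * 1024),
    ];

    // With string keys, array_merge lets later values replace earlier ones,
    // so overrides win over defaults.
    $merged = array_merge($defaults, $overrides);

    // Normalise every value to a string; booleans become 'true' / 'false'.
    return array_map(function ($value) {
        if (is_bool($value)) {
            return $value ? 'true' : 'false';
        }
        return (string) $value;
    }, $merged);
}

print_r(producerConfig(['linger.ms' => 5, 'enable.idempotence' => true]));
```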

Available Config
----------------

You can add any extra configuration you need using the ConsumerConfParam or ProducerConfParam constants before calling the listen or produce method, depending on your context.

To set configs:

```
//Example for producer: enable idempotence
$this->setConfig(ProducerConfParam::ENABLE_IDEMPOTENCE, true);

//Example for topic: change the auto-commit interval
$this->setConfig(TopicConfParam::AUTO_COMMIT_INTERVAL_MS, 500);

```

You can get the full list of current config values using the dump method provided by the rdkafka library. To get configs:

```
dd($this->getConfig()->dump());

//Or

dd($this->getTopicConf()->dump());

```
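The dumped configuration is a long associative array of property names and values, so it can help to narrow it down. The sketch below filters such an array by name prefix. The function name `filterConfig` and the sample array are hypothetical; in practice the input would be the array returned by `dump()`.

```php
<?php
// Hypothetical helper: keeps only the properties whose name starts with
// the given prefix, sorted by name for stable output.
function filterConfig(array $dump, string $prefix): array
{
    $matches = [];
    foreach ($dump as $name => $value) {
        if (strpos($name, $prefix) === 0) {
            $matches[$name] = $value;
        }
    }
    ksort($matches);
    return $matches;
}

// Sample data standing in for a real dump() result.
$dump = [
    'queue.buffering.max.kbytes' => '32768',
    'compression.type'           => 'snappy',
    'queue.buffering.max.ms'     => '20',
];

print_r(filterConfig($dump, 'queue.'));
```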

More references at
------------------

- [confluent-schema-registry-docs](https://docs.confluent.io/current/schema-registry/docs/index.html), Schema registry - Confluent platform documentation
- [schema-registry-ui](https://github.com/Landoop/schema-registry-ui), View, create, evolve and manage your Avro Schemas for multiple Kafka clusters
- [kafka-site](https://kafka.apache.org/20/documentation.html), Kafka site documentation
- [avro-site](http://avro.apache.org/docs/current), Avro site documentation

### Community

- Maintainer: [jsolam](https://github.com/jsolam)
- Tags: kafka, schema-registry
