

dev-this/ksqldb-php
===================

Asynchronous ksqlDB PHP client for PHP 8


[Source](https://github.com/dev-this/ksqldb-php) | [Packagist](https://packagist.org/packages/dev-this/ksqldb-php)


ksqlDB PHP client
=================


> Currently under development. API stability is not guaranteed until v1.

Requires PHP 8

```
composer require dev-this/ksqldb-php
```

Features
========


- Asynchronous operations (thanks to [amphp/amp](https://github.com/amphp/amp)!)
- All the Confluent [desired client features](https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/contributing/#functionality)
    - Push/pull query support (HTTP/2)
    - Terminating push queries
    - Inserting new rows of data into existing ksqlDB streams
    - Listing existing streams, tables, topics and queries
    - Creation and deletion of streams and tables
    - Terminating persistent queries
- Native `Amp\Promise` functionality

Usage
-----


### Create a client


A factory is available for creating clients:

`DevThis\KsqlDB\Factory\ClientFactory::create(string $hostname): DevThis\KsqlDB\Client`

No HTTP connection will be established until a client command has been called.

**Usage:**

```
$hostname = 'http://localhost:8088';

$client = (new DevThis\KsqlDB\Factory\ClientFactory())->create($hostname);
```

### Streaming callbacks


Streaming a query requires a callback object that implements the `StreamCallback` interface. Establishing a stream deliberately blocks until the header, which carries the query ID, has been received.

`DevThis\KsqlDB\Client::stream(Statement $statement, StreamCallback $callback): Amp\Promise`

The callback class must implement `StreamCallback`:

```
interface StreamCallback {
    // Invoked once, at the start of the stream
    // StreamHeader has getters for the query ID, and column names and their data types.
    public function onHeader(StreamHeader $header): void;

    // Invoked once for each new event
    // StreamEvent is an \ArrayObject
    public function onEvent(StreamEvent $event): void;
}
```

**Usage:**

```
use DevThis\KsqlDB\Interfaces\StreamCallback;
use DevThis\KsqlDB\Statement;
use DevThis\KsqlDB\StreamEvent;
use DevThis\KsqlDB\StreamHeader;

$transactionStatement = new Statement("SELECT * FROM transactions EMIT CHANGES;");

$transactionHandler = new class implements StreamCallback {
    public function onHeader(StreamHeader $header): void
    {
        echo sprintf(">Query ID: %s\n", $header->getQueryId());
    }

    public function onEvent(StreamEvent $event): void
    {
        echo "Processing new transaction\n";
        // do something with $event...
    }
};

// Blocks until the stream header (and with it the query ID) has been received
$stream = $client->stream($transactionStatement, $transactionHandler);
// Query ID
echo $stream->getQueryId();

// Terminate the query
$client->terminate($stream);

// Wait indefinitely on the promise returned by stream() (assumed to be $stream here)
\Amp\Promise\wait($stream);
```
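Events arrive as positional values, while the header carries the column names. A minimal sketch of pairing the two is shown below. It assumes the column names are already available as a plain list of strings (the README does not document the exact return shape of the header getters), and it uses a bare `\ArrayObject` as a stand-in for `StreamEvent`. The `nameColumns()` helper is hypothetical and not part of the library.

```php
<?php

/**
 * Pair positional event values with column names.
 *
 * @param array    $columnNames Column names in stream order (hypothetical input,
 *                              in practice captured in onHeader()).
 * @param iterable $event       Positional row values, e.g. a StreamEvent.
 */
function nameColumns(array $columnNames, iterable $event): array
{
    // Copy the values into a plain list so ArrayObject and arrays are handled alike.
    $values = [];
    foreach ($event as $value) {
        $values[] = $value;
    }

    if (count($values) !== count($columnNames)) {
        throw new InvalidArgumentException('Column count does not match event width.');
    }

    return array_combine($columnNames, $values);
}

// Stand-in for a StreamEvent: the README only states that it is an \ArrayObject.
$event = new ArrayObject(['42', 'hello', '2021-08-13T10:00:00']);
$row = nameColumns(['ID', 'MESSAGE', 'TIMESTAMP'], $event);

echo $row['MESSAGE'], "\n"; // prints "hello"
```

In a real callback, the names could be stored in a property during `onHeader()` and reused for every `onEvent()` call.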

### Executing a statement


Executing a statement works similarly to streaming a query. The main difference is that an executed statement is not a continuous operation: it returns a single response.

`DevThis\KsqlDB\Client::execute(Statement $statement): ArrayObject`

`ArrayObject` will contain the response.
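As a rough illustration of reading such a response, the sketch below walks an `ArrayObject` and collects command statuses. The sample payload is hypothetical: it is modelled on the JSON that ksqlDB's REST `/ksql` endpoint returns for a `CREATE STREAM` statement, and the structure this client actually returns may differ. The `commandStatuses()` helper is not part of the library.

```php
<?php

// Hypothetical response, shaped like the JSON from ksqlDB's REST /ksql endpoint.
// The client's real structure may differ.
$response = new ArrayObject([
    [
        '@type' => 'currentStatus',
        'statementText' => 'CREATE STREAM cool_data (...);',
        'commandStatus' => ['status' => 'SUCCESS', 'message' => 'Stream created'],
    ],
]);

/** Collect the commandStatus.status value of every entry that has one. */
function commandStatuses(ArrayObject $response): array
{
    $statuses = [];
    foreach ($response as $entry) {
        if (isset($entry['commandStatus']['status'])) {
            $statuses[] = $entry['commandStatus']['status'];
        }
    }

    return $statuses;
}

$statuses = commandStatuses($response);
echo implode(', ', $statuses), "\n"; // prints "SUCCESS"
```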

Functional example
==================


An asynchronous application that eats its own dog food by consuming the very events it creates:

```
use Amp\Loop;
use DevThis\KsqlDB\Factory\ClientFactory;
use DevThis\KsqlDB\Interfaces\StreamCallback;
use DevThis\KsqlDB\Statement;
use DevThis\KsqlDB\StreamEvent;
use DevThis\KsqlDB\StreamHeader;

$client = (new ClientFactory())->create('http://localhost:8088');

$createStatement = new Statement("CREATE STREAM cool_data (
    id VARCHAR KEY,
    message VARCHAR,
    timestamp VARCHAR
) WITH (
    kafka_topic = 'cool_data',
    partitions = 1,
    value_format = 'avro',
    timestamp = 'timestamp',
    timestamp_format = 'yyyy-MM-dd''T''HH:mm:ss'
);");
$streamStatement = new Statement("SELECT * FROM cool_data EMIT CHANGES;");
$coolDataCallback = new class implements StreamCallback {
    // Column positions within each event row
    private const SCHEMA_ID = 0;
    private const SCHEMA_MESSAGE = 1;
    private const SCHEMA_TIMESTAMP = 2;

    public function onHeader(StreamHeader $header): void
    {
        echo sprintf(">Query ID: %s\n", $header->getQueryId());
        echo sprintf(">Columns: %s", print_r($header->getColumns(), true));
        echo "--------------------\n";
    }

    public function onEvent(StreamEvent $event): void
    {
        echo "Processing new transaction\n";
        echo sprintf(">ID: %s\n", $event[self::SCHEMA_ID]);
        echo sprintf(">Message: %s\n", $event[self::SCHEMA_MESSAGE]);
        echo sprintf(">Timestamp: %s\n", $event[self::SCHEMA_TIMESTAMP]);
    }
};

// Create the stream before querying it
$client->execute($createStatement);

// Run event loop
// https://amphp.org/amp/event-loop/
Loop::run(function () use ($client, $streamStatement, $coolDataCallback) {
    $stream = $client->streamAsync($streamStatement, $coolDataCallback);

    Loop::repeat(1000, static function () {
        // insert into stream example.
    });

    // Terminate stream after 100 seconds.
    Loop::delay(1000 * 100, static function () use ($client, $stream) {
        $client->terminateStream($stream->getQueryId());
    });
});
```
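The `Loop::repeat` callback above leaves the insert step as a placeholder. One way to fill it, sketched here under assumptions, is to build an `INSERT INTO ... VALUES` statement string for the `cool_data` stream. The helpers `quoteLiteral()` and `buildInsertSql()` are hypothetical and not part of the library; they handle string-valued columns only and escape single quotes by doubling them, the same convention used in `timestamp_format` above.

```php
<?php

/** Quote a PHP string as a ksqlDB string literal (single quotes are doubled). */
function quoteLiteral(string $value): string
{
    return "'" . str_replace("'", "''", $value) . "'";
}

/**
 * Build an INSERT INTO ... VALUES statement for string-valued columns.
 *
 * @param array $row Column name => string value.
 */
function buildInsertSql(string $stream, array $row): string
{
    if ($row === []) {
        throw new InvalidArgumentException('At least one column is required.');
    }

    // Identifiers are restricted to a conservative character set rather than quoted.
    foreach ([$stream, ...array_keys($row)] as $identifier) {
        if (preg_match('/^[A-Za-z_][A-Za-z0-9_]*$/', (string) $identifier) !== 1) {
            throw new InvalidArgumentException("Unsafe identifier: {$identifier}");
        }
    }

    $columns = implode(', ', array_keys($row));
    $values = implode(', ', array_map('quoteLiteral', $row));

    return sprintf('INSERT INTO %s (%s) VALUES (%s);', $stream, $columns, $values);
}

$sql = buildInsertSql('cool_data', [
    'id' => 'abc-1',
    'message' => "it's alive",
    'timestamp' => '2021-08-13T10:00:00',
]);

echo $sql, "\n";
// INSERT INTO cool_data (id, message, timestamp) VALUES ('abc-1', 'it''s alive', '2021-08-13T10:00:00');
```

The resulting string could presumably be wrapped in a `Statement` and sent through the client, but the README does not show the client's insert API, so that step is left out here.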


