
misterion/ko-worker
===================

Ko-worker is a base for developing AMQP-based message dispatchers.

Latest release: v1.3.1 · License: MIT · Requires PHP >=7.0.0 · [4 issues](https://github.com/misterion/ko-worker/issues)


ko-worker
=========


[![Build Status](https://camo.githubusercontent.com/81655b1772b337b1d8afee932d6406ee74d584370d1a191f80990dcff8ab7cb1/68747470733a2f2f7472617669732d63692e6f72672f6d6973746572696f6e2f6b6f2d776f726b65722e737667)](https://travis-ci.org/misterion/ko-worker)[![Latest Stable Version](https://camo.githubusercontent.com/1f301894fb82851ae72833f6370a41927c65a2e7eeb02b490dbcbcc311a3fb97/68747470733a2f2f706f7365722e707567782e6f72672f6d6973746572696f6e2f6b6f2d776f726b65722f762f737461626c652e706e67)](https://packagist.org/packages/misterion/ko-worker)[![Code Coverage](https://camo.githubusercontent.com/e70ea846810081b63ab12da9682dbaf54e4055a1f5015ce77697f0c7c29e9dad/68747470733a2f2f7363727574696e697a65722d63692e636f6d2f672f6d6973746572696f6e2f6b6f2d776f726b65722f6261646765732f636f7665726167652e706e673f623d6d6173746572)](https://scrutinizer-ci.com/g/misterion/ko-worker/?branch=master)[![Scrutinizer Code Quality](https://camo.githubusercontent.com/da22a87847b4a9b893ecf21ec78fdedd7577bed0940f24b22ee83cbb07b076ee/68747470733a2f2f7363727574696e697a65722d63692e636f6d2f672f6d6973746572696f6e2f6b6f2d776f726b65722f6261646765732f7175616c6974792d73636f72652e706e673f623d6d6173746572)](https://scrutinizer-ci.com/g/misterion/ko-worker/?branch=master)[![Latest Unstable Version](https://camo.githubusercontent.com/d6509a06ecdff8cc95ef6fe7d62fe0f7ae4d98a4f83a3f48e43a2a091942efeb/68747470733a2f2f706f7365722e707567782e6f72672f6d6973746572696f6e2f6b6f2d776f726b65722f762f756e737461626c652e706e67)](https://packagist.org/packages/misterion/ko-worker)[![License](https://camo.githubusercontent.com/af1f351358eab7d5fc59f7fa60111484887d579cddcf23a5e07351e7908b083b/68747470733a2f2f706f7365722e707567782e6f72672f6d6973746572696f6e2f6b6f2d776f726b65722f6c6963656e73652e706e67)](https://packagist.org/packages/misterion/ko-worker)

About
-----


Ko-worker is a PHP library that aims to abstract the queue and RPC messaging patterns that can be implemented over RabbitMQ. Its other goal is to simplify message dispatching and RPC server development over RabbitMQ.

```
$m = new Ko\AmqpBroker($config);
$m->getProducer('upload_picture')->publish($message);
```

Each time you start using queues, you have to decide how to process your messages. It does not sound complicated:

- write a console application or daemon
- grab messages from the queue and process them in the master or a child process
- restart a child process if it fails
- log crashes and so on
- ...

Ko-worker reduces this to just two steps:

- create config file
- run ko application

```
$ ./bin/ko --workers 5 --consumer upload_picture
```

Why Ko-worker?

- Used in the real-life, high-load [GameNet project](http://gamenet.ru)
- Close to 100% code coverage
- Simple: less than 5 minutes to get started

Requirements
------------


```
PHP >= 7.0 (since v1.3.0; earlier releases require PHP >= 5.4)
pcntl extension installed
posix extension installed
amqp extension installed
```

Installation
------------


### Composer


The recommended way to install the library is with [Composer](http://getcomposer.org). You can see the [package information on Packagist](https://packagist.org/packages/misterion/ko-worker).

```
{
	"require": {
		"misterion/ko-worker": "*"
	}
}
```

### Not using Composer?


Just clone the repository and set up autoloading for the `Ko` namespace yourself.
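If you go without Composer, an autoloader for the `Ko` namespace could look roughly like the sketch below. The helper name `koClassToPath` and the base directory `ko-worker/src` are assumptions made for illustration, not something the project defines; check the actual layout of your clone and adjust the path.

```php
<?php
// Hypothetical helper (not part of Ko-worker): maps a class in the Ko
// namespace to a file path using the usual namespace-to-directory convention.
// ASSUMPTION: classes live under <baseDir>/Ko/...; verify this in your clone.
function koClassToPath(string $class, string $baseDir)
{
    $class = ltrim($class, '\\');

    // Ignore classes outside the Ko namespace so other autoloaders can run.
    if (strpos($class, 'Ko\\') !== 0) {
        return null;
    }

    return rtrim($baseDir, '/') . '/' . str_replace('\\', '/', $class) . '.php';
}

spl_autoload_register(function ($class) {
    // ASSUMPTION: the repository was cloned next to this file.
    $path = koClassToPath($class, __DIR__ . '/ko-worker/src');
    if ($path !== null && is_file($path)) {
        require $path;
    }
});
```

Keeping the path mapping in its own function makes it easy to check the mapping without touching the filesystem.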

Examples
--------


Look at the `examples` folder.

UNDONE

Usage
-----


First, create your RabbitMQ configuration file, for example `config.yaml`:

```
connections:
  default:
    host:     'localhost'
    port:     5672
    login:    'guest'
    password: 'guest'
    vhost:    '/'
producers:
  social_activity:
    connection:       default
    exchange_options: {name: 'social_activity_exchange', type: direct, durable: 1, passive: 1}
consumers:
  social_activity:
    connection:    default
    queue_options:
      name: 'social_activity_queue'
      durable: 1
      autodelete: 1
      exclusive: 0
      qos: 5
      binding: {name: social_activity_exchange, routing-keys: '*'}
    class: \MyProject\TestAction
```

Here we configure the exchange producer and queue consumer that our application will have. In this example Ko\\AmqpBroker will contain the producer `social_activity` and consumer named `social_activity`.

You should specify a connection for the client.

If you need to add optional queue or exchange arguments, then your options can be something like this:

```
queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
```

Another example, with a message TTL of 20 seconds:

```
queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
```

The argument value must be a list of datatype and value. Valid datatypes are:

- `S` - String
- `I` - Integer
- `D` - Decimal
- `T` - Timestamp
- `F` - Table
- `A` - Array

Adapt the `arguments` according to your needs.
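To make the `[datatype, value]` convention concrete, here is a minimal sketch that validates such pairs and reduces them to plain PHP values. The function `plainArguments` is hypothetical and is not Ko-worker's own code. The cast chosen for each type is also an assumption; in particular, treating `D` as a float is a simplification.

```php
<?php
// Hypothetical helper (not part of Ko-worker): turns
// ['x-message-ttl' => ['I', 20000]] into ['x-message-ttl' => 20000].
function plainArguments(array $typed)
{
    $plain = [];
    foreach ($typed as $name => $pair) {
        if (!is_array($pair) || count($pair) !== 2) {
            throw new InvalidArgumentException("Argument '$name' must be a [datatype, value] pair");
        }
        list($type, $value) = array_values($pair);
        if (!is_string($type)) {
            throw new InvalidArgumentException("Datatype of '$name' must be a string");
        }
        switch ($type) {
            case 'S': // String
                $plain[$name] = (string) $value;
                break;
            case 'I': // Integer
            case 'T': // Timestamp, kept as an integer here (assumption)
                $plain[$name] = (int) $value;
                break;
            case 'D': // Decimal, simplified to a float here (assumption)
                $plain[$name] = (float) $value;
                break;
            case 'F': // Table
            case 'A': // Array
                if (!is_array($value)) {
                    throw new InvalidArgumentException("Argument '$name' expects an array value");
                }
                $plain[$name] = $value;
                break;
            default:
                throw new InvalidArgumentException("Unknown datatype '$type' for '$name'");
        }
    }
    return $plain;
}
```

For example, `plainArguments(['x-ha-policy' => ['S', 'all']])` yields `['x-ha-policy' => 'all']`.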

If you want to bind a queue with specific routing keys, declare them in the producer or consumer config:

```
queue_options:
    binding:
        - {name: "social_activity_exchange", routing_keys: 'social.#.addFriends'}
        - {name: "social_activity_exchange", routing_keys: '*.removeFriends'}
```
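The wildcards in these routing keys follow the AMQP topic-exchange rules: `*` stands for exactly one dot-separated word and `#` for zero or more words. The broker does this matching itself, and only for exchanges of type `topic`; with a `direct` exchange the key is compared literally. The sketch below only illustrates the rule so you can reason about which keys a binding would catch. The function names are hypothetical and not part of Ko-worker.

```php
<?php
// Illustration of AMQP topic matching; the real matching happens in the broker.
function topicMatches(string $pattern, string $routingKey)
{
    return matchWords(explode('.', $pattern), explode('.', $routingKey));
}

// Recursively compares pattern words against routing-key words.
function matchWords(array $pattern, array $key)
{
    if ($pattern === []) {
        // The pattern is exhausted: match only if the key is exhausted too.
        return $key === [];
    }

    $head = array_shift($pattern);

    if ($head === '#') {
        // '#' may absorb zero or more words, so try every possible split.
        for ($i = 0; $i <= count($key); $i++) {
            if (matchWords($pattern, array_slice($key, $i))) {
                return true;
            }
        }
        return false;
    }

    if ($key === []) {
        return false;
    }

    $word = array_shift($key);

    // '*' matches exactly one word; anything else must match literally.
    return ($head === '*' || $head === $word) && matchWords($pattern, $key);
}
```

With the bindings above, `social.vk.addFriends` and `social.addFriends` both match `social.#.addFriends`, while `*.removeFriends` matches `vk.removeFriends` but not `social.vk.removeFriends`.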

### Producers, Consumers, AmqpBroker?


In a messaging application, the process sending messages to the broker is called a **producer**, while the process receiving those messages is called a **consumer**. Your application will have several of each, and you list them under their respective entries in the configuration.

### Producer


A producer is used to send messages to the server. In the AMQP model, messages are sent to an **exchange**. This means that the configuration for a producer has to specify the connection options along with the exchange options, which are usually the name and type of the exchange.

Now let's say that you want to process some social network activity in the background. After a user adds a friend, you publish a message to the server with the following information:

```
public function addFriendAction($name)
{
    $msg = array('user_id' => 1235, 'friend_id' => '67890');
    $this->broker->getProducer('social_activity')->publish(json_encode($msg));
}
```

Besides the message itself, the `Ko\RabbitMq\Producer#publish()` method also accepts an optional routing key parameter and an optional array of additional properties. This way, for example, you can change the application headers.

The next piece of the puzzle is to have a consumer that will take the message out of the queue and process it accordingly.

### Consumers


A consumer connects to the server and starts a **loop**, waiting for incoming messages to process. The consumer's behavior depends on the **class** specified for it. Let's review the consumer configuration from above:

```
consumers:
  social_activity:
    connection:    default
    queue_options:
      name: 'social_activity_queue'
      durable: 1
      autodelete: 1
      exclusive: 0
      qos: 5
      binding: {name: social_activity_exchange, routing-keys: '*'}
    class: \MyProject\TestAction
```

As we see there, the **class** option refers to the **\\MyProject\\TestAction** class. It should implement **Ko\\Worker\\ActionInterface**.

When the consumer gets a message from the server, it creates an instance of that class and executes it. If you need a different class for testing or debugging purposes, you can change it there.

Apart from the callback, we also specify the connection to use, the same way as we do with a **producer**. The remaining options are the **queue\_options**, where we provide a **queue name** and a **binding**.

Why?

As we said, messages in AMQP are published to an **exchange**. This doesn't mean the message has reached a **queue**. For this to happen, first we need to create such **queue** and then bind it to the **exchange**.

The cool thing about this is that you can bind several **queues** to one **exchange**, so one message can arrive at several destinations.

The advantage of this approach is the **decoupling** of the producer from the consumer. The producer does not care how many consumers process its messages.

All it needs is for its message to arrive at the server. This way we can extend the actions we perform every time a friend is added without changing code in our controller.

Now, how to run a consumer? There's a command for it that can be executed like this:

```
$ ./bin/ko --c ../config_queue.yaml --q social_activity --w 10
```

What does this mean?

We are running the **social\_activity** consumer and telling it to use 10 child processes for consuming.

Every time the consumer receives a message from the server, it will execute the configured callback passing the AMQP message as an instance of the `AMQPEnvelope` class. The message body can be obtained by calling `$msg->getBody()`.
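As a sketch of what the callback might do with that body, the function below decodes the JSON payload published by `addFriendAction()` earlier. The function name `decodeFriendMessage` is hypothetical and not part of Ko-worker. It takes the body as a plain string so it can be tried without the amqp extension.

```php
<?php
// Hypothetical helper (not part of Ko-worker): decodes and validates the
// JSON body of a social_activity message.
function decodeFriendMessage(string $body)
{
    $data = json_decode($body, true);

    // json_decode() returns null on malformed input, so reject anything
    // that is not an array carrying both expected fields.
    if (!is_array($data) || !isset($data['user_id'], $data['friend_id'])) {
        throw new InvalidArgumentException('Malformed social_activity message');
    }

    // Normalise both ids to integers; the producer example sends one of
    // them as a string.
    return [
        'user_id'   => (int) $data['user_id'],
        'friend_id' => (int) $data['friend_id'],
    ];
}
```

Inside an action class this would typically be called as `decodeFriendMessage($msg->getBody())`.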

By default the consumer will process messages in an **endless loop** for some definition of *endless*.

UNDONE

Production
----------


Ko-worker comes with a handy `ko-package` utility that lets you create an executable phar from your worker project.

```
$ ./bin/ko-package

ko-package version 0.0.1
Usage: ko-package [options] [operands]
Options:
  -p, --path         Path to application code
  -o, --output []    Output file name
  -v, --version []   Application version
  -n, --name []      Application name
  -e, --exclude []   Excluded folders like logs, test, etc.
```

Credits
-------


Ko-worker was written as part of the [GameNet project](http://gamenet.ru) by Nikolay Bondarenko (misterionkell at gmail.com) and Vadim Sabirov (pr0head at gmail.com). We use some interesting ideas from Alvaro Videla's [Thumper](https://github.com/videlalvaro/Thumper) and part of the documentation from [RabbitMqBundle](https://github.com/videlalvaro/RabbitMqBundle).

License
-------


Released under the [MIT](LICENSE) license.

Links
-----


- [Project profile on Ohloh](https://www.ohloh.net/p/ko-worker)

