domraider/rxnet
===============

Rx connector for PHP

0.6.18(7y ago)1318.6k15[1 issues](https://github.com/Domraider/rxnet/issues)2MITPHPPHP ^5.6 || ^7.0

RxNet
=====

[![License](https://camo.githubusercontent.com/11be9d2c569633927b7797a194d9cc61f70e325084673b1820450e1071924a32/68747470733a2f2f706f7365722e707567782e6f72672f646f6d7261696465722f72786e65742f6c6963656e7365)](https://packagist.org/packages/domraider/rxnet)[![Latest Stable Version](https://camo.githubusercontent.com/9020f0a16f3a3aec6c56a0fdf940d6afa176afc5066017c6afb95fea6cadf491/68747470733a2f2f706f7365722e707567782e6f72672f646f6d7261696465722f72786e65742f762f737461626c65)](https://packagist.org/packages/domraider/rxnet)[![Total Downloads](https://camo.githubusercontent.com/2a8a39f985866bd43924b030ead478e908ebcb57417def8095d22347487a7fec/68747470733a2f2f706f7365722e707567782e6f72672f646f6d7261696465722f72786e65742f646f776e6c6f616473)](https://packagist.org/packages/domraider/rxnet)[![Latest Unstable Version](https://camo.githubusercontent.com/cfcbd8485aedd61513d6a179c3fec6582595db3b1a1e53fed33e5904b8374850/68747470733a2f2f706f7365722e707567782e6f72672f646f6d7261696465722f72786e65742f762f756e737461626c65)](https://packagist.org/packages/domraider/rxnet)[![composer.lock](https://camo.githubusercontent.com/63267abc693c8c8bea9121ced997e09c7d606990c477b94ccaf81e650ff9f036/68747470733a2f2f706f7365722e707567782e6f72672f646f6d7261696465722f72786e65742f636f6d706f7365726c6f636b)](https://packagist.org/packages/domraider/rxnet)

[RxPHP](https://github.com/ReactiveX/RxPHP) is a great piece of work that brings us reactive programming: asynchronous programming for human beings.
You can play with ReactiveX on [RxMarbles.com](http://rxmarbles.com/), find all the available operators on the official [ReactiveX.io](http://reactivex.io/documentation/operators.html) website, or read an [interesting introduction](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754).

RxNet is an effort to bring it to you with batteries included.

- [Dns](#dns)
- [Http](#http)
- [Httpd](#httpd)
- [RabbitMq](#rabbitmq)
- [Redis](#redis)
- [ZeroMq](#zeromq)
- [InfluxDB](#influxdb)
- [Statsd](#statsd)
- Others, outside this repository
    - [voryx/pg-async](https://github.com/voryx/PgAsync) postgres client
    - [RxPHP/RxStream](https://github.com/RxPHP/RxStream) stream
    - [RxPHP/RxWebsocket](https://github.com/RxPHP/RxWebsocket) websocket client / server
    - [RxPHP/RxChildProcess](https://github.com/RxPHP/RxChildProcess) forking

Thanks to [react/react](https://github.com/reactphp/react), its marvelous reactor pattern and all the work done with it, many of these are just simple wrappers.

Installation
------------

With Composer: `composer require domraider/rxnet`

Or just clone the lib, run `composer install` and try the example scripts.

Why one repository for everything? Because once you start using it, you want every library to be Rx-friendly :)

Dns
---

Asynchronous DNS resolver. Thanks to the [daverandom/libdns](https://github.com/DaveRandom/LibDNS) parser, it was a breeze to code.

No extra extensions are needed

```
$dns = new Dns();
// Procedural way
echo Rxnet\awaitOnce($dns->resolve('www.google.fr', '8.8.4.4'));

// All types of queries are allowed
$dns->soa('www.google.fr')
  ->subscribe(new StdoutObserver());
```

Http
----

HTTP client with all the ReactiveX sweetness

No extra extensions are needed

```
$http = new Http();
$http->get("https://github.com/Domraider/rxnet/commits/master")
  // Timeout after 0.3s
  ->timeout(300)
  // will retry 2 times on error
  ->retry(2)
  // Transform response to something else
  ->map(function(Psr\Http\Message\ResponseInterface $response) {
    $body = (string) $response->getBody();
    // Domcrawler to extract commits
    return $body;
  })
  ->subscribe(new StdoutObserver());

// All the given options
$opts = [
  // No buffering, you will receive chunks as they arrive
  // Perfect for big files to parse or streaming json
  'stream' => true,
  // You can use body, json or form_params
  // * json will add the header and json_encode
  // * form_params will build query in body and add application/x-www-form-urlencoded header
  'body' => 'raw body for post',
  'json' => ['my'=>'parameters', 'they-will->be'=>'json'],
  'form_params' => ['param_0'=>'value_0', 'param_1'=>'value_1'],
  // Set query string from here not the url
  'query'=> [
    'param1'=>'one'
  ],
  // Use a proxy
  'proxy' => 'http://user:password@myproxy:8080',
  // Append extra headers
  'headers' => [
    'Authorization' => 'Basic '.base64_encode('user:password'),
    // Specify user-agent to use
    'User-Agent' => 'SuperParser/0.1',
  ],
  // see http://php.net/manual/en/context.ssl.php
  // Add whatever option you want on your https query
  'ssl' => [
    'verify_peer' => false
  ],
  // Allow redirects
  'allow_redirects' => true,
  // or, to cap the number of redirects to follow:
  // 'allow_redirects' => ['max' => 10],
];

$http->post('https://adwords.google.com/my-10gb.xml', $opts)
  ->subscribeCallback(function($chunk) {
    // let's give it to expat while it arrives
  });
```
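
The `body`, `json`, `form_params` and `query` options described in the comments above can be pictured with plain PHP. This is only a sketch of that mapping, not RxNet's actual code: `buildRequestParts()` is a hypothetical helper, and the precedence between `json`, `form_params` and `body` is an assumption made for the example.

```php
<?php
// Hypothetical helper, not part of RxNet: shows how the documented options
// could map to a URL, headers and a body.
function buildRequestParts(string $url, array $opts): array
{
    $headers = isset($opts['headers']) ? $opts['headers'] : [];
    $body = isset($opts['body']) ? $opts['body'] : '';

    if (isset($opts['json'])) {
        // "json will add the header and json_encode"
        $headers['Content-Type'] = 'application/json';
        $body = json_encode($opts['json']);
    } elseif (isset($opts['form_params'])) {
        // "form_params will build query in body" plus the urlencoded header
        $headers['Content-Type'] = 'application/x-www-form-urlencoded';
        $body = http_build_query($opts['form_params']);
    }

    if (!empty($opts['query'])) {
        // The query string comes from the options, not from the URL
        $url .= '?' . http_build_query($opts['query']);
    }

    return ['url' => $url, 'headers' => $headers, 'body' => $body];
}

$parts = buildRequestParts('https://example.com/api', [
    'form_params' => ['param_0' => 'value_0', 'param_1' => 'value_1'],
    'query' => ['param1' => 'one'],
]);
echo $parts['url'], PHP_EOL;  // https://example.com/api?param1=one
echo $parts['body'], PHP_EOL; // param_0=value_0&param_1=value_1
```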

### TODO

- [ ] Psr7 request/response
- [ ] multipart
- [ ] gzip/deflate

HttpD
-----

Standalone HTTP server based on [react/http](https://github.com/reactphp/http). It uses [nikic/fast-route](https://github.com/nikic/FastRoute) as the default router.

No extra extensions are needed

```
$httpd = new HttpD();
$httpd->route('GET', '/', function(HttpdRequest $request, HttpdResponse $response) {
  $response->text('Hello');
});
$httpd->route('POST', '/{var}', function(HttpdRequest $request, HttpdResponse $response) {
  $var = $request->getRouteParam('var');
  $response->json(['var'=>$var]);
});
$httpd->listen(8080);
```
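
To illustrate what a `/{var}` placeholder means, here is a minimal sketch of route matching in plain PHP. It is not how nikic/fast-route or RxNet implement routing; `matchRoute()` is a hypothetical helper written for this example.

```php
<?php
// Hypothetical helper: match a path against a pattern such as "/{var}".
// Returns the captured parameters, or null when the path does not match.
function matchRoute(string $pattern, string $path)
{
    // Split the pattern into literal parts and {placeholder} parts
    $parts = preg_split('#(\{\w+\})#', $pattern, -1, PREG_SPLIT_DELIM_CAPTURE);
    $regex = '';
    foreach ($parts as $part) {
        if (preg_match('#^\{(\w+)\}$#', $part, $m)) {
            // A placeholder matches one path segment
            $regex .= '(?P<' . $m[1] . '>[^/]+)';
        } else {
            $regex .= preg_quote($part, '#');
        }
    }

    if (!preg_match('#^' . $regex . '$#', $path, $matches)) {
        return null;
    }

    // Keep only the named captures
    return array_filter($matches, 'is_string', ARRAY_FILTER_USE_KEY);
}

var_dump(matchRoute('/{var}', '/42'));
var_dump(matchRoute('/{var}', '/a/b'));
```

The first call yields `['var' => '42']`, the second yields `null` because a placeholder only spans one path segment in this sketch.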

Performance on a MacBook Pro is around 500 msg/s for a single PHP process.

Remember that it does not need FPM to run, and that the default FPM configuration uses 10 children.

### Todo

- [ ] Psr7 Request / Response
- [ ] gzip / deflate
- [ ] http2
- [ ] multipart ?
- [ ] ssl ? :D

RabbitMq
--------

Wrapper around [jakubkulhan/bunny](https://github.com/jakubkulhan/bunny), which works just fine

No extra extensions are needed

### Consume

```
$rabbit = new RabbitMq('rabbit://guest:guest@127.0.0.1:5672/', new Serialize());
// Wait for rabbit to be connected
\Rxnet\awaitOnce($rabbit->connect());

// Will wait for messages
$rabbit->consume()
    ->subscribeCallback(function (RabbitMessage $message) use ($debug, $rabbit) {
        echo '.';
        $data = $message->getData();
        $name = $message->getName();
        $head = $message->getLabels();
        // Do what you want, but call one of these to get the next message
        $message->ack();
        //$message->nack();
        //$message->reject();
        //$message->rejectToBottom();
    });
```

### Produce

```
$queue = $rabbit->queue('test_queue', []);
$exchange = $rabbit->exchange('amq.direct');

$rabbit->connect()
    ->zip([
      // Declare all the binding
        $queue->create($queue::DURABLE),
        $queue->bind('/routing/key', 'amq.direct'),
        $exchange->create($exchange::TYPE_DIRECT, [
            $exchange::DURABLE,
            $exchange::AUTO_DELETE
        ])
    ])
    // Everything's done let's produce
    ->subscribeCallback(function () use ($exchange, $loop) {
        $done = 0;
        // Just a simple array
        \Rx\Observable::just(['id' => 2, 'foo' => 'bar'])
            // Wait for one produce to be done before starting another
            ->flatMap(function ($data) use ($exchange) {
                // Rabbit will handle serialize and unserialize
                return $exchange->produce($data, '/routing/key');
            })
            // Produce it many times
            ->repeat(10000)
            // Let's get some stats
            ->subscribeCallback(
                function () use (&$done) { $done++;},
                function (\Exception $e) { echo "shit happens : ".$e->getMessage();},
                function () use (&$done, $loop) { echo number_format($done)." lines produced"; }
            );
    });
```

### TODO

- [ ] add all possible options as constants

Redis
-----

Wrapper around [clue/redis](https://github.com/clue/php-redis-react) (great work!)

No extra extensions are needed

```
$redis = new Redis();

// Wait for redis to be ready
$redis = RxNet\awaitOnce($redis->connect('127.0.0.1:6379'));

$redis->get('key')
  ->subscribe(new StdoutObserver());
// Every Redis command returns an observable,
// and they are all implemented
```

ZeroMq
------

Message exchange through tcp (or ipc or inproc).

Needs the [PECL ZMQ](https://pecl.php.net/package/zmq) extension to work

Router/dealer is bidirectional communication. The dealer will wait for the router, but the router will not wait for the dealer, so the dealer has to start first

```
$zmq = new ZeroMq(new MsgPack());
// Connect to the router with my identity
$dealer = $zmq->dealer('tcp://127.0.0.1:3000', 'Roger');
// Display output
$dealer->subscribe(new StdoutObserver());
// And start
$dealer->send(new PingCommand('ping'));
```

```
// Bind and wait
$router = $zmq->router('tcp://127.0.0.1:3000');
// Show received message and wait 0.1s to answer
$router->doOnEach(new StdOutObserver())
  ->delay(100)
  ->subscribeCallback(function($data) use($router) {
    $router->send(new ReceivedEvent('pong'), 'Roger');
  });
```

### Different protocols

You can also do `push/pull` or `req/rep`; read [ØMQ - The Guide](http://zguide.zeromq.org) to see which network model fits you.

Throughput is 5k to 10k msg/s in router/dealer and 30k msg/s in push/pull.

### TODO

- [ ] pub/sub

InfluxDB
--------

InfluxDB client based on [influxdata/influxdb-php](https://github.com/influxdata/influxdb-php). It only supports the UDP protocol for the moment (write only). Our primary goal was to have a non-blocking client to send metrics.
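
For context, InfluxDB's UDP input accepts points written in its line protocol. The following is a rough sketch of that text format in plain PHP, not the RxNet or influxdb-php API: `formatPoint()` is a hypothetical helper, and it deliberately skips the escaping rules the real protocol requires.

```php
<?php
// Hypothetical helper: build one line-protocol entry of the form
// "measurement,tag=value field=value timestamp".
// Escaping of spaces, commas and quotes is omitted in this sketch.
function formatPoint(string $measurement, array $fields, array $tags = [], $timestamp = null): string
{
    $line = $measurement;
    foreach ($tags as $key => $value) {
        $line .= ',' . $key . '=' . $value;
    }

    $pairs = [];
    foreach ($fields as $key => $value) {
        if (is_int($value)) {
            $value = $value . 'i';               // integers carry an "i" suffix
        } elseif (is_string($value)) {
            $value = '"' . $value . '"';         // strings are double-quoted
        } elseif (is_bool($value)) {
            $value = $value ? 'true' : 'false';
        }
        $pairs[] = $key . '=' . $value;
    }
    $line .= ' ' . implode(',', $pairs);

    if ($timestamp !== null) {
        $line .= ' ' . $timestamp;
    }
    return $line;
}

echo formatPoint('cpu', ['load' => 0.64, 'cores' => 4], ['host' => 'web01']), PHP_EOL;
// cpu,host=web01 load=0.64,cores=4i
```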

Statsd
------

Statsd client based on [this gist](https://gist.github.com/1065177/5f7debc212724111f9f500733c626416f9f54ee6) and [php-datadogstatsd](https://github.com/DataDog/php-datadogstatsd) for tagging support.

```
$statsd->gauge("database.connections", 42)
  ->subscribe(new StdOutObserver(), new EventLoopScheduler($loop));
```
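
For reference, a StatsD metric travels as a small text datagram of the form `name:value|type`, and the Datadog flavour appends tags as `|#key:value`. The sketch below only formats such a datagram in plain PHP; `formatStatsd()` is a hypothetical helper and not part of the RxNet client.

```php
<?php
// Hypothetical helper: format a StatsD datagram, adding the Datadog-style
// "|#key:value" suffix when tags are given.
function formatStatsd(string $name, $value, string $type, array $tags = []): string
{
    $datagram = $name . ':' . $value . '|' . $type;
    if ($tags) {
        $pairs = [];
        foreach ($tags as $key => $tagValue) {
            $pairs[] = $key . ':' . $tagValue;
        }
        $datagram .= '|#' . implode(',', $pairs);
    }
    return $datagram;
}

echo formatStatsd('database.connections', 42, 'g'), PHP_EOL;
// database.connections:42|g
echo formatStatsd('database.connections', 42, 'g', ['env' => 'prod']), PHP_EOL;
// database.connections:42|g|#env:prod
```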

Mysql
-----

The MySQL client uses mysqli.

```
$conn = new Rxnet\Mysql\Connection([
    'host' => 'localhost',
    'user' => 'myUser',
    'password' => 'myPasswd',
    'database' => 'myDb'
]);

$conn->query('SELECT NOW()');

$conn->transaction(['SELECT NOW()']);
```

Sweet
-----

### AwaitOnce

Classic procedural style is always possible, but beware of side effects

```
$observable = $http->get('http://www.google.fr')
  ->timeout(1000)
  ->retry(3);
// The loop keeps running during await
$response = Rxnet\awaitOnce($observable);
// but this echo will wait before running
echo "1";
```

### Await

Using [rx/await](https://packagist.org/packages/rx/await) you can transform your observable into a generator

```
$source = \Rx\Observable::interval(1000)
    ->take(5); //Limit items to 5

$generator = \Rx\await($source);

foreach ($generator as $item) {
    echo $item, PHP_EOL;
}
echo "DONE";
```

### On demand

```
// Great to read gigabytes without memory leaks
$reader = new \Rxnet\OnDemand\OnDemandFileReader("./test.csv");
$reader->getObservable()
    ->subscribeCallback(
        function ($row) use ($reader) {
            echo "got row : {$row}\n";
            // read next octet
            $reader->produceNext();
        },
        null,
        function() {
            echo "------------------\n";
            echo "read completed\n";
        }
    );
$reader->produceNext(1);
```
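
The idea behind on-demand reading is that the consumer asks for each item, so only one row is held in memory at a time. The sketch below shows the same pull-based pattern with a plain PHP generator. It does not use `OnDemandFileReader`; `readRows()` is a hypothetical helper, and the temporary CSV file exists only for the demo.

```php
<?php
// Hypothetical helper: yield one line at a time so the whole file
// never has to fit in memory.
function readRows(string $path)
{
    $handle = fopen($path, 'r');
    if ($handle === false) {
        throw new RuntimeException("Cannot open {$path}");
    }
    try {
        while (($line = fgets($handle)) !== false) {
            yield rtrim($line, "\r\n");
        }
    } finally {
        fclose($handle);
    }
}

// Demo with a small temporary CSV file
$path = tempnam(sys_get_temp_dir(), 'rows');
file_put_contents($path, "a,1\nb,2\nc,3\n");

$rows = [];
foreach (readRows($path) as $row) {
    // The next line is only read when the loop asks for it
    $rows[] = $row;
    echo "got row : {$row}\n";
}
unlink($path);
```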

### OnBackPressureBuffer

[![](bp.obp.buffer.png)](bp.obp.buffer.png)

```
$backPressure = new \Rxnet\Operator\OnBackPressureBuffer(
    5, // Buffer capacity
    function($next, \SplQueue $queue) {echo "Buffer overflow";}, // Callable on buffer full (nullable)
    OnBackPressureBuffer::OVERFLOW_STRATEGY_ERROR // strategy on overflow
);

\Rx\Observable::interval(1000)
    ->doOnNext(function($i) {
        echo "produce {$i} ";
    })
    ->lift($backPressure->operator())
    ->flatMap(function ($i) {
        return \Rx\Observable::just($i)
            ->delay(3000);
    })
    ->doOnNext([$backPressure, 'request'])
    ->subscribe($stdout, $scheduler);
```
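
The mechanics of a bounded buffer with an error-on-overflow strategy can be sketched in plain PHP. This is not the `OnBackPressureBuffer` operator itself: `BoundedBuffer` is a hypothetical class that only mirrors the idea of a fixed capacity, an overflow error, and a consumer that requests the next item when it is ready.

```php
<?php
// Hypothetical class, not RxNet's operator: a bounded FIFO buffer
// that throws when the producer outruns the consumer.
class BoundedBuffer
{
    private $queue;
    private $capacity;

    public function __construct(int $capacity)
    {
        $this->queue = new SplQueue();
        $this->capacity = $capacity;
    }

    // Producer side: store an item, or fail when the buffer is full
    public function push($item)
    {
        if (count($this->queue) >= $this->capacity) {
            throw new OverflowException('Buffer overflow');
        }
        $this->queue->enqueue($item);
    }

    // Consumer side: ask for the next item once the previous one is handled
    public function request()
    {
        return $this->queue->isEmpty() ? null : $this->queue->dequeue();
    }
}

$buffer = new BoundedBuffer(2);
$buffer->push(0);
$buffer->push(1);

$overflowed = false;
try {
    $buffer->push(2); // third item exceeds the capacity of 2
} catch (OverflowException $e) {
    $overflowed = true;
}

$first = $buffer->request(); // frees one slot
$buffer->push(2);            // now accepted
```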

### OnBackPressureBufferFile

If consuming is slower than producing, the next element is written to a file in the given folder.

On start, it reads the buffer's path to look for existing, unconsumed events.

```
$backPressure = new \Rxnet\Operator\OnBackPressureBufferFile(
    './', // Folder to write files
    new MsgPack(), // Serializer to use
    -1, // Buffer capacity, -1 for unlimited
    function($next, \SplQueue $queue) {echo "Buffer overflow";}, // Callable on buffer full (nullable)
    OnBackPressureBuffer::OVERFLOW_STRATEGY_ERROR // strategy on overflow
);

\Rx\Observable::interval(1000)
    ->doOnNext(function($i) {
        echo "produce {$i} ";
    })
    ->lift($backPressure->operator())
    ->flatMap(function ($i) {
        return \Rx\Observable::just($i)
            ->delay(3000);
    })
    ->doOnNext([$backPressure, 'request'])
    ->subscribe($stdout, $scheduler);
```
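
A file-backed buffer that survives restarts can be pictured as follows. This is a sketch under assumptions, not the `OnBackPressureBufferFile` implementation: `FileBuffer`, the `.event` file naming and the use of PHP's `serialize()` in place of a pluggable serializer are all hypothetical choices made for the example.

```php
<?php
// Hypothetical class, not RxNet's operator: events are persisted as files
// and picked up again by the next instance pointing at the same folder.
class FileBuffer
{
    private $folder;
    private $pending = [];
    private $sequence = 0;

    public function __construct(string $folder)
    {
        $this->folder = rtrim($folder, '/');
        // On start, look for events left unconsumed by a previous run
        $this->pending = glob($this->folder . '/*.event') ?: [];
        sort($this->pending);
        if ($this->pending) {
            $last = end($this->pending);
            $this->sequence = (int) basename($last, '.event') + 1;
        }
    }

    // Producer side: write the event to disk
    public function push($event)
    {
        $file = sprintf('%s/%010d.event', $this->folder, $this->sequence++);
        file_put_contents($file, serialize($event));
        $this->pending[] = $file;
    }

    // Consumer side: read the oldest event and delete its file
    public function request()
    {
        if (!$this->pending) {
            return null;
        }
        $file = array_shift($this->pending);
        $event = unserialize(file_get_contents($file));
        unlink($file);
        return $event;
    }
}

$folder = sys_get_temp_dir() . '/buffer_' . uniqid();
mkdir($folder);

$writer = new FileBuffer($folder);
$writer->push(['id' => 1]);
$writer->push(['id' => 2]);

// A new instance finds the two unconsumed events on disk
$reader = new FileBuffer($folder);
$first = $reader->request();
$second = $reader->request();
$third = $reader->request(); // nothing left
rmdir($folder);
```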


### Community

Maintainers

![](https://avatars.githubusercontent.com/u/5006899?v=4)[Jérémy](/maintainers/Th3Mouk)[@Th3Mouk](https://github.com/Th3Mouk)

![](https://www.gravatar.com/avatar/bb8639a873e8027a684c61f649e40a62a1797085a20b6344cecc32aa15b7378c?d=identicon)[vince.ve](/maintainers/vince.ve)

![](https://avatars.githubusercontent.com/u/2012682?v=4)[etienne roudeix](/maintainers/etienneroudeix)[@etienneroudeix](https://github.com/etienneroudeix)

---

Top Contributors

[![Vinceveve](https://avatars.githubusercontent.com/u/912570?v=4)](https://github.com/Vinceveve "Vinceveve (108 commits)")[![etienneroudeix](https://avatars.githubusercontent.com/u/2012682?v=4)](https://github.com/etienneroudeix "etienneroudeix (83 commits)")[![madmatah](https://avatars.githubusercontent.com/u/2926?v=4)](https://github.com/madmatah "madmatah (11 commits)")[![Th3Mouk](https://avatars.githubusercontent.com/u/5006899?v=4)](https://github.com/Th3Mouk "Th3Mouk (5 commits)")[![Alban-io](https://avatars.githubusercontent.com/u/7300483?v=4)](https://github.com/Alban-io "Alban-io (3 commits)")[![lunika](https://avatars.githubusercontent.com/u/767834?v=4)](https://github.com/lunika "lunika (2 commits)")[![v-six](https://avatars.githubusercontent.com/u/3017033?v=4)](https://github.com/v-six "v-six (2 commits)")[![welcoMattic](https://avatars.githubusercontent.com/u/773875?v=4)](https://github.com/welcoMattic "welcoMattic (2 commits)")[![davidwdan](https://avatars.githubusercontent.com/u/4969183?v=4)](https://github.com/davidwdan "davidwdan (1 commits)")[![tlenclos](https://avatars.githubusercontent.com/u/686101?v=4)](https://github.com/tlenclos "tlenclos (1 commits)")[![krichprollsch](https://avatars.githubusercontent.com/u/562696?v=4)](https://github.com/krichprollsch "krichprollsch (1 commits)")[![flo-pereira](https://avatars.githubusercontent.com/u/1889274?v=4)](https://github.com/flo-pereira "flo-pereira (1 commits)")

