PHPackages                             glider88/amp-redis-streams - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Caching](/categories/caching)
4. /
5. glider88/amp-redis-streams

ActiveLibrary[Caching](/categories/caching)

glider88/amp-redis-streams
==========================

Redis Streams with Amphp

1.0.1(2mo ago)00MITPHPPHP ^8.2

Since Dec 7Pushed 2mo agoCompare

[ Source](https://github.com/Glider88/amphp-redis-streams)[ Packagist](https://packagist.org/packages/glider88/amp-redis-streams)[ RSS](/packages/glider88-amp-redis-streams/feed)WikiDiscussions main Synced today

READMEChangelogDependencies (10)Versions (3)Used By (0)

AMPHP redis streams realization
===============================

[](#amphp-redis-streams-realization)

### Installation:

[](#installation)

```
composer require glider88/amp-redis-streams
```

Start docker:

```
bin/re # first time
```

```
bin/up # next times
```

Tests:

```
bin/unit
```

### Run example:

[](#run-example)

Producer:

```
bin/php examples/producer.php
```

Consumer:

```
bin/php examples/consumer.php
```

### Settings:

[](#settings)

```
    // create amphp redis
    $ampRedis = createRedisClient('redis://redis:6379');

    // second amphp redis, because Stream has blocking operation 'XREADGROUP GROUP ... BLOCK ...', this is necessary to reduce latency
    $ampRedisBlocked = createRedisClient('redis://redis:6379');

    // wrapper for amphp redis that implement missing redis commands
    $advRedis = new AdvancedAmpRedis($ampRedis, $logger);
    $advRedisBlocked = new AdvancedAmpRedis($ampRedisBlocked, $logger);

    $redis = new Redis(
        stream: 's',                    // stream name
        group: 'g',                     // consumer group name
        redis: $advRedis,
        redisBlocked: $advRedisBlocked,
        logger: new NullLogger(),
        maxStreamLength: 1000,          // approximate stream size
        maxDlqStreamLength: 1000,       // approximate stream size for dead letters
        readRetrySetCount: 100,         // how many entities we get at time from redis sorted set
                                        // (redis set is used to implement logic of retries, with name s:g:retry)
        readAutoClaimCount: 100,        // how many autoclaim entities we get at time
        blockRead: new Sec(1),          // how long we wait first data from stream
        deduplicationTtl: new Sec(3),   // for deduplication logic used `SET $streamMessageId . '-' . $this->group` with this ttl
        autoClaimMinIdle: new Sec(1),   // after this time we get message from PEL by autoclaim
        consumer: 'c',                  // optional consumer name, or generate: 'c-'.gethostname().'-'.getmypid()
    );

    $stream = new Stream(
        stream: 's',                               // stream name
        group: 'g',                                // consumer group name
        redis: $redis,
        maxRetries: 3,                             // after we send message to dead letters stream (with s:dql name)
        logger: new NullLogger(),
        retry: new MultiplyRetry(                  // retry with incremental increase time: 0 1 2 3... seconds wait before retry
            firstOffsetDelay: new Milli(0),
            baseDelay: new Sec(1),
        ),
        scaling: new PiecewiseLinearScaling([ // complex scaling number of workers, for empty stream (< 500) use 16 worker, next 32
            16 => 0,
            32 => 500,
        ]),
        retryInterval: new Milli(100),  // how often we launch retry logic
        claimInterval: new Milli(100),  // how often we launch autoclaim logic
        timeoutJob: new Sec(100),       // timeout for job, after which we cancel it by TimeoutCancellation exception
    );
```

###  Health Score

37

—

LowBetter than 81% of packages

Maintenance88

Actively maintained with recent releases

Popularity0

Limited adoption so far

Community2

Small or concentrated contributor base

Maturity49

Maturing project, gaining track record

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Every ~148 days

Total

2

Last Release

61d ago

### Community

Maintainers

![](https://avatars.githubusercontent.com/u/1173406?v=4)[Pavel Sakharov](/maintainers/Glider88)[@Glider88](https://github.com/Glider88)

---

Tags

streamredisamphp

###  Code Quality

TestsPHPUnit

### Embed Badge

![Health badge](/badges/glider88-amp-redis-streams/health.svg)

```
[![Health](https://phpackages.com/badges/glider88-amp-redis-streams/health.svg)](https://phpackages.com/packages/glider88-amp-redis-streams)
```

###  Alternatives

[danog/madelineproto

Async PHP client API for the telegram MTProto protocol.

3.5k902.0k23](/packages/danog-madelineproto)[predis/predis

A flexible and feature-complete Redis/Valkey client for PHP.

7.8k325.4M2.8k](/packages/predis-predis)[amphp/http-server

A non-blocking HTTP application server for PHP based on Amp.

1.3k6.7M110](/packages/amphp-http-server)[amphp/redis

Efficient asynchronous communication with Redis servers, enabling scalable and responsive data storage and retrieval.

167714.3k59](/packages/amphp-redis)[amphp/byte-stream

A stream abstraction to make working with non-blocking I/O simple.

394125.7M136](/packages/amphp-byte-stream)[amphp/pipeline

Asynchronous iterators and operators.

7740.0M49](/packages/amphp-pipeline)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
