PHPackages                             glider88/amp-redis-streams - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Caching](/categories/caching)
4. /
5. glider88/amp-redis-streams

ActiveLibrary[Caching](/categories/caching)

glider88/amp-redis-streams
==========================

Redis Streams with Amphp

1.0.0(5mo ago)00MITPHPPHP ^8.2

Since Dec 7Pushed 5mo agoCompare

[ Source](https://github.com/Glider88/amphp-redis-streams)[ Packagist](https://packagist.org/packages/glider88/amp-redis-streams)[ RSS](/packages/glider88-amp-redis-streams/feed)WikiDiscussions main Synced 1mo ago

READMEChangelogDependencies (5)Versions (2)Used By (0)

AMPHP redis streams realization
===============================

[](#amphp-redis-streams-realization)

### Installation:

[](#installation)

```
composer require glider88/amp-redis-streams
```

Start docker:

```
bin/re # first time
```

```
bin/up # next times
```

Tests:

```
bin/unit
```

### Run example:

[](#run-example)

Producer:

```
bin/php examples/producer.php
```

Consumer:

```
bin/php examples/consumer.php
```

### Settings:

[](#settings)

```
    // create amphp redis
    $ampRedis = createRedisClient('redis://redis:6379');

    // second amphp redis, because Stream has blocking operation 'XREADGROUP GROUP ... BLOCK ...', this is necessary to reduce latency
    $ampRedisBlocked = createRedisClient('redis://redis:6379');

    // wrapper for amphp redis that implement missing redis commands
    $advRedis = new AdvancedAmpRedis($ampRedis, $logger);
    $advRedisBlocked = new AdvancedAmpRedis($ampRedisBlocked, $logger);

    $redis = new Redis(
        stream: 's',                    // stream name
        group: 'g',                     // consumer group name
        redis: $advRedis,
        redisBlocked: $advRedisBlocked,
        logger: new NullLogger(),
        maxStreamLength: 1000,          // approximate stream size
        maxDlqStreamLength: 1000,       // approximate stream size for dead letters
        readRetrySetCount: 100,         // how many entities we get at time from redis sorted set
                                        // (redis set is used to implement logic of retries, with name s:g:retry)
        readAutoClaimCount: 100,        // how many autoclaim entities we get at time
        blockRead: new Sec(1),          // how long we wait first data from stream
        deduplicationTtl: new Sec(3),   // for deduplication logic used `SET $streamMessageId . '-' . $this->group` with this ttl
        autoClaimMinIdle: new Sec(1),   // after this time we get message from PEL by autoclaim
        consumer: 'c',                  // optional consumer name, or generate: 'c-'.gethostname().'-'.getmypid()
    );

    $stream = new Stream(
        stream: 's',                               // stream name
        group: 'g',                                // consumer group name
        redis: $redis,
        maxRetries: 3,                             // after we send message to dead letters stream (with s:dql name)
        logger: new NullLogger(),
        retry: new MultiplyRetry(                  // retry with incremental increase time: 0 1 2 3... seconds wait before retry
            firstOffsetDelay: new Milli(0),
            baseDelay: new Sec(1),
        ),
        scaling: new PiecewiseLinearScaling([ // complex scaling number of workers, for empty stream (< 500) use 16 worker, next 32
            16 => 0,
            32 => 500,
        ]),
        retryInterval: new Milli(100),  // how often we launch retry logic
        claimInterval: new Milli(100),  // how often we launch autoclaim logic
        timeoutJob: new Sec(100),       // timeout for job, after which we cancel it by TimeoutCancellation exception
    );
```

###  Health Score

33

—

LowBetter than 75% of packages

Maintenance73

Regular maintenance activity

Popularity0

Limited adoption so far

Community2

Small or concentrated contributor base

Maturity47

Maturing project, gaining track record

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Unknown

Total

1

Last Release

153d ago

### Community

Maintainers

![](https://www.gravatar.com/avatar/7103e36446ad2fc58a3da730afd7d0c5a0b89b01a231dbd537832030bd8cadd7?d=identicon)[Glider88](/maintainers/Glider88)

---

Tags

streamredisamphp

###  Code Quality

TestsPHPUnit

### Embed Badge

![Health badge](/badges/glider88-amp-redis-streams/health.svg)

```
[![Health](https://phpackages.com/badges/glider88-amp-redis-streams/health.svg)](https://phpackages.com/packages/glider88-amp-redis-streams)
```

###  Alternatives

[predis/predis

A flexible and feature-complete Redis/Valkey client for PHP.

7.8k305.7M2.4k](/packages/predis-predis)[amphp/redis

Efficient asynchronous communication with Redis servers, enabling scalable and responsive data storage and retrieval.

165634.7k44](/packages/amphp-redis)[amphp/byte-stream

A stream abstraction to make working with non-blocking I/O simple.

392116.2M104](/packages/amphp-byte-stream)[danog/madelineproto

Async PHP client API for the telegram MTProto protocol.

3.4k855.0k18](/packages/danog-madelineproto)[snc/redis-bundle

A Redis bundle for Symfony

1.0k39.4M67](/packages/snc-redis-bundle)[clue/redis-protocol

A streaming Redis protocol (RESP) parser and serializer written in pure PHP.

5311.0M13](/packages/clue-redis-protocol)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
