hyperf/single-flight-incubator
==============================

A concurrency library for Hyperf

v0.3.0 · MIT · PHP >=8.1 · [Source](https://github.com/hyperf/single-flight-incubator) · [Packagist](https://packagist.org/packages/hyperf/single-flight-incubator)

Hyperf Concurrent Tools
=======================

A library of common concurrency tools for Hyperf, including single-flight, barrier, semaphore, and worker-pool.

Installation
------------

```
composer require hyperf/single-flight-incubator
```

Basic Usage
-----------

**All examples are in the [examples](examples) directory; for more usage, see the [tests](tests) directory.**

### single-flight

```
$ret = [];
$barrierKey = uniqid();

run(static function () use (&$ret, $barrierKey) {
    for ($i = 0; $i < 10; ++$i) {
        go(static function () use (&$ret, $barrierKey, $i) {
            $ret[] = SingleFlight::do($barrierKey, static function () use ($i) {
                // ensure that other coroutines can be scheduled at the same time
                usleep(1000);
                return [Coroutine::getCid() => $i];
            });
        });
    }
});

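// SORT_REGULAR is needed because every element of $ret is itself an array
if (count(array_unique($ret, SORT_REGULAR)) === 1) {
    $ret = var_export($ret, true);
    printf("%s\nOnly one coroutine runs the closure; the others wait for its result and reuse it\n", $ret);
}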
```
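
The behaviour demonstrated above (one caller runs the closure, concurrent callers for the same key reuse its result) can be illustrated without Swoole using PHP 8.1 Fibers. This is only a minimal sketch of the idea, not the library's implementation: `FiberSingleFlight` and its internals are hypothetical names, and error handling is left out.

```php
<?php

declare(strict_types=1);

// Hypothetical sketch: FiberSingleFlight is NOT part of hyperf/single-flight-incubator.
final class FiberSingleFlight
{
    /** @var array<string, list<Fiber>> fibers parked per in-flight key */
    private array $waiters = [];

    public function do(string $key, callable $fn): mixed
    {
        if (array_key_exists($key, $this->waiters)) {
            // A call for this key is already in flight: park until it finishes.
            $this->waiters[$key][] = Fiber::getCurrent();
            return Fiber::suspend();
        }

        // First caller for this key: mark it in flight and run the closure.
        $this->waiters[$key] = [];
        $result = $fn();

        // Hand the shared result to every fiber that arrived in the meantime.
        $parked = $this->waiters[$key];
        unset($this->waiters[$key]);
        foreach ($parked as $fiber) {
            $fiber->resume($result);
        }

        return $result;
    }
}

$flight = new FiberSingleFlight();
$calls = 0;
$results = [];
$fibers = [];

for ($i = 0; $i < 3; ++$i) {
    $fibers[] = new Fiber(function () use ($flight, &$calls, &$results, $i): void {
        $results[] = $flight->do('key', function () use (&$calls, $i): int {
            ++$calls;
            Fiber::suspend(); // simulate I/O so the other fibers get scheduled
            return $i;
        });
    });
}

foreach ($fibers as $fiber) {
    $fiber->start();
}
$fibers[0]->resume(); // the simulated I/O of the first caller completes

// prints: closure ran 1 time(s), results: [0,0,0]
printf("closure ran %d time(s), results: %s\n", $calls, json_encode($results));
```

All three fibers end up with the value computed by the first caller, which is the property the `array_unique` check in the example verifies.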

### barrier

```
run(static function () {
    $parties = 10;
    $barrier = new CounterBarrier($parties);
    $sleepMs = 5;

    for ($i = 0; $i < $parties - 1; ++$i) {
        go(static function () use ($barrier) {
            $waitAt = microtime(true);
            $barrier->await();
            // your biz logic here
            $resumeAt = microtime(true);
            $elapsed = ($resumeAt - $waitAt) * 1000;
            printf("Coroutine [%d] resumed after waiting %.2f ms\n", Coroutine::getCid(), $elapsed);
        });
    }

    go(static function () use ($barrier, $sleepMs) {
        usleep($sleepMs * 1000);
        printf("Coroutine [%d] is the last to arrive: it joins the barrier after %d ms, and all coroutines then proceed together\n", Coroutine::getCid(), $sleepMs);
        $barrier->await();
        // your biz logic here
    });
});
```
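
As a rough illustration of what a counting barrier does, the sketch below parks every arriving fiber until the configured number of parties has called `await()`, then releases them all. It uses plain PHP 8.1 Fibers; `FiberBarrier` is a hypothetical class written for this illustration and makes no claim about how `CounterBarrier` is implemented.

```php
<?php

declare(strict_types=1);

// Hypothetical sketch: FiberBarrier is NOT the library's CounterBarrier.
final class FiberBarrier
{
    /** @var list<Fiber> fibers parked at the barrier */
    private array $waiting = [];

    public function __construct(private int $parties)
    {
    }

    public function await(): void
    {
        if (count($this->waiting) + 1 < $this->parties) {
            // Not everyone has arrived yet: park the current fiber.
            $this->waiting[] = Fiber::getCurrent();
            Fiber::suspend();
            return;
        }

        // The last party arrived: reset the barrier and release the others.
        $parked = $this->waiting;
        $this->waiting = [];
        foreach ($parked as $fiber) {
            $fiber->resume();
        }
    }
}

$barrier = new FiberBarrier(3);
$log = [];

for ($i = 0; $i < 3; ++$i) {
    (new Fiber(function () use ($barrier, &$log, $i): void {
        $log[] = "arrive $i";
        $barrier->await();
        $log[] = "pass $i";
    }))->start();
}

// prints: arrive 0, arrive 1, arrive 2, pass 0, pass 1, pass 2
echo implode(', ', $log), "\n";
```

No fiber passes the barrier before the third one has arrived, which mirrors the role of the late coroutine in the example.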

### semaphore

```
run(static function () {
    $sema = new Semaphore(3);

    go(static function () use ($sema) {
        $sleepSec = 1;
        $tokens = 1;
        defer(static function () use ($sema, $sleepSec, $tokens) {
            printf("Coroutine [%d] releases the semaphore after holding it for %d s\n", Coroutine::getCid(), $sleepSec);
            $sema->release($tokens);
        });

        $acquireAt = time();
        $sema->acquire($tokens);
        $resumedAt = time();
        $elapsed = $resumedAt - $acquireAt;
        printf("Coroutine [%d] acquired the semaphore after %d s\n", Coroutine::getCid(), $elapsed);
        sleep($sleepSec);
    });

    $chan = new Channel();
    go(static function () use ($sema, $chan) {
        $sleepSec = 2;
        $tokens = 2;
        defer(static function () use ($sema, $sleepSec, $tokens) {
            printf("Coroutine [%d] releases the semaphore after holding it for %d s\n", Coroutine::getCid(), $sleepSec);
            $sema->release($tokens);
        });

        $acquireAt = microtime(true);
        $sema->acquire($tokens);
        // Wake up the next coroutine
        $chan->close();
        $resumedAt = microtime(true);
        $elapsed = ($resumedAt - $acquireAt) * 1000;
        printf("Coroutine [%d] acquired the semaphore after %.2f ms\n", Coroutine::getCid(), $elapsed);
        sleep($sleepSec);
    });

    go(static function () use ($sema, $chan) {
        $tokens = 3;
        defer(static function () use ($sema, $tokens) {
            $sema->release($tokens);
            printf("Coroutine [%d] released the semaphore\n", Coroutine::getCid());
        });

        // Make sure this coroutine tries to acquire the semaphore after the previous one
        $chan->pop();
        $acquireAt = time();
        $sema->acquire($tokens);
        $resumedAt = time();
        $elapsed = $resumedAt - $acquireAt;
        printf("Coroutine [%d] acquired the semaphore after %d s\n", Coroutine::getCid(), $elapsed);
    });
});
```
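
The example above relies on a weighted semaphore: a caller asking for more tokens than are free must wait until enough have been released. The sketch below reproduces that scenario with plain PHP 8.1 Fibers. `FiberSemaphore` is a hypothetical class, and its first-in-first-out wake-up order is an assumption made for this illustration, not a documented property of the library's `Semaphore`.

```php
<?php

declare(strict_types=1);

// Hypothetical sketch: FiberSemaphore is NOT the library's Semaphore.
final class FiberSemaphore
{
    /** @var list<array{Fiber, int}> waiting fibers with requested tokens (FIFO is an assumption) */
    private array $queue = [];

    public function __construct(private int $available)
    {
    }

    public function acquire(int $tokens): void
    {
        if ($this->queue === [] && $tokens <= $this->available) {
            $this->available -= $tokens;
            return;
        }
        // Not enough free tokens: park until release() grants them.
        $this->queue[] = [Fiber::getCurrent(), $tokens];
        Fiber::suspend();
    }

    public function release(int $tokens): void
    {
        $this->available += $tokens;
        // Wake waiters in arrival order while the head of the queue fits.
        while ($this->queue !== [] && $this->queue[0][1] <= $this->available) {
            [$fiber, $needed] = array_shift($this->queue);
            $this->available -= $needed;
            $fiber->resume();
        }
    }
}

$sema = new FiberSemaphore(3);
$log = [];

$holder = static function (string $name, int $tokens) use ($sema, &$log): Fiber {
    return new Fiber(function () use ($sema, &$log, $name, $tokens): void {
        $sema->acquire($tokens);
        $log[] = "$name acquired $tokens";
        Fiber::suspend(); // hold the tokens until resumed from outside
        $log[] = "$name released $tokens";
        $sema->release($tokens);
    });
};

$a = $holder('A', 1);
$b = $holder('B', 2);
$c = $holder('C', 3);

$a->start(); // takes 1 of 3 tokens
$b->start(); // takes the remaining 2
$c->start(); // needs all 3, so it waits
$a->resume(); // A releases 1: still not enough for C
$b->resume(); // B releases 2: C is granted all 3
$c->resume(); // C releases

echo implode("\n", $log), "\n";
```

`C` only acquires its three tokens after both `A` and `B` have released theirs, matching the roughly two-second wait of the third coroutine in the example.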

### worker-pool

```
run(static function () {
    $config = new Config();
    $config->setCapacity(5);
    $pool = new WorkerPool($config);

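    // Stop the worker pool
    defer(static fn () => $pool->stop());

    // Submit an asynchronous task
    $mockBiz = static fn (): int => Coroutine::getCid();
    $ret = $pool->submit($mockBiz);
    if (is_null($ret)) {
        printf("The return value of an asynchronous task can simply be ignored if you do not need it\n");
    }

    // Submit a synchronous task; its result is returned directly
    $ret = $pool->submit($mockBiz, sync: true);
    if (Coroutine::getCid() !== $ret) {
        printf("The synchronous task was executed by a worker coroutine in the worker pool\n");
    }

    // Submit an asynchronous task and fetch its result with waitResult()
    $task = new Task($mockBiz(...), sync: false);
    $nullRet = $pool->submitTask($task);
    if (is_null($nullRet)) {
        printf("Submitting an asynchronous task does not return its result directly; use waitResult() to get it\n");
    }
    $ret = $task->waitResult();
    if (Coroutine::getCid() !== $ret) {
        printf("The asynchronous task was executed by a worker coroutine in the worker pool\n");
    }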
});
```

### double-barrier

```
run(static function () {
    $barrier = new DoubleBarrier(3);

    $queuedMs = 10;
    $startAt = (int)floor(microtime(true) * 1000);
    $biz = static function () use ($startAt, $queuedMs) {
        $cid = Coroutine::getCid();
        $elapsed = (int)floor(microtime(true) * 1000) - $startAt;
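        printf("Coroutine [%d] teamed up after waiting %d ms; all parties start together\n", $cid, $elapsed);
        if ($cid % 2) {
            usleep($queuedMs * 1000);
            printf("Coroutine [%d] finished after %d ms of extra work\n", $cid, $queuedMs);
        } else {
            printf("Coroutine [%d] finished its work and waits for the others so that all exit together\n", $cid);
        }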
    };

    $exit = static function () use ($startAt) {
        $cid = Coroutine::getCid();
        $elapsed = (int)floor(microtime(true) * 1000) - $startAt;
        printf("Coroutine [%d] ran for %d ms in total; all parties exit together\n", $cid, $elapsed);
    };

    go(static function () use ($barrier, $biz, $exit) {
        defer($exit(...));
        $barrier->execute($biz(...));
    });
    go(static function () use ($barrier, $biz, $exit) {
        defer($exit(...));
        $barrier->execute($biz(...));
    });
    go(static function () use ($barrier, $biz, $queuedMs, $exit) {
        defer($exit(...));
        usleep($queuedMs * 1000);
        $barrier->execute($biz(...));
    });
});
```


