PHPackages                             reactphp-x/tunnel-stream - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Utility &amp; Helpers](/categories/utility)
4. /
5. reactphp-x/tunnel-stream

ActiveLibrary[Utility &amp; Helpers](/categories/utility)

reactphp-x/tunnel-stream
========================

v1.0.0(12mo ago)2443MITPHP

Since May 22Pushed 12mo agoCompare

[ Source](https://github.com/reactphp-x/tunnel-stream)[ Packagist](https://packagist.org/packages/reactphp-x/tunnel-stream)[ RSS](/packages/reactphp-x-tunnel-stream/feed)WikiDiscussions master Synced 1mo ago

READMEChangelog (1)Dependencies (9)Versions (2)Used By (3)

Tunnel Stream
=============

[](#tunnel-stream)

一个基于 ReactPHP 的流式隧道通信库，支持在不同进程间传输数据流和执行闭包函数。

特性
--

[](#特性)

- 🚀 支持进程间双向数据流传输
- 🔄 支持序列化闭包函数在不同进程间执行
- 📦 基于 MessagePack 的高效二进制协议
- 💓 内置心跳检测机制
- ⚠️ 完整的错误处理和事件通知
- ⏱️ 支持异步 Promise 操作
- 🔒 支持闭包函数序列化安全控制

安装
--

[](#安装)

```
composer require reactphp-x/tunnel-stream
```

快速开始
----

[](#快速开始)

### 1. 创建隧道流

[](#1-创建隧道流)

```
use ReactphpX\TunnelStream\TunnelStream;
use React\Stream\ThroughStream;

// 创建读写流
$read = new ThroughStream();
$write = new ThroughStream();

// 初始化隧道流
$tunnelStream = new TunnelStream($read, $write);
```

### 2. 执行远程闭包

[](#2-执行远程闭包)

```
// 在远程进程执行闭包函数
$stream = $tunnelStream->run(function () {
    return file_get_contents('example.txt');
});

// 处理返回的数据流
$stream->on('data', function ($data) {
    echo $data;
});

$stream->on('end', function () {
    echo "Stream ended\n";
});

$stream->on('error', function (Exception $e) {
    echo "Error: " . $e->getMessage() . "\n";
});
```

### 3. 心跳检测

[](#3-心跳检测)

```
$tunnelStream->ping(3)->then(
    function () {
        echo "Ping successful\n";
    },
    function (\Exception $e) {
        echo "Ping failed: " . $e->getMessage() . "\n";
    }
);
```

进阶用法
----

[](#进阶用法)

### 子进程通信

[](#子进程通信)

在父子进程间建立隧道流通信是一个常见的使用场景。以下是一个完整的示例：

#### 主进程 (process.php)

[](#主进程-processphp)

```
use ReactphpX\TunnelStream\TunnelStream;
use React\ChildProcess\Process;
use React\EventLoop\Loop;

// 创建子进程
$process = new Process(sprintf(
    'exec php %s/child_process_init.php',
    __DIR__
));

$process->start();

// 创建隧道流
$tunnelStream = new TunnelStream($process->stderr, $process->stdin);

// 监听子进程输出
$process->stdout->on('data', function ($data) {
    echo "STDOUT: " . $data . PHP_EOL;
});

$process->stderr->on('data', function ($data) {
    echo "STDERR: " . $data . PHP_EOL;
});

$process->on('exit', function ($exitCode) {
    echo "Process exited with code $exitCode\n";
});

// 执行文件读取操作
$fileStream = $tunnelStream->run(function () {
    return file_get_contents(__DIR__ . '/composer.json');
});

$fileStream->on('data', function ($data) {
    echo "File Stream: " . $data . PHP_EOL;
});

$fileStream->on('error', function ($error) {
    echo "File Stream Error: " . $error->getMessage() . PHP_EOL;
});

// 执行异步延迟操作
$promiseStream = $tunnelStream->run(function () {
    return \React\Promise\Timer\sleep(2)->then(function () {
        return 'Hello World';
    });
});

$promiseStream->on('data', function ($data) {
    echo "Promise Stream: " . $data . PHP_EOL;
});

// 持续数据流示例
$alwayStream = $tunnelStream->run(function ($stream) {
    $i = 0;
    $timer = Loop::addPeriodicTimer(1, function () use ($stream, &$i) {
        $stream->write('Hello World' . $i . PHP_EOL);
        $i++;
    });

    $stream->on('close', function () use ($timer) {
        Loop::cancelTimer($timer);
        echo "Always Stream Close\n";
    });

    return $stream;
});

$alwayStream->on('data', function ($data) {
    echo "Always Stream: " . $data . PHP_EOL;
});

// 5秒后关闭持续数据流
Loop::addTimer(5, function () use ($alwayStream) {
    $alwayStream->close();
});
```

#### 子进程 (child\_process\_init.php)

[](#子进程-child_process_initphp)

```
