phunkie/streams
===============

Functional Streams for PHP

Version 1.1.0 · MIT license · PHP ^8.2 || ^8.3 || ^8.4 || ^8.5

Phunkie Streams
===============

[![CI](https://github.com/phunkie/streams/actions/workflows/ci.yml/badge.svg)](https://github.com/phunkie/streams/actions)[![Latest Stable Version](https://camo.githubusercontent.com/af8d2a93270dbe8df1c0c44d5454e4e3c712e819c117248be5daf991af6a5d6b/68747470733a2f2f706f7365722e707567782e6f72672f7068756e6b69652f73747265616d732f762f737461626c65)](https://packagist.org/packages/phunkie/streams)[![Total Downloads](https://camo.githubusercontent.com/a3c8aeeb3bd144c355f35fa17022c1af1a560f6cffe19ece0701269464d69839/68747470733a2f2f706f7365722e707567782e6f72672f7068756e6b69652f73747265616d732f646f776e6c6f616473)](https://packagist.org/packages/phunkie/streams)[![License](https://camo.githubusercontent.com/388de729a213b53c9d21cbddd3f30caa8b9b67661d8433b4e72ba0d4e8e329d7/68747470733a2f2f706f7365722e707567782e6f72672f7068756e6b69652f73747265616d732f6c6963656e7365)](https://packagist.org/packages/phunkie/streams)[![PHP Version](https://camo.githubusercontent.com/65ca48900cd56a3a7d4d8e6f05b87981cd0dae277cb6e528071efc377b05b306/68747470733a2f2f696d672e736869656c64732e696f2f7061636b61676973742f7068702d762f7068756e6b69652f73747265616d732e737667)](https://packagist.org/packages/phunkie/streams)

Phunkie Streams is a functional PHP library for working with streams, inspired by functional streaming libraries such as fs2 (Scala). It lets you process data in a declarative, composable way.

Installation
------------

```
composer require phunkie/streams

```

Features
--------

- **Pure streams**: Finite sequences of values that can be transformed and combined
- **Infinite streams**: Unbounded sequences that can be processed lazily
- **Effectful operations**: Operations that interact with the outside world (I/O, etc.) using phunkie/effect
- **Resource management**: Safe handling of resources through the bracket pattern for guaranteed cleanup
- **Error handling**: Functional error handling with `attempt()` and `handleError()`
- **Monadic composition**: Compose IO operations with `flatMap()` for type-safe pipelines
- **Stream operations**: Rich set of operations including `through()`, `takeWhile()`, `dropWhile()`, `chunk()`
- **Memory efficient**: True streaming with constant memory usage regardless of data size
- **Parallel processing**: Concurrent operations with automatic fallback and memory optimization

What's Implemented
------------------

Phunkie Streams has completed Phases 1-3 of development. Here's what's currently available:

### Core Stream Types

- **Pure Streams** - `Stream(1, 2, 3)` for finite in-memory sequences
- **Infinite Streams** - `Stream(iterate(...))`, `Stream(fromRange(...))` for unbounded sequences
- **Resource Streams** - File and network I/O streams with automatic resource management

### Stream Operations

- **Transformations**: `map()`, `filter()`, `flatMap()`
- **Composition**: `concat()`, `merge()`, `interleave()`, `zip()`, `zipWith()`
- **Control flow**: `take()`, `drop()`, `takeWhile()`, `dropWhile()`
- **Batching**: `chunk()` for processing in fixed-size chunks
- **Pipes**: `through()` for composable transformation pipelines
- **Compilation**: `compile()->toArray()`, `compile()->toList()`, `compile()->drain()`
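
To make the batching and pipe ideas above concrete, here is a minimal plain-PHP sketch built on generators. It does not use phunkie/streams and is not how the library implements `chunk()` or `through()`; the helper names `chunked` and `applyPipe` are hypothetical and exist only for this illustration.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of phunkie/streams): lazily groups an
 * iterable into arrays of at most $size elements.
 */
function chunked(iterable $source, int $size): Generator
{
    if ($size < 1) {
        throw new InvalidArgumentException('Chunk size must be at least 1');
    }
    $buffer = [];
    foreach ($source as $value) {
        $buffer[] = $value;
        if (count($buffer) === $size) {
            yield $buffer;
            $buffer = [];
        }
    }
    if ($buffer !== []) {
        yield $buffer; // final, possibly shorter chunk
    }
}

/**
 * Hypothetical helper: a "pipe" is modelled as a function from iterable to
 * iterable, so applying one is plain function application.
 */
function applyPipe(iterable $source, callable $pipe): iterable
{
    return $pipe($source);
}

// A reusable pipe that batches its input in pairs.
$pairs = fn (iterable $xs): Generator => chunked($xs, 2);

$result = iterator_to_array(applyPipe([1, 2, 3, 4, 5], $pairs), false);
// $result is [[1, 2], [3, 4], [5]]
```

Because a pipe in this sketch is an ordinary function, pipes can be stored in variables, passed around, and composed before any data flows through them.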

### File I/O

- **Reading**: `Stream(new Path('file.txt'))`, `readFileContents()`, `readLines()`
- **Writing**: `writeFileContents()`, `writeLines()`, `writeFile()` pipe function
- **Utilities**: `exists()`, `deleteFile()`
- All file operations use `bracket()` internally for guaranteed cleanup

### Network Operations

- **HTTP**: `Network::httpGet()`, `Network::httpPost()`, `Network::httpPut()`, `Network::httpDelete()`
- **TCP Client**: `Network::client(SocketAddress)` for socket connections
- **TCP Server**: `Network::server(host:, port:)` for accepting connections
- **Socket Writing**: `Network::socketWrite()` pipe function
- All network resources use automatic cleanup via `__destruct()`

### Resource Management

- **Bracket pattern**: `bracket(acquire, use, release)` from phunkie/effect
- **Automatic cleanup**: Resource objects (HttpRequest, SocketRead, etc.) clean up automatically
- **Safe by default**: All provided file/network functions handle resources safely
- See [Resource Management Guide](doc/resource-management.md) for patterns
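
The guarantee behind the bracket pattern can be sketched in plain PHP with `try`/`finally`. This is an eager stand-in written for illustration, not the `bracket()` shipped by phunkie/effect, which presumably composes deferred IO values; the name `bracketSketch` is hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical eager bracket: acquire a resource, use it, and always
 * release it. It only demonstrates the cleanup guarantee.
 */
function bracketSketch(callable $acquire, callable $use, callable $release): mixed
{
    $resource = $acquire();
    try {
        return $use($resource);
    } finally {
        $release($resource); // runs on success and when $use throws
    }
}

$log = [];
try {
    bracketSketch(
        function () use (&$log) {
            $log[] = 'acquire';
            return fopen('php://memory', 'r+');
        },
        function ($handle) use (&$log) {
            $log[] = 'use';
            throw new RuntimeException('boom');
        },
        function ($handle) use (&$log) {
            $log[] = 'release';
            fclose($handle);
        }
    );
} catch (RuntimeException $e) {
    $log[] = 'caught';
}
// $log is ['acquire', 'use', 'release', 'caught']
```

The release step runs before the exception reaches the caller, which is the property that makes the pattern safe for file handles and sockets.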

### Error Handling

- **Validation**: `IO->attempt()` converts exceptions to `Validation`
- **Recovery**: `IO->handleError(fn)` for error recovery with fallback values
- **Composition**: Chain error handling with `flatMap()` for complex scenarios
- See [Error Handling Guide](doc/error-handling.md) for comprehensive patterns
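
The recovery idea can be illustrated without the library. The sketch below models a deferred computation with a hypothetical `LazyAction` class; it is not phunkie/effect's `IO`, and its `handleError()` only mirrors the behaviour described above under that assumption.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-in for a deferred computation; not phunkie/effect's IO.
 */
final class LazyAction
{
    public function __construct(private Closure $thunk)
    {
    }

    /** Returns a new action that yields $recover's result when the original throws. */
    public function handleError(Closure $recover): self
    {
        $thunk = $this->thunk;

        return new self(function () use ($thunk, $recover) {
            try {
                return $thunk();
            } catch (Throwable $e) {
                return $recover($e);
            }
        });
    }

    /** Runs the deferred computation. */
    public function run(): mixed
    {
        return ($this->thunk)();
    }
}

// Nothing is decoded until run() is called.
$parse = new LazyAction(
    fn () => json_decode('{not json', true, 512, JSON_THROW_ON_ERROR)
);
$safe  = $parse->handleError(fn (Throwable $e) => ['error' => $e::class]);
$value = $safe->run();
// $value is ['error' => 'JsonException']
```

Keeping recovery as a value-returning step means the fallback can be chained with further steps in the same way as a successful result.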

### Concurrency & Parallel Processing

- **Parallel operations**: `parMap()`, `parMapValidation()`, `parEvalMap()`, `parTraverse()`, `parEval()`
- **Concurrent merging**: `parMerge()`, `parMergeMap()` for parallel stream combining
- **Auto CPU detection**: Automatically detects CPU cores for optimal parallelism
- **Automatic fallback**: Falls back to sequential execution on concurrency failures
- **Memory optimized**: All parallel operations process incrementally without materializing entire streams

### Not Yet Implemented

- **Backpressure**: Flow control mechanisms (planned for future release)
- **Connection pooling**: Resource pooling for HTTP/sockets (planned for future release)
- **Process integration**: System command execution (future consideration)

Pure Streams
------------

```
// A pure stream built from literal values
$stream = Stream(1, 2, 3, 4);

// Pure streams can be converted to other collections
$stream->toList();   // List(1, 2, 3, 4)
$stream->toArray();  // [1, 2, 3, 4]
```

Transformations on Pure Streams
-------------------------------

```
use const Phunkie\Functions\numbers\increment;

$stream = Stream(1, 2, 3, 4);

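// Apply increment to every element
$stream->map(increment)->toList();
// List(2, 3, 4, 5)

// Pair each element with the result of applying increment to it
Stream(1, 2, 3, 4)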
    ->zipWith(increment)
    ->toList();
// List(Pair(1, 2), Pair(2, 3), Pair(3, 4), Pair(4, 5))
```

Infinite Streams
----------------

```
// Stream from a range
$fromRange = Stream(fromRange(0, 1000000000));
$fromRange->take(10)->compile()->toList();
// List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

// Stream from an iteration
$infiniteOdds = Stream(iterate(1)(fn ($x) => $x + 2));
$infiniteOdds->take(10)->compile()->toList();
// List(1, 3, 5, 7, 9, 11, 13, 15, 17, 19)

// Repeating a finite stream infinitely
$repeat = Stream(1, 2, 3)
    ->repeat()->take(12)->compile()->toList();
// List(1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3)
```
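
Why taking a prefix of an unbounded sequence terminates can be shown with plain PHP generators. This is a conceptual sketch only, not the library's implementation of `iterate()` or `take()`; the helper names `iterateFrom` and `takeFirst` are hypothetical.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: yields $seed, $f($seed), $f($f($seed)), ... without end. */
function iterateFrom(mixed $seed, callable $f): Generator
{
    $current = $seed;
    while (true) {
        yield $current;
        $current = $f($current);
    }
}

/** Hypothetical helper: yields at most $n elements, then stops pulling from $source. */
function takeFirst(iterable $source, int $n): Generator
{
    if ($n <= 0) {
        return;
    }
    foreach ($source as $value) {
        yield $value;
        if (--$n === 0) {
            return; // stop before asking the source for another element
        }
    }
}

$odds = iterator_to_array(takeFirst(iterateFrom(1, fn ($x) => $x + 2), 5), false);
// $odds is [1, 3, 5, 7, 9]
```

Elements are produced only when the consumer asks for them, so the infinite producer is never run past the fifth value.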

Peeking at Infinite Streams with runLog
---------------------------------------

```
Stream(1, 2, 3)
    ->repeat()
    ->runLog()
    ->unsafeRun();
// [1, 2, 3, 1, 2, 3, 1, 2, 3, ...]
```

Combining Streams
-----------------

```
// Concatenation
Stream(1, 2, 3)
    ->concat(Stream(4, 5, 6))
    ->compile()
    ->toList();
// List(1, 2, 3, 4, 5, 6)

// Interleaving
$x = Stream(1, 2, 3, 4, 5);
$y = Stream("Monday", "Tuesday", "Wednesday", "Thursday", "Friday");
$z = Stream(true, false, true, false, true);

$x->interleave($y, $z)->compile()->toList();
// List(1, "Monday", true, 2, "Tuesday", false, 3, "Wednesday", true, 4,
// "Thursday", false, 5, "Friday", true)
```

Basic Usage
-----------

```
