fiberManager = new FiberManager($clock, $timer);
    });

    // NOTE(review): the setup closure closed just above and the enclosing call
    // closed by the final `});` of this file both begin outside the visible
    // chunk; they are preserved as-is.
    //
    // Every statement below sits on its own line on purpose: several tests
    // carry trailing `//` comments, and a `//` comment runs to the end of the
    // physical line, so joining statements on one line silently disables code.

    // --- Construction -----------------------------------------------------

    it('creates stream from array', function () {
        $stream = AsyncStream::from([1, 2, 3, 4, 5], $this->fiberManager);

        $result = $stream->toArray();

        expect($result)->toBe([1, 2, 3, 4, 5]);
    });

    it('creates stream from range', function () {
        // The expectation pins both endpoints (1 and 5) as included.
        $stream = AsyncStream::range(1, 5, $this->fiberManager);

        $result = $stream->toArray();

        expect($result)->toBe([1, 2, 3, 4, 5]);
    });

    // --- Element-wise transformations --------------------------------------

    it('maps values', function () {
        $result = AsyncStream::from([1, 2, 3], $this->fiberManager)
            ->map(fn($x) => $x * 2)
            ->toArray();

        expect($result)->toBe([2, 4, 6]);
    });

    it('filters values', function () {
        $result = AsyncStream::from([1, 2, 3, 4, 5], $this->fiberManager)
            ->filter(fn($x) => $x % 2 === 0)
            ->toArray();

        expect($result)->toBe([2, 4]);
    });

    it('chains multiple operations', function () {
        // The final map divides even integers by 2; PHP's `/` yields an int
        // when both operands are ints and divide evenly, so the strict
        // toBe() comparison against ints holds.
        $result = AsyncStream::from([1, 2, 3, 4, 5], $this->fiberManager)
            ->map(fn($x) => $x * 2)      // [2, 4, 6, 8, 10]
            ->filter(fn($x) => $x > 5)   // [6, 8, 10]
            ->map(fn($x) => $x / 2)      // [3, 4, 5]
            ->toArray();

        expect($result)->toBe([3, 4, 5]);
    });

    // --- Terminal aggregation ----------------------------------------------

    it('reduces stream to single value', function () {
        $sum = AsyncStream::from([1, 2, 3, 4, 5], $this->fiberManager)
            ->reduce(fn($acc, $x) => $acc + $x, 0);

        expect($sum)->toBe(15);
    });

    // --- Slicing and batching ----------------------------------------------

    it('takes first N elements', function () {
        $result = AsyncStream::range(1, 100, $this->fiberManager)
            ->take(5)
            ->toArray();

        expect($result)->toBe([1, 2, 3, 4, 5]);
    });

    it('skips first N elements', function () {
        $result = AsyncStream::range(1, 10, $this->fiberManager)
            ->skip(7)
            ->toArray();

        expect($result)->toBe([8, 9, 10]);
    });

    it('chunks stream into batches', function () {
        // 10 elements in batches of 3: the last batch is expected to be the
        // short remainder [10].
        $result = AsyncStream::range(1, 10, $this->fiberManager)
            ->chunk(3)
            ->toArray();

        expect($result)->toBe([
            [1, 2, 3],
            [4, 5, 6],
            [7, 8, 9],
            [10],
        ]);
    });

    // --- Predicates and lookups --------------------------------------------

    it('counts elements', function () {
        $count = AsyncStream::from([1, 2, 3, 4, 5], $this->fiberManager)
            ->filter(fn($x) => $x > 2)
            ->count();

        expect($count)->toBe(3);
    });

    it('checks if any element matches', function () {
        // Positive case: a single even value (8) is present.
        $hasEven = AsyncStream::from([1, 3, 5, 7, 8], $this->fiberManager)
            ->any(fn($x) => $x % 2 === 0);

        expect($hasEven)->toBeTrue();

        // Negative case: no element satisfies the predicate.
        $hasNegative = AsyncStream::from([1, 2, 3], $this->fiberManager)
            ->any(fn($x) => $x < 0);

        expect($hasNegative)->toBeFalse();
    });

    it('checks if all elements match', function () {
        // Positive case: every element satisfies the predicate.
        $allPositive = AsyncStream::from([1, 2, 3, 4], $this->fiberManager)
            ->all(fn($x) => $x > 0);

        expect($allPositive)->toBeTrue();

        // Negative case: one odd value (5) breaks the predicate.
        $allEven = AsyncStream::from([2, 4, 5, 6], $this->fiberManager)
            ->all(fn($x) => $x % 2 === 0);

        expect($allEven)->toBeFalse();
    });

    it('finds first matching element', function () {
        $first = AsyncStream::from([1, 3, 5, 2, 4], $this->fiberManager)
            ->first(fn($x) => $x % 2 === 0);

        expect($first)->toBe(2);
    });

    it('finds first element without predicate', function () {
        $first = AsyncStream::from([5, 10, 15], $this->fiberManager)
            ->first();

        expect($first)->toBe(5);
    });

    // --- Grouping, de-duplication and ordering -----------------------------

    it('groups elements by key', function () {
        $groups = AsyncStream::from([1, 2, 3, 4, 5, 6], $this->fiberManager)
            ->groupBy(fn($x) => $x % 2 === 0 ? 'even' : 'odd');

        expect($groups)->toHaveKey('even');
        expect($groups)->toHaveKey('odd');
        expect($groups['even'])->toBe([2, 4, 6]);
        expect($groups['odd'])->toBe([1, 3, 5]);
    });

    it('removes duplicates', function () {
        $result = AsyncStream::from([1, 2, 2, 3, 3, 3, 4], $this->fiberManager)
            ->distinct()
            ->toArray();

        expect($result)->toBe([1, 2, 3, 4]);
    });

    it('removes duplicates with key selector', function () {
        $data = [
            ['id' => 1, 'name' => 'Alice'],
            ['id' => 2, 'name' => 'Bob'],
            ['id' => 1, 'name' => 'Alice Copy'], // Duplicate ID
        ];

        $result = AsyncStream::from($data, $this->fiberManager)
            ->distinct(fn($x) => $x['id'])
            ->toArray();

        // Only the ids are asserted; which of the two id=1 rows survives is
        // not checked by this test.
        expect($result)->toHaveCount(2);
        expect($result[0]['id'])->toBe(1);
        expect($result[1]['id'])->toBe(2);
    });

    it('sorts stream', function () {
        $result = AsyncStream::from([3, 1, 4, 1, 5, 9, 2], $this->fiberManager)
            ->sorted()
            ->toArray();

        expect($result)->toBe([1, 1, 2, 3, 4, 5, 9]);
    });

    it('sorts with custom comparator', function () {
        $result = AsyncStream::from([3, 1, 4, 1, 5], $this->fiberManager)
            ->sorted(fn($a, $b) => $b <=> $a) // Descending
            ->toArray();

        expect($result)->toBe([5, 4, 3, 1, 1]);
    });

    it('flat maps nested structures', function () {
        $result = AsyncStream::from([[1, 2], [3, 4], [5]], $this->fiberManager)
            ->flatMap(fn($arr) => $arr)
            ->toArray();

        expect($result)->toBe([1, 2, 3, 4, 5]);
    });

    // --- Side effects ------------------------------------------------------

    it('executes forEach action', function () {
        $sum = 0;

        // Captured by reference so the callback's additions are visible here.
        AsyncStream::from([1, 2, 3, 4, 5], $this->fiberManager)
            ->forEach(function ($x) use (&$sum) {
                $sum += $x;
            });

        expect($sum)->toBe(15);
    });

    it('taps into stream without modification', function () {
        $sideEffects = [];

        $result = AsyncStream::from([1, 2, 3], $this->fiberManager)
            ->tap(function ($x) use (&$sideEffects) {
                $sideEffects[] = $x;
            })
            ->map(fn($x) => $x * 2)
            ->toArray();

        // tap() is expected to observe the pre-map values while the
        // downstream map still receives every element.
        expect($sideEffects)->toBe([1, 2, 3]);
        expect($result)->toBe([2, 4, 6]);
    });

    // --- Channel integration -----------------------------------------------

    it('integrates with AsyncChannel', function () {
        // NOTE(review): presumably toChannel() closes the channel once the
        // stream is drained so that fromChannel()->toArray() terminates;
        // confirm against the AsyncStream implementation.
        $channel = new AsyncChannel(capacity: 10);

        // Send data to channel
        AsyncStream::range(1, 5, $this->fiberManager)
            ->toChannel($channel);

        // Read from channel as stream
        $result = AsyncStream::fromChannel($channel, $this->fiberManager)
            ->toArray();

        expect($result)->toBe([1, 2, 3, 4, 5]);
    });

    // --- Composite pipelines -----------------------------------------------

    it('processes large streams efficiently', function () {
        // Despite the name, only the resulting values are asserted here; no
        // timing or laziness is measured.
        $result = AsyncStream::range(1, 1000, $this->fiberManager)
            ->filter(fn($x) => $x % 2 === 0)
            ->map(fn($x) => $x / 2)
            ->take(10)
            ->toArray();

        expect($result)->toBe([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    });

    it('combines multiple stream patterns', function () {
        $result = AsyncStream::range(1, 20, $this->fiberManager)
            ->filter(fn($x) => $x % 2 === 0) // Even numbers
            ->map(fn($x) => $x * $x)         // Square them
            ->skip(2)                        // Skip first 2
            ->take(5)                        // Take next 5
            ->reduce(fn($acc, $x) => $acc + $x, 0);

        // Even: 2,4,6,8,10,12,14,16,18,20
        // Squared: 4,16,36,64,100,144,196,256,324,400
        // Skip 2: 36,64,100,144,196,256,324,400
        // Take 5: 36,64,100,144,196
        // Sum: 540
        expect($result)->toBe(540);
    });
});