fiberManager = new FiberManager($clock, $timer); }); it('creates unbuffered channel by default', function () { $channel = new AsyncChannel(); $stats = $channel->getStats(); expect($stats['capacity'])->toBe(0); expect($stats['buffered_items'])->toBe(0); }); it('creates buffered channel with capacity', function () { $channel = new AsyncChannel(capacity: 10); $stats = $channel->getStats(); expect($stats['capacity'])->toBe(10); }); it('supports producer-consumer pattern with buffered channel', function () { $channel = new AsyncChannel(capacity: 5); $received = []; // Producer fiber $producer = $this->fiberManager->asyncCooperative(function () use ($channel) { for ($i = 1; $i <= 3; $i++) { $channel->send("item-{$i}"); } $channel->close(); }); // Consumer fiber $consumer = $this->fiberManager->asyncCooperative(function () use ($channel, &$received) { try { while (true) { $item = $channel->receive(); $received[] = $item; } } catch (ChannelClosedException) { // Channel closed, stop consuming } }); // Start fibers $producer->start(); $consumer->start(); // Process until both terminated while (!$producer->isTerminated() || !$consumer->isTerminated()) { if ($producer->isSuspended()) { $producer->resume(); } if ($consumer->isSuspended()) { $consumer->resume(); } } expect($received)->toBe(['item-1', 'item-2', 'item-3']); }); it('blocks producer when buffered channel is full', function () { $channel = new AsyncChannel(capacity: 2); $sent = 0; $producer = new Fiber(function () use ($channel, &$sent) { for ($i = 1; $i <= 5; $i++) { $channel->send("item-{$i}"); $sent = $i; } }); $producer->start(); // Producer should have sent 2 items and then blocked expect($sent)->toBe(2); expect($channel->count())->toBe(2); // Receive one item to free space $item = $channel->receive(); expect($item)->toBe('item-1'); // Resume producer if ($producer->isSuspended()) { $producer->resume(); } // Should have sent one more expect($sent)->toBe(3); }); it('throws exception when sending to closed channel', function () { $channel = new AsyncChannel(capacity: 5); $channel->close(); expect(fn() => $channel->send('test')) ->toThrow(ChannelClosedException::class); }); it('throws exception when receiving from closed empty channel', function () { $channel = new AsyncChannel(capacity: 5); $channel->close(); expect(fn() => $channel->receive()) ->toThrow(ChannelClosedException::class); }); it('allows receiving remaining items after close', function () { $channel = new AsyncChannel(capacity: 5); $channel->send('item-1'); $channel->send('item-2'); $channel->close(); // Can still receive buffered items expect($channel->receive())->toBe('item-1'); expect($channel->receive())->toBe('item-2'); // Now throws expect(fn() => $channel->receive()) ->toThrow(ChannelClosedException::class); }); it('supports non-blocking try operations', function () { $channel = new AsyncChannel(capacity: 2); // trySend succeeds when space available expect($channel->trySend('item-1'))->toBeTrue(); expect($channel->trySend('item-2'))->toBeTrue(); expect($channel->trySend('item-3'))->toBeFalse(); // Full // tryReceive succeeds when items available $result = $channel->tryReceive(); expect($result['success'])->toBeTrue(); expect($result['value'])->toBe('item-1'); $result = $channel->tryReceive(); expect($result['success'])->toBeTrue(); expect($result['value'])->toBe('item-2'); $result = $channel->tryReceive(); expect($result['success'])->toBeFalse(); // Empty }); it('provides accurate channel statistics', function () { $channel = new AsyncChannel(capacity: 10); $channel->send('item-1'); $channel->send('item-2'); $channel->send('item-3'); $stats = $channel->getStats(); expect($stats['capacity'])->toBe(10); expect($stats['buffered_items'])->toBe(3); expect($stats['is_closed'])->toBeFalse(); $channel->close(); $stats = $channel->getStats(); expect($stats['is_closed'])->toBeTrue(); }); it('counts buffered items correctly', function () { $channel = new AsyncChannel(capacity: 5); expect($channel->count())->toBe(0); $channel->send('a'); expect($channel->count())->toBe(1); $channel->send('b'); $channel->send('c'); expect($channel->count())->toBe(3); $channel->receive(); expect($channel->count())->toBe(2); }); });