filter(fn($x) => $x % 2 === 0) ->map(fn($x) => $x * $x) ->toArray(); echo "Even numbers squared: " . json_encode($result) . "\n\n"; // Example 2: Large dataset processing with chunking echo "2. Large Dataset with Chunking:\n"; AsyncStream::range(1, 100, $fiberManager) ->chunk(20) ->map(fn($chunk) => array_sum($chunk)) ->forEach(fn($sum) => echo "Chunk sum: $sum\n"); echo "\n"; // Example 3: Data aggregation echo "3. Data Aggregation:\n"; $users = [ ['name' => 'Alice', 'age' => 30, 'city' => 'Berlin'], ['name' => 'Bob', 'age' => 25, 'city' => 'Munich'], ['name' => 'Charlie', 'age' => 35, 'city' => 'Berlin'], ['name' => 'Diana', 'age' => 28, 'city' => 'Munich'], ]; $byCity = AsyncStream::from($users, $fiberManager) ->groupBy(fn($user) => $user['city']); foreach ($byCity as $city => $users) { echo "$city: " . count($users) . " users\n"; } echo "\n"; // Example 4: Complex transformation echo "4. Complex Transformation:\n"; $avgAge = AsyncStream::from($users, $fiberManager) ->filter(fn($u) => $u['city'] === 'Berlin') ->map(fn($u) => $u['age']) ->reduce(fn($sum, $age) => $sum + $age, 0) / 2; echo "Average age in Berlin: $avgAge\n\n"; // Example 5: Channel integration echo "5. Channel Integration:\n"; $channel = new AsyncChannel(capacity: 5); // Producer fiber $producer = $fiberManager->asyncCooperative(function () use ($channel, $fiberManager) { AsyncStream::range(1, 5, $fiberManager) ->map(fn($x) => "Message-$x") ->toChannel($channel); }); // Consumer using stream $consumer = $fiberManager->asyncCooperative(function () use ($channel, $fiberManager) { $messages = AsyncStream::fromChannel($channel, $fiberManager) ->toArray(); echo "Received: " . json_encode($messages) . "\n"; }); $producer->start(); $consumer->start(); while (!$producer->isTerminated() || !$consumer->isTerminated()) { if ($producer->isSuspended()) $producer->resume(); if ($consumer->isSuspended()) $consumer->resume(); } echo "\n=== All Examples Completed ===\n";