maxProcesses = $maxProcesses; $this->tempDir = $tempDir ?? sys_get_temp_dir(); } /** * {@inheritdoc} */ public function processTasks(array $tasks, mixed ...$sharedData): array { if (empty($tasks)) { return []; } // Fasse gemeinsame Daten zusammen, die serialisierbar sind $serializedSharedData = $this->safeSerialize($sharedData); if ($serializedSharedData === false) { throw new \RuntimeException("Gemeinsame Daten können nicht serialisiert werden"); } // Teile die Aufgaben in Blöcke auf $chunks = array_chunk($tasks, (int)ceil(count($tasks) / $this->maxProcesses)); $tempFiles = []; $childPids = []; // Starte für jeden Chunk einen separaten Prozess foreach ($chunks as $index => $chunk) { // Erstelle temporäre Datei für die Ergebnisse $tempFile = $this->tempDir . '/async_results_' . uniqid() . '.tmp'; $tempFiles[$index] = $tempFile; // Da wir Closures nicht serialisieren können, erstellen wir Stellvertreter-Aufgaben $taskDescriptors = []; foreach ($chunk as $taskIndex => $task) { // Wir speichern nur den originalen Index für die spätere Zuordnung $taskDescriptors[$taskIndex] = true; } // Starte einen Child-Prozess $pid = pcntl_fork(); if ($pid == -1) { throw new \RuntimeException('Konnte keinen neuen Prozess starten'); } elseif ($pid) { // Elternprozess: speichere PID $childPids[$pid] = ['index' => $index, 'chunk' => $chunk]; } else { // Kindprozess: Verarbeite die Aufgaben try { $chunkResults = []; // Deserialisiere gemeinsame Daten $decodedSharedData = unserialize($serializedSharedData); // Führe die Aufgaben in diesem Chunk aus foreach ($chunk as $taskIndex => $task) { try { // Führe die Task direkt aus, ohne zu serialisieren $chunkResults[$taskIndex] = $task(...$decodedSharedData); } catch (\Throwable $e) { // Fehler bei einer bestimmten Aufgabe $chunkResults[$taskIndex] = ['__error__' => $e->getMessage()]; } } // Versuche, die Ergebnisse zu serialisieren $serializedResults = $this->safeSerialize($chunkResults); if ($serializedResults === false) { throw new \RuntimeException("Ergebnisse können nicht serialisiert werden"); } // Speichere die Ergebnisse file_put_contents($tempFile, $serializedResults); } catch (\Throwable $e) { // Schwerwiegender Fehler im Kindprozess file_put_contents($tempFile, serialize(['__process_error__' => $e->getMessage()])); } // Beende den Kindprozess exit(0); } } // Sammle die Ergebnisse aus allen Child-Prozessen $allResults = []; // Warte auf jeden Kindprozess foreach ($childPids as $pid => $info) { $index = $info['index']; $chunk = $info['chunk']; // Warte auf den Prozess pcntl_waitpid($pid, $status); $tempFile = $tempFiles[$index]; if (file_exists($tempFile)) { $fileContent = file_get_contents($tempFile); try { $chunkResults = unserialize($fileContent); if (is_array($chunkResults)) { if (isset($chunkResults['__process_error__'])) { error_log("Prozessfehler in Chunk $index: " . $chunkResults['__process_error__']); } else { // Füge die Ergebnisse zum Gesamtergebnis hinzu foreach ($chunkResults as $taskIndex => $result) { if (is_array($result) && isset($result['__error__'])) { error_log("Fehler bei Task #$taskIndex: " . $result['__error__']); $allResults[$taskIndex] = null; } else { $allResults[$taskIndex] = $result; } } } } } catch (\Throwable $e) { error_log("Fehler beim Deserialisieren der Ergebnisse aus Chunk $index: " . $e->getMessage()); } // Lösche die temporäre Datei unlink($tempFile); } } return $allResults; } /** * Sichere Serialisierung mit Fehlerbehandlung */ private function safeSerialize($data) { try { return serialize($data); } catch (\Throwable $e) { return false; } } /** * {@inheritdoc} */ public static function isAvailable(): bool { return function_exists('pcntl_fork') && function_exists('pcntl_waitpid'); } }