chore: complete update
This commit is contained in:
129
src/Framework/Queue/FileQueue.php
Normal file
129
src/Framework/Queue/FileQueue.php
Normal file
@@ -0,0 +1,129 @@
|
||||
<?php
|
||||
|
||||
namespace App\Framework\Queue;
|
||||
|
||||
use App\Framework\Cache\Serializer;
|
||||
use App\Framework\Cache\Serializer\PhpSerializer;
|
||||
|
||||
final readonly class FileQueue implements Queue
|
||||
{
|
||||
private string $queueDir;
|
||||
|
||||
public function __construct(
|
||||
string $queueDir,
|
||||
private Serializer $serializer = new PhpSerializer())
|
||||
{
|
||||
$this->queueDir = $queueDir;
|
||||
if (!is_dir($queueDir)) {
|
||||
mkdir($queueDir, 0777, true);
|
||||
}
|
||||
}
|
||||
|
||||
public function push(object $job): void
|
||||
{
|
||||
// Job-Hash für Deduplication
|
||||
$jobHash = md5($this->serializer->serialize($job));
|
||||
$hashFile = $this->queueDir . '/hash_' . $jobHash . '.job';
|
||||
|
||||
// DETAILED DEBUG: Wer ruft push() auf?
|
||||
$trace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 10);
|
||||
$caller = '';
|
||||
foreach ($trace as $frame) {
|
||||
if (isset($frame['file']) && isset($frame['line'])) {
|
||||
$caller .= basename($frame['file']) . ':' . $frame['line'] . ' -> ';
|
||||
}
|
||||
}
|
||||
|
||||
error_log("FileQueue Debug: PUSH called from: " . $caller);
|
||||
error_log("FileQueue Debug: Job class: " . get_class($job));
|
||||
error_log("FileQueue Debug: Job data: " . json_encode($job));
|
||||
error_log("FileQueue Debug: Job hash: " . $jobHash);
|
||||
|
||||
// Prüfe ob identischer Job bereits existiert
|
||||
if (file_exists($hashFile)) {
|
||||
error_log("FileQueue Debug: ⚠️ DUPLICATE JOB REJECTED - hash already exists: " . $jobHash);
|
||||
error_log("FileQueue Debug: Called from: " . $caller);
|
||||
return; // Job wird nicht hinzugefügt
|
||||
}
|
||||
|
||||
// Verwende Hash als Dateiname für automatische Deduplication
|
||||
file_put_contents($hashFile, $this->serializer->serialize($job));
|
||||
error_log("FileQueue Debug: ✅ Job added with hash: " . $jobHash);
|
||||
}
|
||||
|
||||
public function pop(): ?object
|
||||
{
|
||||
$files = glob($this->queueDir . '/*.job');
|
||||
|
||||
// Debug: Queue-Status loggen
|
||||
if (empty($files)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
error_log("FileQueue Debug: Found " . count($files) . " job files in " . $this->queueDir);
|
||||
error_log("FileQueue Debug: Files: " . implode(', ', array_map('basename', $files)));
|
||||
|
||||
// Sortiere nach Erstellungsdatum (FIFO)
|
||||
usort($files, function($a, $b) {
|
||||
return filemtime($a) <=> filemtime($b);
|
||||
});
|
||||
|
||||
$file = $files[0];
|
||||
$lockFile = $file . '.lock';
|
||||
|
||||
// Atomic Lock-Mechanismus um race conditions zu verhindern
|
||||
if (file_exists($lockFile)) {
|
||||
error_log("FileQueue Debug: Job file is locked: $file");
|
||||
return null; // Job wird bereits verarbeitet
|
||||
}
|
||||
|
||||
// Lock erstellen
|
||||
if (!@touch($lockFile)) {
|
||||
error_log("FileQueue Debug: Could not create lock file: $lockFile");
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
// Prüfe ob Datei noch existiert (race condition)
|
||||
if (!file_exists($file) || !is_readable($file)) {
|
||||
error_log("FileQueue Debug: Job file not accessible: $file");
|
||||
@unlink($lockFile);
|
||||
return $this->pop();
|
||||
}
|
||||
|
||||
$content = file_get_contents($file);
|
||||
if ($content === false) {
|
||||
error_log("FileQueue Debug: Could not read job file: $file");
|
||||
@unlink($file);
|
||||
@unlink($lockFile);
|
||||
return $this->pop();
|
||||
}
|
||||
|
||||
$job = $this->serializer->unserialize($content);
|
||||
|
||||
// Atomic delete: Erst rename, dann delete
|
||||
$tempFile = $file . '.deleting.' . time() . '.' . getmypid();
|
||||
if (rename($file, $tempFile)) {
|
||||
unlink($tempFile);
|
||||
error_log("FileQueue Debug: Successfully deleted job file: " . basename($file));
|
||||
} else {
|
||||
error_log("FileQueue Debug: Failed to rename job file for deletion: $file");
|
||||
// Fallback: direktes löschen versuchen
|
||||
if (!@unlink($file)) {
|
||||
error_log("FileQueue Debug: Failed to delete job file: $file - moving to processed");
|
||||
$processedFile = $file . '.processed.' . time();
|
||||
@rename($file, $processedFile);
|
||||
}
|
||||
}
|
||||
|
||||
@unlink($lockFile);
|
||||
return $job;
|
||||
|
||||
} catch (\Throwable $e) {
|
||||
error_log("FileQueue Debug: Failed to process job from file $file: " . $e->getMessage());
|
||||
@unlink($file);
|
||||
@unlink($lockFile);
|
||||
return $this->pop();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user