Files
michaelschiemer/src/Framework/Queue/FileQueue.php

405 lines
13 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Framework\Queue;
use App\Framework\DateTime\SystemClock;
use App\Framework\Filesystem\Directory;
use App\Framework\Filesystem\FileStorage;
use App\Framework\Logging\DefaultLogger;
use App\Framework\Logging\Logger;
use App\Framework\Logging\LogLevel;
use App\Framework\Logging\ProcessorManager;
use App\Framework\Logging\ValueObjects\LogContext;
use App\Framework\Queue\ValueObjects\JobPayload;
use App\Framework\Serializer\Php\PhpSerializer;
use App\Framework\Serializer\Php\PhpSerializerConfig;
use App\Framework\Serializer\Serializer;
/**
* File-based Queue Implementation with Priority Support
*
* Uses filesystem directory structure for priority-based organization:
* - queue/priority/ - Priority-based jobs sorted by score
* - queue/delayed/ - Delayed jobs scheduled for future processing
*/
final readonly class FileQueue implements Queue
{
private Directory $queueDirectory;
private Directory $priorityDirectory;
private Directory $delayedDirectory;
private FileStorage $storage;
private Logger $logger;
private Serializer $serializer;
public function __construct(
private string $queuePath,
?Serializer $serializer = null,
?FileStorage $storage = null,
?Logger $logger = null
) {
$this->serializer = $serializer ?? new PhpSerializer(PhpSerializerConfig::unsafe());
$this->storage = $storage ?? new FileStorage();
// Initialize directories using simple approach
$priorityPath = $queuePath . '/priority';
$delayedPath = $queuePath . '/delayed';
// Create directories directly using storage
$this->storage->createDirectory($queuePath);
$this->storage->createDirectory($priorityPath);
$this->storage->createDirectory($delayedPath);
// Create directory objects for easier path handling
$this->queueDirectory = new Directory($queuePath, $this->storage);
$this->priorityDirectory = new Directory($priorityPath, $this->storage);
$this->delayedDirectory = new Directory($delayedPath, $this->storage);
$this->logger = $logger ?? new DefaultLogger(
clock: new SystemClock(),
minLevel: LogLevel::WARNING,
handlers: [],
processorManager: new ProcessorManager()
);
}
public function push(JobPayload $payload): void
{
$currentTime = time();
// Handle delayed jobs
if ($payload->isDelayed()) {
$this->pushDelayedJob($payload);
} else {
$this->pushPriorityJob($payload, $currentTime);
}
}
public function pop(): ?JobPayload
{
// First, process any delayed jobs that are now ready
$this->processDelayedJobs();
// Pop highest priority job
return $this->popPriorityJob();
}
public function peek(): ?JobPayload
{
// Process delayed jobs first
$this->processDelayedJobs();
// Peek at highest priority job
$files = $this->getPriorityJobFiles();
if (empty($files)) {
return null;
}
$firstFile = reset($files);
$filePath = $this->priorityDirectory->getPathString() . '/' . $firstFile->filename;
$content = $this->storage->get($filePath);
return $this->serializer->deserialize($content);
}
public function size(): int
{
$priorityCount = count($this->getPriorityJobFiles());
$delayedCount = count($this->getDelayedJobFiles());
return $priorityCount + $delayedCount;
}
public function clear(): int
{
$totalCount = $this->size();
// Clear priority jobs
foreach ($this->getPriorityJobFiles() as $file) {
try {
$filePath = $this->priorityDirectory->getPathString() . '/' . $file->filename;
$this->storage->delete($filePath);
} catch (\Throwable $e) {
$this->logger->warning('Failed to delete priority job file', LogContext::withData([
'file' => $file->filename,
'error' => $e->getMessage(),
]));
}
}
// Clear delayed jobs
foreach ($this->getDelayedJobFiles() as $file) {
try {
$filePath = $this->delayedDirectory->getPathString() . '/' . $file->filename;
$this->storage->delete($filePath);
} catch (\Throwable $e) {
$this->logger->warning('Failed to delete delayed job file', [
'file' => $file->filename,
'error' => $e->getMessage(),
]);
}
}
return $totalCount;
}
public function getStats(): array
{
$priorityJobs = $this->getPriorityJobFiles();
$delayedJobs = $this->getDelayedJobFiles();
$priorityBreakdown = $this->getPriorityBreakdown($priorityJobs);
return [
'total_size' => count($priorityJobs) + count($delayedJobs),
'priority_queue_size' => count($priorityJobs),
'delayed_queue_size' => count($delayedJobs),
'priority_breakdown' => $priorityBreakdown,
];
}
/**
* Push a delayed job to the delayed queue
*/
private function pushDelayedJob(JobPayload $payload): void
{
$availableTime = $payload->getAvailableTime();
$filename = $this->generateDelayedFilename($availableTime);
$filePath = $this->delayedDirectory->getPathString() . '/' . $filename;
$serializedPayload = $this->serializer->serialize($payload);
$this->storage->put($filePath, $serializedPayload);
}
/**
* Push a priority job to the priority queue
*/
private function pushPriorityJob(JobPayload $payload, int $currentTime): void
{
$score = $this->calculateScore($payload, $currentTime);
$filename = $this->generatePriorityFilename($score);
$filePath = $this->priorityDirectory->getPathString() . '/' . $filename;
$serializedPayload = $this->serializer->serialize($payload);
$this->storage->put($filePath, $serializedPayload);
}
/**
* Process delayed jobs that are now ready
*/
private function processDelayedJobs(): void
{
$currentTime = time();
$delayedFiles = $this->getDelayedJobFiles();
foreach ($delayedFiles as $file) {
$availableTime = $this->extractTimeFromDelayedFilename($file->filename);
if ($availableTime <= $currentTime) {
try {
// Read and deserialize the job
$filePath = $this->delayedDirectory->getPathString() . '/' . $file->filename;
$content = $this->storage->get($filePath);
$payload = $this->serializer->deserialize($content);
if ($payload instanceof JobPayload) {
// Remove delay and add to priority queue
$payloadWithoutDelay = $payload->withDelay(\App\Framework\Core\ValueObjects\Duration::zero());
$this->pushPriorityJob($payloadWithoutDelay, $currentTime);
// Delete from delayed queue
$this->storage->delete($filePath);
}
} catch (\Throwable $e) {
$this->logger->error('Failed to process delayed job', LogContext::withData([
'file' => $file->filename,
'error' => $e->getMessage(),
]));
}
}
}
}
/**
* Pop the highest priority job
*/
private function popPriorityJob(): ?JobPayload
{
$files = $this->getPriorityJobFiles();
if (empty($files)) {
return null;
}
$firstFile = reset($files);
try {
$filePath = $this->priorityDirectory->getPathString() . '/' . $firstFile->filename;
$content = $this->storage->get($filePath);
$payload = $this->serializer->deserialize($content);
// Delete the file after successful reading
$this->storage->delete($filePath);
if ($payload instanceof JobPayload) {
return $payload;
}
} catch (\Throwable $e) {
$this->logger->error('Failed to pop priority job', LogContext::withData([
'file' => $firstFile->filename,
'error' => $e->getMessage(),
]));
// Try to delete corrupted file
try {
$filePath = $this->priorityDirectory->getPathString() . '/' . $firstFile->filename;
$this->storage->delete($filePath);
} catch (\Throwable) {
// Ignore deletion errors
}
}
return null;
}
/**
* Get priority job files sorted by score (lowest first = highest priority)
*/
public function getPriorityJobFiles(): array
{
try {
$filenames = $this->storage->listDirectory($this->priorityDirectory->getPathString());
$files = [];
foreach ($filenames as $filepath) {
if (str_ends_with($filepath, '.json')) {
// Extract just the filename from the path
$filename = basename($filepath);
// Create a simple file object for compatibility
$files[] = (object) ['filename' => $filename];
}
}
// Sort by filename (which contains the score)
usort($files, function ($a, $b) {
$scoreA = $this->extractScoreFromPriorityFilename($a->filename);
$scoreB = $this->extractScoreFromPriorityFilename($b->filename);
return $scoreA <=> $scoreB;
});
return $files;
} catch (\Throwable) {
return [];
}
}
/**
* Get delayed job files
*/
public function getDelayedJobFiles(): array
{
try {
$filenames = $this->storage->listDirectory($this->delayedDirectory->getPathString());
$files = [];
foreach ($filenames as $filepath) {
if (str_ends_with($filepath, '.json')) {
// Extract just the filename from the path
$filename = basename($filepath);
// Create a simple file object for compatibility
$files[] = (object) ['filename' => $filename];
}
}
return $files;
} catch (\Throwable) {
return [];
}
}
/**
* Calculate score for priority queue (lower score = higher priority)
*/
private function calculateScore(JobPayload $payload, int $currentTime): float
{
$priorityScore = 1000 - $payload->priority->value; // Invert for higher priority = lower score
$timeScore = $currentTime / 1000000; // Microsecond precision for FIFO within same priority
return $priorityScore + $timeScore;
}
/**
* Generate filename for priority job
*/
private function generatePriorityFilename(float $score): string
{
$scoreStr = str_pad((string) (int) ($score * 1000000), 15, '0', STR_PAD_LEFT);
$generator = new \App\Framework\Id\Ulid\UlidGenerator();
return "job_{$scoreStr}_" . $generator->generate() . '.json';
}
/**
* Generate filename for delayed job
*/
private function generateDelayedFilename(int $availableTime): string
{
$generator = new \App\Framework\Id\Ulid\UlidGenerator();
return "delayed_{$availableTime}_" . $generator->generate() . '.json';
}
/**
* Extract score from priority filename
*/
private function extractScoreFromPriorityFilename(string $filename): float
{
if (preg_match('/job_(\d+)_/', $filename, $matches)) {
return (float) $matches[1] / 1000000;
}
return 999999.0; // Default high score for malformed filenames
}
/**
* Extract time from delayed filename
*/
private function extractTimeFromDelayedFilename(string $filename): int
{
if (preg_match('/delayed_(\d+)_/', $filename, $matches)) {
return (int) $matches[1];
}
return PHP_INT_MAX; // Default far future for malformed filenames
}
/**
* Get priority breakdown for statistics
*/
private function getPriorityBreakdown(array $priorityFiles): array
{
$breakdown = [];
foreach ($priorityFiles as $file) {
try {
$filePath = $this->priorityDirectory->getPathString() . '/' . $file->filename;
$content = $this->storage->get($filePath);
$payload = $this->serializer->deserialize($content);
if ($payload instanceof JobPayload) {
$priority = $payload->priority->toString();
$breakdown[$priority] = ($breakdown[$priority] ?? 0) + 1;
}
} catch (\Throwable) {
// Skip corrupted files in breakdown
}
}
return $breakdown;
}
}