# Queue System Comprehensive documentation of the asynchronous job processing system in the Custom PHP Framework. ## Overview The Queue System provides robust, scalable background job processing with support for multiple queue drivers, priority-based execution, retry mechanisms, dead letter queues, and comprehensive monitoring. **Core Features**: - **Multi-Driver Support**: File-based, Redis, and Database-backed queues - **Priority Queues**: Critical, high, normal, low priority levels - **Delayed Jobs**: Schedule jobs for future execution - **Retry Strategies**: Exponential backoff with configurable attempts - **Dead Letter Queue**: Automatic failed job management - **Job Persistence**: Full job lifecycle tracking and history - **Progress Tracking**: Real-time job progress monitoring - **Distributed Locking**: Multi-worker coordination - **Metrics & Monitoring**: Performance statistics and health checks ## Queue Interface The framework provides a unified `Queue` interface that all queue drivers implement: ```php interface Queue { /** * Push a job to the queue */ public function push(JobPayload $payload): void; /** * Pop the next available job from the queue */ public function pop(): ?JobPayload; /** * Peek at the next job without removing it */ public function peek(): ?JobPayload; /** * Get the number of jobs in the queue */ public function size(): int; /** * Clear all jobs from the queue */ public function clear(): int; /** * Get queue statistics */ public function getStats(): array; } ``` ### JobPayload Value Object All jobs are wrapped in a `JobPayload` value object that provides type safety and configuration: ```php final readonly class JobPayload { public function __construct( public object $job, // The job instance public QueuePriority $priority, // Priority level public Duration $delay, // Execution delay public ?Duration $timeout = null, // Max execution time public ?RetryStrategy $retryStrategy = null, // Retry configuration public ?JobMetadata $metadata = null // Additional metadata ) {} } ``` **Factory Methods**: ```php // Immediate execution with high priority $payload = JobPayload::immediate($job); // Delayed execution $payload = JobPayload::delayed($job, Duration::fromMinutes(5)); // Critical priority with timeout $payload = JobPayload::critical($job); // Background job with retries $payload = JobPayload::background($job); // Custom configuration $payload = JobPayload::create( job: $job, priority: QueuePriority::high(), delay: Duration::fromSeconds(30), timeout: Duration::fromMinutes(5), retryStrategy: new ExponentialBackoffStrategy(maxAttempts: 3) ); ``` ## Queue Drivers ### File-Based Queue (Default) Simple file-based queue implementation for development and small-scale applications: **Features**: - No external dependencies - Persistent across restarts - Suitable for development and small production loads - Priority-based sorting **Usage**: ```php use App\Framework\Queue\FileQueue; $queue = new FileQueue( directory: '/var/www/storage/queues', queueName: 'default' ); $queue->push(JobPayload::immediate($job)); ``` ### Redis Queue (Production) High-performance Redis-backed queue with advanced features: **Features**: - High throughput (10,000+ jobs/second) - Redis sorted sets for priority queues - Delayed job support via timestamp scoring - Atomic operations for thread safety - Real-time statistics **Architecture**: - Priority Queue: Redis sorted set with score = priority + timestamp - Delayed Queue: Redis sorted set with score = execution timestamp - Statistics: Redis hash with counters **Usage**: ```php use App\Framework\Queue\RedisQueue; $queue = new RedisQueue( connection: $redisConnection, queueName: 'production', serializer: new PhpSerializer() ); $queue->push(JobPayload::immediate($job)); // Statistics $stats = $queue->getStats(); // [ // 'total_size' => 1523, // 'priority_queue_size' => 1200, // 'delayed_queue_size' => 323, // 'priority_breakdown' => ['high' => 150, 'normal' => 1050, 'low' => 323] // ] ``` **Score Calculation**: ```php // Lower score = higher priority in Redis $priorityScore = 1000 - $priority->value; // Invert priority $timeScore = $currentTime / 1000000; // Microsecond precision for FIFO $finalScore = $priorityScore + $timeScore; ``` **Delayed Job Processing**: ```php // Automatic processing of ready delayed jobs private function processDelayedJobs(): void { $currentTime = time(); // Get all delayed jobs where score <= current time $readyJobs = $this->redis->zRangeByScore($this->delayedKey, 0, $currentTime); foreach ($readyJobs as $serializedPayload) { // Remove from delayed queue $this->redis->zRem($this->delayedKey, $serializedPayload); // Add to priority queue $this->redis->zAdd($this->priorityKey, $score, $serializedPayload); } } ``` ### Persistent Queue Decorator Wraps any queue implementation with job persistence and tracking: ```php final readonly class PersistentQueue implements Queue { public function __construct( private Queue $baseQueue, private JobPersistenceLayer $persistence, private QueueType $queueType ) {} public function push(JobPayload $payload): void { // Generate job ID $jobId = JobId::generate(); // Store in persistence layer $this->persistence->storeJob( jobId: $jobId, queueType: $this->queueType, jobData: $payload->job, maxAttempts: $payload->retryStrategy->maxAttempts ); // Push to base queue with job ID $this->baseQueue->push($payload->withJobId($jobId)); } public function markJobCompleted(JobId $jobId, array $result = []): void { $this->persistence->markAsCompleted($jobId, $result); } public function markJobFailed(JobId $jobId, string $errorMessage): void { $jobState = $this->persistence->markAsFailed($jobId, $errorMessage); // Automatic retry if applicable if ($jobState->canRetry()) { $this->baseQueue->push($retryPayload); $this->persistence->markForRetry($jobId, $errorMessage); } } } ``` ## Background Jobs ### Creating a Job Jobs are simple PHP classes with a `handle()` method: ```php final readonly class SendWelcomeEmailJob { public function __construct( private UserId $userId, private Email $email, private UserName $userName ) {} public function handle(): array { // Business logic $this->emailService->send( to: $this->email, template: 'welcome', data: ['name' => $this->userName->value] ); // Return result for logging return [ 'user_id' => $this->userId->toString(), 'email' => $this->email->getMasked(), 'sent_at' => time() ]; } public function getType(): string { return 'email.welcome'; } } ``` ### Dispatching Jobs ```php use App\Framework\Queue\Queue; use App\Framework\Queue\ValueObjects\JobPayload; // Immediate execution $job = new SendWelcomeEmailJob($userId, $email, $userName); $queue->push(JobPayload::immediate($job)); // Delayed execution (5 minutes) $queue->push(JobPayload::delayed($job, Duration::fromMinutes(5))); // Background job with retries $queue->push(JobPayload::background($job)); // Custom configuration $queue->push(JobPayload::create( job: $job, priority: QueuePriority::high(), timeout: Duration::fromMinutes(5), retryStrategy: new ExponentialBackoffStrategy(maxAttempts: 3) )); ``` ### Processing Jobs ```php // Worker loop while (true) { $payload = $queue->pop(); if ($payload === null) { sleep(1); // Wait for new jobs continue; } try { // Execute job $result = $payload->job->handle(); // Mark as completed if ($queue instanceof PersistentQueue) { $queue->markJobCompleted($payload->getJobId(), $result); } } catch (\Throwable $e) { // Mark as failed if ($queue instanceof PersistentQueue) { $queue->markJobFailed( $payload->getJobId(), $e->getMessage(), $e ); } } } ``` ## Priority System ### QueuePriority Value Object ```php enum QueuePriority: int { case CRITICAL = 1000; // System-critical operations case HIGH = 750; // User-facing operations case NORMAL = 500; // Standard background tasks case LOW = 250; // Non-urgent maintenance public static function critical(): self { return self::CRITICAL; } public static function high(): self { return self::HIGH; } public static function normal(): self { return self::NORMAL; } public static function low(): self { return self::LOW; } public function toString(): string { return match ($this) { self::CRITICAL => 'critical', self::HIGH => 'high', self::NORMAL => 'normal', self::LOW => 'low', }; } } ``` ### Priority Usage Examples ```php // Critical: Payment processing, real-time notifications $queue->push(JobPayload::create( job: new ProcessPaymentJob($payment), priority: QueuePriority::critical(), timeout: Duration::fromSeconds(30) )); // High: User-triggered emails, report generation $queue->push(JobPayload::create( job: new GenerateReportJob($userId), priority: QueuePriority::high() )); // Normal: Scheduled maintenance, cache warming $queue->push(JobPayload::create( job: new WarmCacheJob(), priority: QueuePriority::normal() )); // Low: Cleanup tasks, analytics aggregation $queue->push(JobPayload::create( job: new CleanupOldLogsJob(), priority: QueuePriority::low() )); ``` ## Retry Mechanisms ### RetryStrategy Interface ```php interface RetryStrategy { public function shouldRetry(int $attempts): bool; public function getDelay(int $attempts): Duration; public function getMaxAttempts(): int; } ``` ### Exponential Backoff Strategy ```php final readonly class ExponentialBackoffStrategy implements RetryStrategy { public function __construct( private int $maxAttempts = 3, private int $baseDelaySeconds = 60, private int $maxDelaySeconds = 3600 ) {} public function shouldRetry(int $attempts): bool { return $attempts < $this->maxAttempts; } public function getDelay(int $attempts): Duration { // 2^attempts * baseDelay, capped at maxDelay $delaySeconds = min( pow(2, $attempts) * $this->baseDelaySeconds, $this->maxDelaySeconds ); return Duration::fromSeconds($delaySeconds); } public function getMaxAttempts(): int { return $this->maxAttempts; } } ``` ### Custom Retry Strategy ```php final readonly class CustomRetryStrategy implements RetryStrategy { public function __construct( private array $retryDelays = [60, 300, 1800] // 1min, 5min, 30min ) {} public function shouldRetry(int $attempts): bool { return $attempts < count($this->retryDelays); } public function getDelay(int $attempts): Duration { $index = min($attempts, count($this->retryDelays) - 1); return Duration::fromSeconds($this->retryDelays[$index]); } public function getMaxAttempts(): int { return count($this->retryDelays); } } ``` ### Usage with Jobs ```php // Standard exponential backoff $payload = JobPayload::create( job: $job, retryStrategy: new ExponentialBackoffStrategy( maxAttempts: 5, baseDelaySeconds: 60, // Start with 1 minute maxDelaySeconds: 3600 // Cap at 1 hour ) ); // Custom retry schedule $payload = JobPayload::create( job: $job, retryStrategy: new CustomRetryStrategy([60, 300, 900, 3600]) ); // No retries $payload = JobPayload::create( job: $job, retryStrategy: null // Job fails permanently on first error ); ``` ## Failed Jobs & Dead Letter Queue ### JobStatus Lifecycle ```php enum JobStatus: string { case PENDING = 'pending'; // Waiting in queue case PROCESSING = 'processing'; // Currently executing case COMPLETED = 'completed'; // Successfully completed case FAILED = 'failed'; // Failed with error case RETRYING = 'retrying'; // Failed, will retry case CANCELLED = 'cancelled'; // Manually cancelled case EXPIRED = 'expired'; // Timeout exceeded public function isFinal(): bool { return match ($this) { self::COMPLETED, self::CANCELLED, self::EXPIRED => true, default => false }; } public function isActive(): bool { return match ($this) { self::PENDING, self::PROCESSING, self::RETRYING => true, default => false }; } public function isFailure(): bool { return match ($this) { self::FAILED, self::EXPIRED => true, default => false }; } } ``` ### Dead Letter Queue Manager When jobs exceed max retry attempts, they are automatically moved to a dead letter queue: ```php final readonly class DeadLetterManager { public function __construct( private DeadLetterQueueInterface $deadLetterQueue, private ProductionJobPersistenceLayer $persistenceLayer ) {} /** * Move failed job to dead letter queue */ public function moveJobToDeadLetterQueue( JobIndexEntry $failedJob, FailureReason $failureReason ): void { $dlqName = DeadLetterQueueName::forQueue( QueueName::fromString($failedJob->queueType) ); $deadLetterJob = DeadLetterJob::fromFailedJob( failedJob: $failedJob, deadLetterQueueName: $dlqName, failureReason: $failureReason ); $this->deadLetterQueue->addFailedJob($deadLetterJob); $this->persistenceLayer->deleteJob($failedJob->jobId); } /** * Handle job failure with automatic DLQ routing */ public function handleJobFailure( string $jobId, \Throwable $exception, int $maxAttempts = 3 ): bool { $jobEntry = $this->persistenceLayer->getJobById($jobId); if ($jobEntry->attempts >= $maxAttempts) { $this->moveJobToDeadLetterQueue( failedJob: $jobEntry, failureReason: FailureReason::fromException($exception) ); return true; } return false; } } ``` ### Managing Failed Jobs ```php use App\Framework\Queue\Services\DeadLetterManager; // Get failed jobs $failedJobs = $deadLetterManager->getFailedJobs( originalQueue: QueueName::fromString('emails'), limit: 100 ); // Retry a specific job $deadLetterManager->retryJob($deadLetterJobId); // Retry all jobs in a dead letter queue $retriedCount = $deadLetterManager->retryAllJobs($deadLetterQueueName); // Clear dead letter queue $deletedCount = $deadLetterManager->clearDeadLetterQueue($deadLetterQueueName); // Get statistics $stats = $deadLetterManager->getStatistics(); // [ // 'default.dlq' => ['count' => 23, 'oldest' => '2024-01-15 10:30:00'], // 'emails.dlq' => ['count' => 5, 'oldest' => '2024-01-16 14:20:00'] // ] ``` ## Queue Monitoring ### Queue Statistics ```php // Get comprehensive statistics $stats = $queue->getStats(); // Example output: [ 'total_size' => 1523, 'priority_queue_size' => 1200, 'delayed_queue_size' => 323, 'priority_breakdown' => [ 'critical' => 15, 'high' => 150, 'normal' => 1050, 'low' => 308 ], 'stats' => [ 'total_pushed' => 125430, 'total_popped' => 123907, 'high_pushed' => 15234, 'normal_pushed' => 95678, 'low_pushed' => 14518, 'last_activity' => 1705410234 ], 'persistence' => [ 'total_jobs' => 125430, 'completed' => 120145, 'failed' => 3762, 'retrying' => 1523 ] ] ``` ### Job Metrics ```php use App\Framework\Queue\Services\JobMetricsManager; // Get job performance metrics $metrics = $metricsManager->getJobMetrics($jobId); // [ // 'execution_time_ms' => 1234, // 'memory_used_mb' => 45.2, // 'peak_memory_mb' => 52.8, // 'attempts' => 1, // 'last_attempt_at' => '2024-01-16 15:30:00' // ] // Get aggregated metrics by job type $aggregated = $metricsManager->getAggregatedMetrics('email.welcome'); // [ // 'total_executions' => 1523, // 'avg_execution_time_ms' => 856, // 'p95_execution_time_ms' => 1450, // 'p99_execution_time_ms' => 2340, // 'success_rate' => 0.987, // 'avg_memory_mb' => 38.4 // ] ``` ### Progress Tracking For long-running jobs, track progress: ```php final readonly class LargeDataImportJob { public function __construct( private ProgressManager $progressManager, private JobId $jobId, private array $dataFiles ) {} public function handle(): array { $totalSteps = count($this->dataFiles); foreach ($this->dataFiles as $index => $file) { // Process file $this->processFile($file); // Update progress $this->progressManager->updateProgress( jobId: $this->jobId, currentStep: $index + 1, totalSteps: $totalSteps, message: "Processing {$file}" ); } return ['files_processed' => $totalSteps]; } } // Query progress $progress = $progressManager->getProgress($jobId); // [ // 'current_step' => 45, // 'total_steps' => 100, // 'percentage' => 45.0, // 'message' => 'Processing data_2024_01_16.csv', // 'updated_at' => '2024-01-16 15:32:45' // ] ``` ### Health Monitoring ```php use App\Framework\Queue\Services\WorkerHealthCheckService; // Check worker health $health = $healthCheckService->checkAllWorkers(); // [ // 'healthy_workers' => 8, // 'unhealthy_workers' => 1, // 'total_workers' => 9, // 'workers' => [ // ['id' => 'worker-1', 'status' => 'healthy', 'last_heartbeat' => '2024-01-16 15:33:00'], // ['id' => 'worker-2', 'status' => 'unhealthy', 'last_heartbeat' => '2024-01-16 15:20:15'] // ] // ] // Register worker heartbeat $healthCheckService->registerHeartbeat($workerId); // Cleanup stale workers $cleanedCount = $healthCheckService->cleanupStaleWorkers( staleThreshold: Duration::fromMinutes(5) ); ``` ## Best Practices ### 1. Job Design **Keep Jobs Small and Focused**: ```php // ✅ Good: Single responsibility final readonly class SendEmailJob { public function handle(): array { $this->emailService->send($this->email, $this->template); return ['sent' => true]; } } // ❌ Bad: Multiple responsibilities final readonly class SendEmailAndUpdateUserJob { public function handle(): array { $this->emailService->send($this->email); $this->userRepository->update($this->userId); // Separate concern $this->analyticsService->track($this->event); // Separate concern } } ``` **Make Jobs Idempotent**: ```php final readonly class ProcessPaymentJob { public function handle(): array { // Check if already processed (idempotency) if ($this->paymentRepository->isProcessed($this->paymentId)) { return ['status' => 'already_processed']; } // Process payment $result = $this->paymentGateway->charge($this->payment); // Mark as processed $this->paymentRepository->markAsProcessed($this->paymentId); return $result; } } ``` **Use Value Objects for Job Data**: ```php // ✅ Good: Type-safe value objects final readonly class SendInvoiceJob { public function __construct( private InvoiceId $invoiceId, private Email $recipientEmail, private Money $amount ) {} } // ❌ Bad: Primitive arrays final readonly class SendInvoiceJob { public function __construct( private array $data // What structure? What validation? ) {} } ``` ### 2. Priority Assignment **Critical**: System-critical operations that must execute immediately - Payment processing - Real-time notifications - Security alerts **High**: User-facing operations with user waiting - Password reset emails - Welcome emails - Report generation (user-triggered) **Normal**: Standard background tasks - Scheduled maintenance - Cache warming - Analytics processing **Low**: Non-urgent maintenance - Log cleanup - Old data archival - Non-critical aggregations ### 3. Retry Strategy **Use Exponential Backoff** for transient failures: ```php // External API calls, network operations JobPayload::create( job: $apiCallJob, retryStrategy: new ExponentialBackoffStrategy( maxAttempts: 5, baseDelaySeconds: 60, maxDelaySeconds: 3600 ) ); ``` **No Retries** for permanent failures: ```php // Invalid input, business logic violations JobPayload::create( job: $validationJob, retryStrategy: null // Fail immediately ); ``` **Custom Schedule** for specific requirements: ```php // Time-sensitive operations JobPayload::create( job: $timeSensitiveJob, retryStrategy: new CustomRetryStrategy([30, 60, 120]) // Quick retries ); ``` ### 4. Monitoring **Track Key Metrics**: - Queue depth by priority - Average job execution time - Success/failure rates - Dead letter queue size - Worker health status **Set Up Alerts**: ```php // Alert on high queue depth if ($stats['total_size'] > 10000) { $alerting->send('Queue depth exceeded threshold'); } // Alert on high failure rate $failureRate = $stats['persistence']['failed'] / $stats['persistence']['total_jobs']; if ($failureRate > 0.05) { // 5% failure rate $alerting->send('High job failure rate detected'); } // Alert on dead letter queue growth if ($dlqStats['count'] > 100) { $alerting->send('Dead letter queue requires attention'); } ``` ### 5. Resource Management **Implement Timeouts**: ```php JobPayload::create( job: $job, timeout: Duration::fromMinutes(5) // Kill job after 5 minutes ); ``` **Manage Memory**: ```php final readonly class LargeDataProcessingJob { public function handle(): array { // Process in batches to avoid memory exhaustion foreach ($this->getBatches() as $batch) { $this->processBatch($batch); // Force garbage collection gc_collect_cycles(); } } } ``` **Connection Pooling**: ```php // Reuse connections across jobs final readonly class DatabaseJob { public function handle(): array { // Use connection pool instead of new connections $connection = $this->connectionPool->acquire(); try { // Execute queries $result = $connection->query($sql); } finally { // Always release back to pool $this->connectionPool->release($connection); } } } ``` ### 6. Error Handling **Graceful Degradation**: ```php final readonly class EmailJob { public function handle(): array { try { $this->emailService->send($this->email); return ['status' => 'sent']; } catch (EmailProviderException $e) { // Log error but don't fail job $this->logger->warning('Email provider unavailable', [ 'error' => $e->getMessage() ]); // Use fallback provider $this->fallbackEmailService->send($this->email); return ['status' => 'sent_via_fallback']; } } } ``` **Detailed Error Logging**: ```php try { $job->handle(); } catch (\Throwable $e) { $this->logger->error('Job failed', [ 'job_id' => $jobId, 'job_class' => get_class($job), 'error' => $e->getMessage(), 'trace' => $e->getTraceAsString(), 'attempts' => $attempts, 'metadata' => $payload->metadata->toArray() ]); throw $e; } ``` ### 7. Testing **Unit Test Jobs**: ```php it('sends welcome email', function () { $job = new SendWelcomeEmailJob( userId: UserId::generate(), email: new Email('test@example.com'), userName: new UserName('Test User') ); $result = $job->handle(); expect($result['sent_at'])->toBeGreaterThan(0); }); ``` **Integration Test Queue Operations**: ```php it('processes jobs in priority order', function () { $queue = new RedisQueue($connection); // Push jobs with different priorities $queue->push(JobPayload::create($lowJob, QueuePriority::low())); $queue->push(JobPayload::create($highJob, QueuePriority::high())); $queue->push(JobPayload::create($normalJob, QueuePriority::normal())); // Pop and verify order $first = $queue->pop(); expect($first->job)->toBe($highJob); $second = $queue->pop(); expect($second->job)->toBe($normalJob); $third = $queue->pop(); expect($third->job)->toBe($lowJob); }); ``` ## Framework Integration ### With Event System ```php // Dispatch jobs in response to events #[OnEvent(priority: 100)] public function onUserRegistered(UserRegisteredEvent $event): void { $job = new SendWelcomeEmailJob( userId: $event->userId, email: $event->email, userName: $event->userName ); $this->queue->push(JobPayload::immediate($job)); } ``` ### With Scheduler See [Scheduler-Queue Pipeline](scheduler-queue-pipeline.md) for complete integration documentation. ```php // Schedule recurring job dispatch $scheduler->schedule( 'send-daily-digest', CronSchedule::fromExpression('0 8 * * *'), // Every day at 8 AM fn() => $this->queue->push(JobPayload::immediate(new SendDailyDigestJob())) ); ``` ### With Command Bus ```php // Dispatch jobs via command handlers final readonly class CreateUserHandler { public function handle(CreateUserCommand $command): User { $user = $this->userRepository->create($command); // Queue welcome email $this->queue->push(JobPayload::immediate( new SendWelcomeEmailJob($user->id, $user->email, $user->name) )); return $user; } } ``` ## Console Commands The framework provides console commands for queue management: ```bash # Monitor queue status php console.php queue:status # Process jobs (worker) php console.php queue:work --queue=default --timeout=60 # Retry failed jobs php console.php queue:retry --queue=default --limit=50 # Clear queue php console.php queue:clear --queue=default # View failed jobs php console.php queue:failed --limit=20 # Clean up old jobs php console.php queue:cleanup --older-than=7d ``` ## Performance Characteristics **Redis Queue**: - Throughput: 10,000+ jobs/second - Latency: <5ms per operation - Memory: ~200 bytes per job (serialized) - Scalability: Horizontal (add Redis instances) **File Queue**: - Throughput: 1,000+ jobs/second - Latency: <20ms per operation - Memory: Minimal (disk-based) - Scalability: Vertical (single server) **Job Execution**: - Overhead: <10ms per job (framework overhead) - Timeout: Configurable per job - Memory: Depends on job complexity - Concurrency: Multiple workers supported