michaelschiemer/docs/features/queue/system.md
Michael Schiemer 36ef2a1e2c
2025-11-09 14:46:15 +01:00

Queue System

Comprehensive documentation of the asynchronous job processing system in the Custom PHP Framework.

Overview

The Queue System provides robust, scalable background job processing with support for multiple queue drivers, priority-based execution, retry mechanisms, dead letter queues, and comprehensive monitoring.

Core Features:

  • Multi-Driver Support: File-based, Redis, and Database-backed queues
  • Priority Queues: Critical, high, normal, low priority levels
  • Delayed Jobs: Schedule jobs for future execution
  • Retry Strategies: Exponential backoff with configurable attempts
  • Dead Letter Queue: Automatic failed job management
  • Job Persistence: Full job lifecycle tracking and history
  • Progress Tracking: Real-time job progress monitoring
  • Distributed Locking: Multi-worker coordination
  • Metrics & Monitoring: Performance statistics and health checks

Queue Interface

The framework provides a unified Queue interface that all queue drivers implement:

interface Queue
{
    /**
     * Push a job to the queue
     */
    public function push(JobPayload $payload): void;

    /**
     * Pop the next available job from the queue
     */
    public function pop(): ?JobPayload;

    /**
     * Peek at the next job without removing it
     */
    public function peek(): ?JobPayload;

    /**
     * Get the number of jobs in the queue
     */
    public function size(): int;

    /**
     * Clear all jobs from the queue
     */
    public function clear(): int;

    /**
     * Get queue statistics
     */
    public function getStats(): array;
}
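
For tests or local experiments, the shape of this interface can be approximated in memory. The sketch below is a hypothetical stand-in, not part of the framework: `InMemoryPriorityQueue` is an invented name, it stores plain values with an integer priority instead of `JobPayload` objects, and it omits `getStats()`. It relies only on PHP's built-in `SplPriorityQueue`.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in; NOT the framework's implementation.
// Mirrors push/pop/peek/size/clear, but takes a plain value plus an
// integer priority instead of a JobPayload.
final class InMemoryPriorityQueue
{
    private \SplPriorityQueue $heap;

    // Decreasing counter used as a tiebreaker within one priority level.
    private int $serial = PHP_INT_MAX;

    public function __construct()
    {
        $this->heap = new \SplPriorityQueue();
    }

    public function push(mixed $job, int $priority): void
    {
        // Array priorities compare element by element: higher priority wins,
        // then the larger (earlier) serial, which keeps FIFO order per level.
        $this->heap->insert($job, [$priority, $this->serial--]);
    }

    public function pop(): mixed
    {
        return $this->heap->isEmpty() ? null : $this->heap->extract();
    }

    public function peek(): mixed
    {
        return $this->heap->isEmpty() ? null : $this->heap->top();
    }

    public function size(): int
    {
        return $this->heap->count();
    }

    public function clear(): int
    {
        $removed = $this->heap->count();
        $this->heap = new \SplPriorityQueue();

        return $removed;
    }
}

$queue = new InMemoryPriorityQueue();
$queue->push('cleanup-logs', 250);
$queue->push('send-email', 750);
$queue->push('send-sms', 750);
```

Popping from `$queue` yields the two priority-750 entries in insertion order before the priority-250 entry. The explicit serial is needed because `SplPriorityQueue` does not guarantee any order for elements with equal priority.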

JobPayload Value Object

All jobs are wrapped in a JobPayload value object that provides type safety and configuration:

final readonly class JobPayload
{
    public function __construct(
        public object $job,                    // The job instance
        public QueuePriority $priority,        // Priority level
        public Duration $delay,                // Execution delay
        public ?Duration $timeout = null,      // Max execution time
        public ?RetryStrategy $retryStrategy = null,  // Retry configuration
        public ?JobMetadata $metadata = null   // Additional metadata
    ) {}
}

Factory Methods:

// Immediate execution with high priority
$payload = JobPayload::immediate($job);

// Delayed execution
$payload = JobPayload::delayed($job, Duration::fromMinutes(5));

// Critical priority with timeout
$payload = JobPayload::critical($job);

// Background job with retries
$payload = JobPayload::background($job);

// Custom configuration
$payload = JobPayload::create(
    job: $job,
    priority: QueuePriority::high(),
    delay: Duration::fromSeconds(30),
    timeout: Duration::fromMinutes(5),
    retryStrategy: new ExponentialBackoffStrategy(maxAttempts: 3)
);

Queue Drivers

File-Based Queue (Default)

Simple file-based queue implementation for development and small-scale applications:

Features:

  • No external dependencies
  • Persistent across restarts
  • Suitable for development and small production loads
  • Priority-based sorting

Usage:

use App\Framework\Queue\FileQueue;

$queue = new FileQueue(
    directory: '/var/www/storage/queues',
    queueName: 'default'
);

$queue->push(JobPayload::immediate($job));

Redis Queue (Production)

High-performance Redis-backed queue with advanced features:

Features:

  • High throughput (10,000+ jobs/second)
  • Redis sorted sets for priority queues
  • Delayed job support via timestamp scoring
  • Atomic operations for safe concurrent access by multiple workers
  • Real-time statistics

Architecture:

  • Priority Queue: Redis sorted set with score = priority + timestamp
  • Delayed Queue: Redis sorted set with score = execution timestamp
  • Statistics: Redis hash with counters

Usage:

use App\Framework\Queue\RedisQueue;

$queue = new RedisQueue(
    connection: $redisConnection,
    queueName: 'production',
    serializer: new PhpSerializer()
);

$queue->push(JobPayload::immediate($job));

// Statistics
$stats = $queue->getStats();
// [
//     'total_size' => 1523,
//     'priority_queue_size' => 1200,
//     'delayed_queue_size' => 323,
//     'priority_breakdown' => ['high' => 150, 'normal' => 1050, 'low' => 323]
// ]

Score Calculation:

// Lower score = higher priority in Redis
$priorityScore = 1000 - $priority->value;  // Invert priority
$timeScore = $currentTime / 1000000;       // Microsecond precision for FIFO
$finalScore = $priorityScore + $timeScore;
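
A worked example may help to see how the two terms interact. The sketch below wraps the formula in a hypothetical helper, `queueScore()`, and assumes that `$currentTime` is a Unix timestamp in seconds; the source does not state the unit, so treat that as an assumption.

```php
<?php
declare(strict_types=1);

// Illustrative helper (hypothetical name), assuming $currentTime is a
// Unix timestamp in seconds.
function queueScore(int $priorityValue, int $currentTime): float
{
    $priorityScore = 1000 - $priorityValue; // critical (1000) -> 0, low (250) -> 750
    $timeScore = $currentTime / 1000000;    // small term that grows with time

    return $priorityScore + $timeScore;
}

$now = 1705410234;

$critical   = queueScore(1000, $now + 60); // pushed one minute later
$lowEarlier = queueScore(250, $now);       // pushed first, but low priority
$highFirst  = queueScore(750, $now);
$highSecond = queueScore(750, $now + 1);   // same priority, one second later
```

Under that assumption the critical job gets the lowest score even though it was pushed later, and the two high-priority jobs keep their push order. Jobs with the same priority pushed within the same second receive identical scores; Redis then orders those members lexicographically, so strict FIFO within one second is not guaranteed by the score alone.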

Delayed Job Processing:

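// Automatic processing of ready delayed jobs
private function processDelayedJobs(): void
{
    $currentTime = time();

    // Get all delayed jobs where score <= current time
    $readyJobs = $this->redis->zRangeByScore($this->delayedKey, 0, $currentTime);

    foreach ($readyJobs as $serializedPayload) {
        // Remove from delayed queue
        $this->redis->zRem($this->delayedKey, $serializedPayload);

        // Recompute the priority score (same formula as under Score Calculation).
        // Assumption: the serializer offers unserialize() and returns a JobPayload.
        $payload = $this->serializer->unserialize($serializedPayload);
        $score = (1000 - $payload->priority->value) + $currentTime / 1000000;

        // Add to priority queue
        $this->redis->zAdd($this->priorityKey, $score, $serializedPayload);
    }
}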
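
The promotion step can be illustrated without Redis. The following sketch is a simplified in-memory model with hypothetical names (`promoteDueJobs()`, plain job IDs instead of serialized payloads); it shows only the "due time has passed, so move to the ready list" logic and says nothing about how the real driver coordinates several workers.

```php
<?php
declare(strict_types=1);

/**
 * Moves every job whose due time has passed from $delayed to $ready.
 *
 * @param array<string, int> $delayed job ID => Unix timestamp at which it becomes due
 * @param list<string>       $ready   job IDs ready to run, in promotion order
 * @return int number of promoted jobs
 */
function promoteDueJobs(array &$delayed, array &$ready, int $now): int
{
    // Earliest due time first, mirroring a range query over a sorted set.
    asort($delayed);

    $promoted = 0;
    foreach ($delayed as $jobId => $dueAt) {
        if ($dueAt > $now) {
            break; // everything after this entry is still in the future
        }

        unset($delayed[$jobId]); // remove from the delayed set
        $ready[] = $jobId;       // hand over to the ready list
        $promoted++;
    }

    return $promoted;
}

$delayed = ['report' => 1000, 'digest' => 1300, 'cleanup' => 900];
$ready = [];
$count = promoteDueJobs($delayed, $ready, 1000);
```

After the call, `cleanup` and `report` are in `$ready` and only `digest` remains delayed. With a shared Redis instance and several workers, the read and the removal are separate commands, so two workers could see the same due job; a common remedy is to promote a job only if the removal actually deleted it, or to run both steps in one Lua script.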

Persistent Queue Decorator

Wraps any queue implementation with job persistence and tracking:

final readonly class PersistentQueue implements Queue
{
    public function __construct(
        private Queue $baseQueue,
        private JobPersistenceLayer $persistence,
        private QueueType $queueType
    ) {}

    public function push(JobPayload $payload): void
    {
        // Generate job ID
        $jobId = JobId::generate();

        // Store in persistence layer
        $this->persistence->storeJob(
            jobId: $jobId,
            queueType: $this->queueType,
            jobData: $payload->job,
            // retryStrategy is nullable; without one the job gets a single attempt
            maxAttempts: $payload->retryStrategy?->getMaxAttempts() ?? 1
        );

        // Push to base queue with job ID
        $this->baseQueue->push($payload->withJobId($jobId));
    }

    public function markJobCompleted(JobId $jobId, array $result = []): void
    {
        $this->persistence->markAsCompleted($jobId, $result);
    }

    public function markJobFailed(JobId $jobId, string $errorMessage): void
    {
        $jobState = $this->persistence->markAsFailed($jobId, $errorMessage);

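        // Automatic retry if applicable
        if ($jobState->canRetry()) {
            // $retryPayload is not defined in this excerpt; it stands for
            // the payload that is re-queued for the next attempt
            $this->baseQueue->push($retryPayload);
            $this->persistence->markForRetry($jobId, $errorMessage);
        }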
    }
}

Background Jobs

Creating a Job

Jobs are simple PHP classes with a handle() method:

final readonly class SendWelcomeEmailJob
{
    public function __construct(
        private UserId $userId,
        private Email $email,
        private UserName $userName
    ) {}

    public function handle(): array
    {
        // Business logic ($this->emailService is assumed to be injected;
        // its wiring is not shown in this excerpt)
        $this->emailService->send(
            to: $this->email,
            template: 'welcome',
            data: ['name' => $this->userName->value]
        );

        // Return result for logging
        return [
            'user_id' => $this->userId->toString(),
            'email' => $this->email->getMasked(),
            'sent_at' => time()
        ];
    }

    public function getType(): string
    {
        return 'email.welcome';
    }
}

Dispatching Jobs

use App\Framework\Queue\Queue;
use App\Framework\Queue\ValueObjects\JobPayload;

// Immediate execution
$job = new SendWelcomeEmailJob($userId, $email, $userName);
$queue->push(JobPayload::immediate($job));

// Delayed execution (5 minutes)
$queue->push(JobPayload::delayed($job, Duration::fromMinutes(5)));

// Background job with retries
$queue->push(JobPayload::background($job));

// Custom configuration
$queue->push(JobPayload::create(
    job: $job,
    priority: QueuePriority::high(),
    timeout: Duration::fromMinutes(5),
    retryStrategy: new ExponentialBackoffStrategy(maxAttempts: 3)
));

Processing Jobs

// Worker loop
while (true) {
    $payload = $queue->pop();

    if ($payload === null) {
        sleep(1); // Wait for new jobs
        continue;
    }

    try {
        // Execute job
        $result = $payload->job->handle();

        // Mark as completed
        if ($queue instanceof PersistentQueue) {
            $queue->markJobCompleted($payload->getJobId(), $result);
        }
    } catch (\Throwable $e) {
        // Mark as failed (markJobFailed() takes the job ID and an error message)
        if ($queue instanceof PersistentQueue) {
            $queue->markJobFailed(
                $payload->getJobId(),
                $e->getMessage()
            );
        }
    }
}

Priority System

QueuePriority Value Object

enum QueuePriority: int
{
    case CRITICAL = 1000;  // System-critical operations
    case HIGH = 750;       // User-facing operations
    case NORMAL = 500;     // Standard background tasks
    case LOW = 250;        // Non-urgent maintenance

    public static function critical(): self { return self::CRITICAL; }
    public static function high(): self { return self::HIGH; }
    public static function normal(): self { return self::NORMAL; }
    public static function low(): self { return self::LOW; }

    public function toString(): string
    {
        return match ($this) {
            self::CRITICAL => 'critical',
            self::HIGH => 'high',
            self::NORMAL => 'normal',
            self::LOW => 'low',
        };
    }
}

Priority Usage Examples

// Critical: Payment processing, real-time notifications
$queue->push(JobPayload::create(
    job: new ProcessPaymentJob($payment),
    priority: QueuePriority::critical(),
    timeout: Duration::fromSeconds(30)
));

// High: User-triggered emails, report generation
$queue->push(JobPayload::create(
    job: new GenerateReportJob($userId),
    priority: QueuePriority::high()
));

// Normal: Scheduled maintenance, cache warming
$queue->push(JobPayload::create(
    job: new WarmCacheJob(),
    priority: QueuePriority::normal()
));

// Low: Cleanup tasks, analytics aggregation
$queue->push(JobPayload::create(
    job: new CleanupOldLogsJob(),
    priority: QueuePriority::low()
));

Retry Mechanisms

RetryStrategy Interface

interface RetryStrategy
{
    public function shouldRetry(int $attempts): bool;
    public function getDelay(int $attempts): Duration;
    public function getMaxAttempts(): int;
}

Exponential Backoff Strategy

final readonly class ExponentialBackoffStrategy implements RetryStrategy
{
    public function __construct(
        private int $maxAttempts = 3,
        private int $baseDelaySeconds = 60,
        private int $maxDelaySeconds = 3600
    ) {}

    public function shouldRetry(int $attempts): bool
    {
        return $attempts < $this->maxAttempts;
    }

    public function getDelay(int $attempts): Duration
    {
        // 2^attempts * baseDelay, capped at maxDelay
        $delaySeconds = min(
            pow(2, $attempts) * $this->baseDelaySeconds,
            $this->maxDelaySeconds
        );

        return Duration::fromSeconds($delaySeconds);
    }

    public function getMaxAttempts(): int
    {
        return $this->maxAttempts;
    }
}
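
To make the resulting schedule concrete, the sketch below copies the delay formula into a standalone function (`backoffDelaySeconds()` is a hypothetical name) and evaluates it for the default settings. Whether the framework counts the first attempt as 0 or 1 is not stated here, so the range used is an assumption.

```php
<?php
declare(strict_types=1);

// Standalone copy of the delay formula above, for illustration only.
function backoffDelaySeconds(
    int $attempts,
    int $baseDelaySeconds = 60,
    int $maxDelaySeconds = 3600
): int {
    // 2^attempts * baseDelay, capped at maxDelay
    return (int) min((2 ** $attempts) * $baseDelaySeconds, $maxDelaySeconds);
}

// Delays for attempts 0 through 6 with the default settings.
$schedule = array_map(
    static fn (int $attempt): int => backoffDelaySeconds($attempt),
    range(0, 6)
);
```

With a 60-second base, the delay doubles from 60 seconds up to 1920 seconds and hits the 3600-second cap at the seventh value, because 2^6 * 60 = 3840 exceeds the maximum.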

Custom Retry Strategy

final readonly class CustomRetryStrategy implements RetryStrategy
{
    public function __construct(
        private array $retryDelays = [60, 300, 1800] // 1min, 5min, 30min
    ) {}

    public function shouldRetry(int $attempts): bool
    {
        return $attempts < count($this->retryDelays);
    }

    public function getDelay(int $attempts): Duration
    {
        $index = min($attempts, count($this->retryDelays) - 1);
        return Duration::fromSeconds($this->retryDelays[$index]);
    }

    public function getMaxAttempts(): int
    {
        return count($this->retryDelays);
    }
}

Usage with Jobs

// Standard exponential backoff
$payload = JobPayload::create(
    job: $job,
    retryStrategy: new ExponentialBackoffStrategy(
        maxAttempts: 5,
        baseDelaySeconds: 60,    // Start with 1 minute
        maxDelaySeconds: 3600    // Cap at 1 hour
    )
);

// Custom retry schedule
$payload = JobPayload::create(
    job: $job,
    retryStrategy: new CustomRetryStrategy([60, 300, 900, 3600])
);

// No retries
$payload = JobPayload::create(
    job: $job,
    retryStrategy: null  // Job fails permanently on first error
);

Failed Jobs & Dead Letter Queue

JobStatus Lifecycle

enum JobStatus: string
{
    case PENDING = 'pending';       // Waiting in queue
    case PROCESSING = 'processing'; // Currently executing
    case COMPLETED = 'completed';   // Successfully completed
    case FAILED = 'failed';        // Failed with error
    case RETRYING = 'retrying';    // Failed, will retry
    case CANCELLED = 'cancelled';  // Manually cancelled
    case EXPIRED = 'expired';      // Timeout exceeded

    public function isFinal(): bool
    {
        return match ($this) {
            self::COMPLETED, self::CANCELLED, self::EXPIRED => true,
            default => false
        };
    }

    public function isActive(): bool
    {
        return match ($this) {
            self::PENDING, self::PROCESSING, self::RETRYING => true,
            default => false
        };
    }

    public function isFailure(): bool
    {
        return match ($this) {
            self::FAILED, self::EXPIRED => true,
            default => false
        };
    }
}

Dead Letter Queue Manager

When a job has used up its maximum number of attempts, it is moved automatically to a dead letter queue:

final readonly class DeadLetterManager
{
    public function __construct(
        private DeadLetterQueueInterface $deadLetterQueue,
        private ProductionJobPersistenceLayer $persistenceLayer
    ) {}

    /**
     * Move failed job to dead letter queue
     */
    public function moveJobToDeadLetterQueue(
        JobIndexEntry $failedJob,
        FailureReason $failureReason
    ): void {
        $dlqName = DeadLetterQueueName::forQueue(
            QueueName::fromString($failedJob->queueType)
        );

        $deadLetterJob = DeadLetterJob::fromFailedJob(
            failedJob: $failedJob,
            deadLetterQueueName: $dlqName,
            failureReason: $failureReason
        );

        $this->deadLetterQueue->addFailedJob($deadLetterJob);
        $this->persistenceLayer->deleteJob($failedJob->jobId);
    }

    /**
     * Handle job failure with automatic DLQ routing
     */
    public function handleJobFailure(
        string $jobId,
        \Throwable $exception,
        int $maxAttempts = 3
    ): bool {
        $jobEntry = $this->persistenceLayer->getJobById($jobId);

        if ($jobEntry->attempts >= $maxAttempts) {
            $this->moveJobToDeadLetterQueue(
                failedJob: $jobEntry,
                failureReason: FailureReason::fromException($exception)
            );

            return true;
        }

        return false;
    }
}

Managing Failed Jobs

use App\Framework\Queue\Services\DeadLetterManager;

// Get failed jobs
$failedJobs = $deadLetterManager->getFailedJobs(
    originalQueue: QueueName::fromString('emails'),
    limit: 100
);

// Retry a specific job
$deadLetterManager->retryJob($deadLetterJobId);

// Retry all jobs in a dead letter queue
$retriedCount = $deadLetterManager->retryAllJobs($deadLetterQueueName);

// Clear dead letter queue
$deletedCount = $deadLetterManager->clearDeadLetterQueue($deadLetterQueueName);

// Get statistics
$stats = $deadLetterManager->getStatistics();
// [
//     'default.dlq' => ['count' => 23, 'oldest' => '2024-01-15 10:30:00'],
//     'emails.dlq' => ['count' => 5, 'oldest' => '2024-01-16 14:20:00']
// ]

Queue Monitoring

Queue Statistics

// Get comprehensive statistics
$stats = $queue->getStats();

// Example output:
[
    'total_size' => 1523,
    'priority_queue_size' => 1200,
    'delayed_queue_size' => 323,
    'priority_breakdown' => [
        'critical' => 15,
        'high' => 150,
        'normal' => 1050,
        'low' => 308
    ],
    'stats' => [
        'total_pushed' => 125430,
        'total_popped' => 123907,
        'high_pushed' => 15234,
        'normal_pushed' => 95678,
        'low_pushed' => 14518,
        'last_activity' => 1705410234
    ],
    'persistence' => [
        'total_jobs' => 125430,
        'completed' => 120145,
        'failed' => 3762,
        'retrying' => 1523
    ]
]

Job Metrics

use App\Framework\Queue\Services\JobMetricsManager;

// Get job performance metrics
$metrics = $metricsManager->getJobMetrics($jobId);

// [
//     'execution_time_ms' => 1234,
//     'memory_used_mb' => 45.2,
//     'peak_memory_mb' => 52.8,
//     'attempts' => 1,
//     'last_attempt_at' => '2024-01-16 15:30:00'
// ]

// Get aggregated metrics by job type
$aggregated = $metricsManager->getAggregatedMetrics('email.welcome');

// [
//     'total_executions' => 1523,
//     'avg_execution_time_ms' => 856,
//     'p95_execution_time_ms' => 1450,
//     'p99_execution_time_ms' => 2340,
//     'success_rate' => 0.987,
//     'avg_memory_mb' => 38.4
// ]

Progress Tracking

For long-running jobs, track progress:

final readonly class LargeDataImportJob
{
    public function __construct(
        private ProgressManager $progressManager,
        private JobId $jobId,
        private array $dataFiles
    ) {}

    public function handle(): array
    {
        $totalSteps = count($this->dataFiles);

        foreach ($this->dataFiles as $index => $file) {
            // Process file
            $this->processFile($file);

            // Update progress
            $this->progressManager->updateProgress(
                jobId: $this->jobId,
                currentStep: $index + 1,
                totalSteps: $totalSteps,
                message: "Processing {$file}"
            );
        }

        return ['files_processed' => $totalSteps];
    }
}

// Query progress
$progress = $progressManager->getProgress($jobId);

// [
//     'current_step' => 45,
//     'total_steps' => 100,
//     'percentage' => 45.0,
//     'message' => 'Processing data_2024_01_16.csv',
//     'updated_at' => '2024-01-16 15:32:45'
// ]
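
The `percentage` field in the sample output follows from the step counters. A minimal sketch of that arithmetic, using a hypothetical helper name and assuming the value is rounded to one decimal place:

```php
<?php
declare(strict_types=1);

// Hypothetical helper: derives a percentage from step counters.
function progressPercentage(int $currentStep, int $totalSteps): float
{
    if ($totalSteps <= 0) {
        return 0.0; // nothing to do yet; avoids division by zero
    }

    // Clamp so that over- or under-reporting never leaves the 0..100 range.
    $clamped = max(0, min($currentStep, $totalSteps));

    return round($clamped / $totalSteps * 100, 1);
}

$percentage = progressPercentage(45, 100);
```

The guard for a zero step count matters for jobs that report progress before they know how much work there is, for example when the list of data files is still empty.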

Health Monitoring

use App\Framework\Queue\Services\WorkerHealthCheckService;

// Check worker health
$health = $healthCheckService->checkAllWorkers();

// [
//     'healthy_workers' => 8,
//     'unhealthy_workers' => 1,
//     'total_workers' => 9,
//     'workers' => [
//         ['id' => 'worker-1', 'status' => 'healthy', 'last_heartbeat' => '2024-01-16 15:33:00'],
//         ['id' => 'worker-2', 'status' => 'unhealthy', 'last_heartbeat' => '2024-01-16 15:20:15']
//     ]
// ]

// Register worker heartbeat
$healthCheckService->registerHeartbeat($workerId);

// Cleanup stale workers
$cleanedCount = $healthCheckService->cleanupStaleWorkers(
    staleThreshold: Duration::fromMinutes(5)
);
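
The staleness rule behind `cleanupStaleWorkers()` is presumably a comparison between the last heartbeat and a threshold. The sketch below shows that comparison in isolation; `findStaleWorkers()` and its array input are hypothetical and do not reflect how the service stores heartbeats.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: returns the IDs of workers whose last heartbeat
 * is older than the threshold.
 *
 * @param array<string, string> $lastHeartbeats worker ID => 'Y-m-d H:i:s' timestamp
 * @return list<string>
 */
function findStaleWorkers(array $lastHeartbeats, DateTimeImmutable $now, int $thresholdSeconds): array
{
    $stale = [];

    foreach ($lastHeartbeats as $workerId => $heartbeat) {
        // Interpret the stored timestamp in the same timezone as $now.
        $seenAt = new DateTimeImmutable($heartbeat, $now->getTimezone());

        if ($now->getTimestamp() - $seenAt->getTimestamp() > $thresholdSeconds) {
            $stale[] = $workerId;
        }
    }

    return $stale;
}

$now = new DateTimeImmutable('2024-01-16 15:33:30', new DateTimeZone('UTC'));

$stale = findStaleWorkers(
    [
        'worker-1' => '2024-01-16 15:33:00', // 30 seconds ago
        'worker-2' => '2024-01-16 15:20:15', // more than 13 minutes ago
    ],
    $now,
    5 * 60
);
```

With a five-minute threshold only `worker-2` is reported. The threshold should comfortably exceed the heartbeat interval, otherwise a worker busy with one long job may be flagged as stale.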

Best Practices

1. Job Design

Keep Jobs Small and Focused:

// ✅ Good: Single responsibility
final readonly class SendEmailJob
{
    public function handle(): array
    {
        $this->emailService->send($this->email, $this->template);
        return ['sent' => true];
    }
}

// ❌ Bad: Multiple responsibilities
final readonly class SendEmailAndUpdateUserJob
{
    public function handle(): array
    {
        $this->emailService->send($this->email);
        $this->userRepository->update($this->userId);  // Separate concern
        $this->analyticsService->track($this->event);  // Separate concern
        return ['sent' => true];
    }
}

Make Jobs Idempotent:

final readonly class ProcessPaymentJob
{
    public function handle(): array
    {
        // Check if already processed (idempotency)
        if ($this->paymentRepository->isProcessed($this->paymentId)) {
            return ['status' => 'already_processed'];
        }

        // Process payment
        $result = $this->paymentGateway->charge($this->payment);

        // Mark as processed
        $this->paymentRepository->markAsProcessed($this->paymentId);

        return $result;
    }
}

Use Value Objects for Job Data:

// ✅ Good: Type-safe value objects
final readonly class SendInvoiceJob
{
    public function __construct(
        private InvoiceId $invoiceId,
        private Email $recipientEmail,
        private Money $amount
    ) {}
}

// ❌ Bad: Primitive arrays
final readonly class SendInvoiceJob
{
    public function __construct(
        private array $data  // What structure? What validation?
    ) {}
}

2. Priority Assignment

Critical: System-critical operations that must execute immediately

  • Payment processing
  • Real-time notifications
  • Security alerts

High: User-facing operations where a user is waiting for the result

  • Password reset emails
  • Welcome emails
  • Report generation (user-triggered)

Normal: Standard background tasks

  • Scheduled maintenance
  • Cache warming
  • Analytics processing

Low: Non-urgent maintenance

  • Log cleanup
  • Old data archival
  • Non-critical aggregations

3. Retry Strategy

Use Exponential Backoff for transient failures:

// External API calls, network operations
JobPayload::create(
    job: $apiCallJob,
    retryStrategy: new ExponentialBackoffStrategy(
        maxAttempts: 5,
        baseDelaySeconds: 60,
        maxDelaySeconds: 3600
    )
);

No Retries for permanent failures:

// Invalid input, business logic violations
JobPayload::create(
    job: $validationJob,
    retryStrategy: null  // Fail immediately
);

Custom Schedule for specific requirements:

// Time-sensitive operations
JobPayload::create(
    job: $timeSensitiveJob,
    retryStrategy: new CustomRetryStrategy([30, 60, 120])  // Quick retries
);

4. Monitoring

Track Key Metrics:

  • Queue depth by priority
  • Average job execution time
  • Success/failure rates
  • Dead letter queue size
  • Worker health status

Set Up Alerts:

// Alert on high queue depth
if ($stats['total_size'] > 10000) {
    $alerting->send('Queue depth exceeded threshold');
}

// Alert on high failure rate (guard against division by zero on an empty history)
$totalJobs = $stats['persistence']['total_jobs'];
$failureRate = $totalJobs > 0 ? $stats['persistence']['failed'] / $totalJobs : 0.0;
if ($failureRate > 0.05) {  // 5% failure rate
    $alerting->send('High job failure rate detected');
}

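// Alert on dead letter queue growth (getStatistics() is keyed by DLQ name)
foreach ($deadLetterManager->getStatistics() as $dlqName => $dlqStats) {
    if ($dlqStats['count'] > 100) {
        $alerting->send("Dead letter queue {$dlqName} requires attention");
    }
}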

5. Resource Management

Implement Timeouts:

JobPayload::create(
    job: $job,
    timeout: Duration::fromMinutes(5)  // Kill job after 5 minutes
);

Manage Memory:

final readonly class LargeDataProcessingJob
{
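    public function handle(): array
    {
        $batchesProcessed = 0;

        // Process in batches to avoid memory exhaustion
        foreach ($this->getBatches() as $batch) {
            $this->processBatch($batch);
            $batchesProcessed++;

            // Collect cyclic garbage left over from the batch
            gc_collect_cycles();
        }

        return ['batches_processed' => $batchesProcessed];
    }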
}

Connection Pooling:

// Reuse connections across jobs
final readonly class DatabaseJob
{
    public function handle(): array
    {
        // Use connection pool instead of new connections
        $connection = $this->connectionPool->acquire();

        try {
            // Execute queries ($this->sql is a placeholder for the job's query)
            $result = $connection->query($this->sql);

            return ['result' => $result];
        } finally {
            // Always release back to pool
            $this->connectionPool->release($connection);
        }
    }
}

6. Error Handling

Graceful Degradation:

final readonly class EmailJob
{
    public function handle(): array
    {
        try {
            $this->emailService->send($this->email);
            return ['status' => 'sent'];
        } catch (EmailProviderException $e) {
            // Log error but don't fail job
            $this->logger->warning('Email provider unavailable', [
                'error' => $e->getMessage()
            ]);

            // Use fallback provider
            $this->fallbackEmailService->send($this->email);
            return ['status' => 'sent_via_fallback'];
        }
    }
}

Detailed Error Logging:

try {
    $job->handle();
} catch (\Throwable $e) {
    $this->logger->error('Job failed', [
        'job_id' => $jobId,
        'job_class' => get_class($job),
        'error' => $e->getMessage(),
        'trace' => $e->getTraceAsString(),
        'attempts' => $attempts,
        'metadata' => $payload->metadata?->toArray()  // metadata is nullable
    ]);

    throw $e;
}

7. Testing

Unit Test Jobs:

it('sends welcome email', function () {
    $job = new SendWelcomeEmailJob(
        userId: UserId::generate(),
        email: new Email('test@example.com'),
        userName: new UserName('Test User')
    );

    $result = $job->handle();

    expect($result['sent_at'])->toBeGreaterThan(0);
});

Integration Test Queue Operations:

it('processes jobs in priority order', function () {
    $queue = new RedisQueue($connection);

    // Push jobs with different priorities
    $queue->push(JobPayload::create($lowJob, QueuePriority::low()));
    $queue->push(JobPayload::create($highJob, QueuePriority::high()));
    $queue->push(JobPayload::create($normalJob, QueuePriority::normal()));

    // Pop and verify order
    $first = $queue->pop();
    expect($first->job)->toBe($highJob);

    $second = $queue->pop();
    expect($second->job)->toBe($normalJob);

    $third = $queue->pop();
    expect($third->job)->toBe($lowJob);
});

Framework Integration

With Event System

// Dispatch jobs in response to events
#[OnEvent(priority: 100)]
public function onUserRegistered(UserRegisteredEvent $event): void
{
    $job = new SendWelcomeEmailJob(
        userId: $event->userId,
        email: $event->email,
        userName: $event->userName
    );

    $this->queue->push(JobPayload::immediate($job));
}

With Scheduler

See Scheduler-Queue Pipeline for complete integration documentation.

// Schedule recurring job dispatch
$scheduler->schedule(
    'send-daily-digest',
    CronSchedule::fromExpression('0 8 * * *'),  // Every day at 8 AM
    fn() => $this->queue->push(JobPayload::immediate(new SendDailyDigestJob()))
);

With Command Bus

// Dispatch jobs via command handlers
final readonly class CreateUserHandler
{
    public function handle(CreateUserCommand $command): User
    {
        $user = $this->userRepository->create($command);

        // Queue welcome email
        $this->queue->push(JobPayload::immediate(
            new SendWelcomeEmailJob($user->id, $user->email, $user->name)
        ));

        return $user;
    }
}

Console Commands

The framework provides console commands for queue management:

# Monitor queue status
php console.php queue:status

# Process jobs (worker)
php console.php queue:work --queue=default --timeout=60

# Retry failed jobs
php console.php queue:retry --queue=default --limit=50

# Clear queue
php console.php queue:clear --queue=default

# View failed jobs
php console.php queue:failed --limit=20

# Clean up old jobs
php console.php queue:cleanup --older-than=7d

Performance Characteristics

Redis Queue:

  • Throughput: 10,000+ jobs/second
  • Latency: <5ms per operation
  • Memory: ~200 bytes per job (serialized)
  • Scalability: Horizontal (add Redis instances)

File Queue:

  • Throughput: 1,000+ jobs/second
  • Latency: <20ms per operation
  • Memory: Minimal (disk-based)
  • Scalability: Vertical (single server)

Job Execution:

  • Overhead: <10ms per job (framework overhead)
  • Timeout: Configurable per job
  • Memory: Depends on job complexity
  • Concurrency: Multiple workers supported
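
The per-job memory figure allows a rough capacity estimate for a Redis backlog. The sketch below multiplies it out with a hypothetical helper; it takes the ~200 bytes per job quoted above at face value and ignores Redis's own per-entry overhead, so real usage will be higher.

```php
<?php
declare(strict_types=1);

// Back-of-the-envelope estimate: payload bytes only, no Redis overhead.
function estimateQueueMemoryMiB(int $jobCount, int $bytesPerJob = 200): float
{
    return round($jobCount * $bytesPerJob / (1024 * 1024), 1);
}

$backlogMiB = estimateQueueMemoryMiB(1_000_000); // one million queued jobs
```

One million queued jobs come to roughly 190 MiB of serialized payload. Jobs that carry large arrays or long strings in their constructor arguments will exceed the 200-byte figure, which is one more reason to pass identifiers rather than full data sets.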