entityManager->persist($deadLetterJob); $this->entityManager->flush(); } public function getJobs(DeadLetterQueueName $deadLetterQueueName, int $limit = 100): array { $sql = " SELECT * FROM dead_letter_jobs WHERE dead_letter_queue = ? ORDER BY moved_to_dlq_at DESC LIMIT ? "; $result = $this->connection->query(SqlQuery::create($sql, [$deadLetterQueueName->toString(), $limit])); $rows = $result->fetchAll(); return array_map([$this, 'mapRowToDeadLetterJob'], $rows); } public function getJobsByOriginalQueue(QueueName $originalQueue, int $limit = 100): array { $sql = " SELECT * FROM dead_letter_jobs WHERE original_queue = ? ORDER BY moved_to_dlq_at DESC LIMIT ? "; $result = $this->connection->query(SqlQuery::create($sql, [$originalQueue->toString(), $limit])); $rows = $result->fetchAll(); return array_map([$this, 'mapRowToDeadLetterJob'], $rows); } public function retryJob(string $deadLetterJobId): bool { $this->connection->beginTransaction(); try { // Get the dead letter job $deadLetterJob = $this->findDeadLetterJob($deadLetterJobId); if (! $deadLetterJob) { return false; } // Add job back to original queue $jobPayload = $deadLetterJob->getJobPayload(); $originalQueue = $deadLetterJob->getOriginalQueueName(); $this->originalQueue->push( payload: $jobPayload, queueName: $originalQueue, priority: QueuePriority::NORMAL ); // Update retry count and timestamp $updatedJob = $deadLetterJob->withRetryAttempt(); $this->entityManager->persist($updatedJob); // Delete from dead letter queue $this->deleteJobById($deadLetterJobId); $this->entityManager->flush(); $this->connection->commit(); return true; } catch (\Throwable $e) { $this->connection->rollback(); throw $e; } } public function deleteJob(string $deadLetterJobId): bool { return $this->deleteJobById($deadLetterJobId); } public function retryAllJobs(DeadLetterQueueName $deadLetterQueueName): int { $jobs = $this->getJobs($deadLetterQueueName, 1000); // Get all jobs $retriedCount = 0; foreach ($jobs as $job) { if ($this->retryJob($job->id)) { $retriedCount++; } } return $retriedCount; } public function clearQueue(DeadLetterQueueName $deadLetterQueueName): int { $sql = "DELETE FROM dead_letter_jobs WHERE dead_letter_queue = ?"; return $this->connection->execute(SqlQuery::create($sql, [$deadLetterQueueName->toString()])); } public function getQueueStats(DeadLetterQueueName $deadLetterQueueName): array { $sql = " SELECT COUNT(*) as total_jobs, AVG(failed_attempts) as avg_failed_attempts, MAX(failed_attempts) as max_failed_attempts, AVG(retry_count) as avg_retry_count, MAX(retry_count) as max_retry_count, MIN(moved_to_dlq_at) as oldest_job, MAX(moved_to_dlq_at) as newest_job FROM dead_letter_jobs WHERE dead_letter_queue = ? "; $stats = $this->connection->queryOne(SqlQuery::create($sql, [$deadLetterQueueName->toString()])); return [ 'queue_name' => $deadLetterQueueName->toString(), 'total_jobs' => (int) $stats['total_jobs'], 'avg_failed_attempts' => round((float) $stats['avg_failed_attempts'], 2), 'max_failed_attempts' => (int) $stats['max_failed_attempts'], 'avg_retry_count' => round((float) $stats['avg_retry_count'], 2), 'max_retry_count' => (int) $stats['max_retry_count'], 'oldest_job' => $stats['oldest_job'], 'newest_job' => $stats['newest_job'], ]; } public function getAvailableQueues(): array { $sql = "SELECT DISTINCT dead_letter_queue FROM dead_letter_jobs ORDER BY dead_letter_queue"; $result = $this->connection->query(SqlQuery::create($sql)); $rows = $result->fetchAll(); return array_map( fn (array $row) => DeadLetterQueueName::fromString($row['dead_letter_queue']), $rows ); } private function findDeadLetterJob(string $deadLetterJobId): ?DeadLetterJob { $sql = "SELECT * FROM dead_letter_jobs WHERE id = ?"; $row = $this->connection->queryOne(SqlQuery::create($sql, [$deadLetterJobId])); return $row ? $this->mapRowToDeadLetterJob($row) : null; } private function deleteJobById(string $deadLetterJobId): bool { $sql = "DELETE FROM dead_letter_jobs WHERE id = ?"; return $this->connection->execute(SqlQuery::create($sql, [$deadLetterJobId])) > 0; } private function mapRowToDeadLetterJob(array $row): DeadLetterJob { return new DeadLetterJob( id: $row['id'], originalJobId: $row['original_job_id'], deadLetterQueue: $row['dead_letter_queue'], originalQueue: $row['original_queue'], jobPayload: $row['job_payload'], failureReason: $row['failure_reason'], exceptionType: $row['exception_type'], stackTrace: $row['stack_trace'], failedAttempts: (int) $row['failed_attempts'], failedAt: $row['failed_at'], movedToDlqAt: $row['moved_to_dlq_at'], retryCount: (int) $row['retry_count'], lastRetryAt: $row['last_retry_at'] ); } }