# Scheduler-Queue Pipeline

Complete documentation of the scheduler-queue integration pipeline in the custom PHP framework.

## Overview

The scheduler-queue pipeline is a fully integrated solution for time-based task scheduling and asynchronous job execution. It combines the framework's scheduler system with the extended queue system to provide robust, scalable background processing.

## Architecture

```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Scheduler     │───▶│   Job Dispatch  │───▶│   Queue System  │───▶│ Background Exec │
│   System        │    │   & Validation  │    │   (13 Tables)   │    │   & Logging     │
└─────────────────┘    └─────────────────┘    └─────────────────┘    └─────────────────┘
         │                      │                      │                      │
  Cron/Interval          JobPayload             FileQueue/Redis        Named Job Classes
  OneTime/Manual         Value Objects          w/ Metrics             w/ handle() Method
  Schedule Types         Type Safety            & Monitoring           Result Logging
```

## Pipeline Components

### 1. Scheduler System
**Location**: `src/Framework/Scheduler/`

**Schedule Types**:
- **CronSchedule**: Traditional cron-based scheduling
- **IntervalSchedule**: Repeated execution at fixed intervals
- **OneTimeSchedule**: One-off execution at a specified time
- **ManualSchedule**: Execution on manual trigger

**Core Services**:
- `SchedulerService`: Main service for task management
- `TaskExecutionResult`: Value object for execution results
- `Timestamp`/`Duration`: Time value objects that follow framework conventions

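
To make the interval schedule type more concrete, here is a minimal sketch of how "is this task due?" could be decided for a fixed interval. `SketchIntervalSchedule` and its methods are hypothetical names used only for illustration; they are not the framework's `IntervalSchedule` API, and plain Unix timestamps stand in for the `Timestamp`/`Duration` value objects.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for an interval schedule; NOT the framework's IntervalSchedule.
final class SketchIntervalSchedule
{
    public function __construct(private readonly int $intervalSeconds)
    {
        if ($intervalSeconds <= 0) {
            throw new InvalidArgumentException('Interval must be positive.');
        }
    }

    // Next run is the last run plus one interval; a task that never ran is due now.
    public function nextRunAt(?int $lastRunAt, int $now): int
    {
        return $lastRunAt === null ? $now : $lastRunAt + $this->intervalSeconds;
    }

    public function isDue(?int $lastRunAt, int $now): bool
    {
        return $this->nextRunAt($lastRunAt, $now) <= $now;
    }
}

$everyTenMinutes = new SketchIntervalSchedule(600);

$neverRan   = $everyTenMinutes->isDue(null, 1_000);   // never ran: due
$tooEarly   = $everyTenMinutes->isDue(1_000, 1_300);  // 300 s elapsed: not due
$intervalUp = $everyTenMinutes->isDue(1_000, 1_600);  // 600 s elapsed: due
```

Passing `$now` in explicitly, rather than calling `time()` inside the class, keeps the sketch deterministic and easy to test.
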
### 2. Job Dispatch & Validation
**Integration layer between scheduler and queue**

**Key Features**:
- **Type Safety**: Uses JobPayload value objects
- **Named Job Classes**: Avoids anonymous classes (class name restrictions)
- **Validation**: Input is validated before queue dispatch
- **Error Handling**: Graceful fallbacks on dispatch errors

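
The features above can be sketched as a small dispatch helper that validates the job class and turns dispatch errors into a result array instead of an exception. Everything here (`SketchPayload`, `SketchJob`, `dispatchSafely`, the `$push` callback) is a hypothetical illustration under those assumptions; the framework's real `JobPayload` and `Queue` may behave differently.

```php
<?php
declare(strict_types=1);

// Hypothetical payload value object; the framework's JobPayload may look different.
final class SketchPayload
{
    public function __construct(
        public readonly string $jobClass,
        public readonly array $arguments = [],
    ) {
        // Validation: accept only existing, named (non-anonymous) classes.
        if (!class_exists($jobClass) || str_contains($jobClass, '@anonymous')) {
            throw new InvalidArgumentException("Not a named job class: {$jobClass}");
        }
    }
}

// Hypothetical named job class used for the demonstration.
final class SketchJob {}

/**
 * Validates and dispatches. Returns a result array instead of throwing, so the
 * calling scheduler task can record the failure and carry on.
 *
 * @param callable(SketchPayload): void $push hypothetical queue push callback
 */
function dispatchSafely(string $jobClass, array $arguments, callable $push): array
{
    try {
        $push(new SketchPayload($jobClass, $arguments));
        return ['status' => 'queued'];
    } catch (Throwable $e) {
        return ['status' => 'dispatch_failed', 'error' => $e->getMessage()];
    }
}

$queued = [];
$push = function (SketchPayload $payload) use (&$queued): void {
    $queued[] = $payload;
};

$ok  = dispatchSafely(SketchJob::class, ['batchSize' => 100], $push);
$bad = dispatchSafely('MissingJob', [], $push); // class does not exist
```
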
### 3. Queue System
**Location**: `src/Framework/Queue/`

**Database Schema** (13 specialized tables):
```sql
jobs               -- Main job queue
job_batches        -- Batch job management
job_history        -- Execution history
job_metrics        -- Performance metrics
dead_letter_jobs   -- Failed jobs
job_priorities     -- Priority-based scheduling
job_dependencies   -- Inter-job dependencies
delayed_jobs       -- Delayed execution
worker_processes   -- Worker management
job_locks          -- Distributed locking
job_progress       -- Progress tracking
recurring_jobs     -- Recurring patterns
job_tags           -- Categorization & filtering
```

**Core Features**:
- **Multi-Driver Support**: FileQueue, Redis, Database
- **Priority Scheduling**: High/Medium/Low priority queues
- **Metrics & Monitoring**: Real-time performance tracking
- **Dead Letter Queue**: Automatic failed job management
- **Distributed Locking**: Multi-worker coordination

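
As an illustration of how priority scheduling and a dead letter queue interact, here is a minimal in-memory sketch built on PHP's `SplPriorityQueue`. `SketchPriorityQueue`, its retry limit, and the integer priorities are assumptions made for this example; the real drivers persist jobs and are not shown here.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory queue; real drivers (FileQueue, Redis, Database) persist jobs.
final class SketchPriorityQueue
{
    private SplPriorityQueue $heap;
    private int $serial = PHP_INT_MAX;

    /** @var list<array{job: callable, error: string}> */
    public array $deadLetters = [];

    public function __construct(private readonly int $maxAttempts = 3)
    {
        $this->heap = new SplPriorityQueue();
    }

    public function push(callable $job, int $priority, int $attempt = 1): void
    {
        // The decreasing serial keeps FIFO order among jobs of equal priority.
        $this->heap->insert([$job, $attempt, $priority], [$priority, $this->serial--]);
    }

    public function work(): void
    {
        while (!$this->heap->isEmpty()) {
            [$job, $attempt, $priority] = $this->heap->extract();
            try {
                $job();
            } catch (Throwable $e) {
                if ($attempt >= $this->maxAttempts) {
                    // Retries exhausted: park the job instead of losing it.
                    $this->deadLetters[] = ['job' => $job, 'error' => $e->getMessage()];
                } else {
                    $this->push($job, $priority, $attempt + 1);
                }
            }
        }
    }
}

$log = [];
$queue = new SketchPriorityQueue(maxAttempts: 2);
$queue->push(function () use (&$log): void { $log[] = 'low'; }, 1);
$queue->push(function () use (&$log): void { $log[] = 'high'; }, 10);
$queue->push(function (): void { throw new RuntimeException('boom'); }, 5);
$queue->work();
```

After `work()` returns, `$log` holds the successful jobs in priority order and the always-failing job sits in `$queue->deadLetters` after its second attempt.
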
### 4. Background Execution & Logging
**Job processing with comprehensive logging**

**Execution Pattern**:
```php
final class ScheduledBackgroundJob
{
    // performWork() and logExecution() are job-specific and not shown here.
    public function handle(): array
    {
        // Business logic execution
        $result = $this->performWork();

        // Automatic logging
        $this->logExecution($result);

        return $result;
    }
}
```

## Usage

### Basic Scheduler-Queue Integration

```php
use App\Framework\Scheduler\Services\SchedulerService;
use App\Framework\Scheduler\Schedules\IntervalSchedule;
use App\Framework\Queue\Queue;
use App\Framework\Queue\ValueObjects\JobPayload;
use App\Framework\Core\ValueObjects\Duration;

// 1. Schedule Setup
$scheduler = $container->get(SchedulerService::class);
$queue = $container->get(Queue::class);

// 2. Create Interval Schedule (every 10 minutes)
$schedule = IntervalSchedule::every(Duration::fromMinutes(10));

// 3. Register Task with Queue Dispatch
$scheduler->schedule('email-cleanup', $schedule, function() use ($queue) {
    $job = new EmailCleanupJob(
        olderThan: Duration::fromDays(30),
        batchSize: 100
    );

    $payload = JobPayload::immediate($job);
    $queue->push($payload);

    return ['status' => 'queued', 'timestamp' => time()];
});
```

### Advanced Pipeline Configuration

```php
// Complex Multi-Stage Pipeline
// Imports for CronSchedule, Timestamp, Priority and the job classes are omitted here.
final class ReportGenerationPipeline
{
    public function __construct(
        private readonly SchedulerService $scheduler,
        private readonly Queue $queue
    ) {}

    public function setupDailyReports(): void
    {
        // Stage 1: Data Collection (daily at 02:00)
        $this->scheduler->schedule(
            'daily-data-collection',
            CronSchedule::fromExpression('0 2 * * *'),
            fn() => $this->dispatchDataCollection()
        );

        // Stage 2: Report Generation (daily at 02:30, after data collection)
        $this->scheduler->schedule(
            'daily-report-generation',
            CronSchedule::fromExpression('30 2 * * *'),
            fn() => $this->dispatchReportGeneration()
        );

        // Stage 3: Distribution (daily at 03:00, after generation)
        $this->scheduler->schedule(
            'daily-report-distribution',
            CronSchedule::fromExpression('0 3 * * *'),
            fn() => $this->dispatchReportDistribution()
        );
    }

    private function dispatchDataCollection(): array
    {
        $job = new DataCollectionJob(
            sources: ['database', 'analytics', 'external_apis'],
            target_date: Timestamp::yesterday()
        );

        $payload = JobPayload::withPriority($job, Priority::HIGH);
        $this->queue->push($payload);

        return ['stage' => 'data_collection', 'queued_at' => time()];
    }

    // dispatchReportGeneration() and dispatchReportDistribution() are not shown in this excerpt.
}
```

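
The three stages rely on five-field cron expressions (`minute hour day-of-month month day-of-week`). The sketch below shows how such an expression can be checked against a point in time. It is a deliberately minimal illustration, not the framework's `CronSchedule`: the function name `cronMatches` is hypothetical, and only `*` and plain integers are supported (no ranges, lists, or steps, and none of cron's special day-of-month/day-of-week rules).

```php
<?php
declare(strict_types=1);

// Minimal matcher for five-field cron expressions; supports only "*" and plain integers.
function cronMatches(string $expression, DateTimeImmutable $time): bool
{
    $fields = preg_split('/\s+/', trim($expression));
    if ($fields === false || count($fields) !== 5) {
        throw new InvalidArgumentException("Expected 5 fields: {$expression}");
    }

    // minute, hour, day of month, month, day of week (0 = Sunday)
    $actual = array_map('intval', explode(' ', $time->format('i G j n w')));

    foreach ($fields as $index => $field) {
        if ($field === '*') {
            continue;
        }
        if (!ctype_digit($field)) {
            throw new InvalidArgumentException("Unsupported field: {$field}");
        }
        if ((int) $field !== $actual[$index]) {
            return false;
        }
    }

    return true;
}

$twoAm = new DateTimeImmutable('2024-01-15 02:00:00');

$stageOneDue = cronMatches('0 2 * * *', $twoAm);   // data collection fires at 02:00
$stageTwoDue = cronMatches('30 2 * * *', $twoAm);  // report generation waits until 02:30
```

Note that staggering stages by clock time only orders their start times; it does not by itself guarantee that an earlier stage has finished.
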
### Named Job Classes (Best Practice)

```php
// ✅ Framework-Compliant Job Class
final class EmailCleanupJob
{
    public function __construct(
        private readonly Duration $olderThan,
        private readonly int $batchSize = 100
    ) {}

    // Note: $this->emailService and logCleanupResults() are not defined in this
    // excerpt; how the job obtains its dependencies is not shown here.
    public function handle(): array
    {
        $deleted = $this->emailService->deleteOldEmails(
            $this->olderThan,
            $this->batchSize
        );

        $this->logCleanupResults($deleted);

        return [
            'deleted_count' => count($deleted),
            'batch_size' => $this->batchSize,
            'cleanup_threshold' => $this->olderThan->toDays() . ' days'
        ];
    }

    public function getType(): string
    {
        return 'email-cleanup';
    }
}
```

## Pipeline Monitoring

### Health Checks

```php
// Pipeline Health Verification
final class PipelineHealthChecker
{
    // Dependencies injected the same way as in ReportGenerationPipeline.
    public function __construct(
        private readonly SchedulerService $scheduler,
        private readonly Queue $queue
    ) {}

    public function checkPipelineHealth(): PipelineHealthReport
    {
        return new PipelineHealthReport([
            'scheduler_status' => $this->checkSchedulerHealth(),
            'queue_status' => $this->checkQueueHealth(),
            'integration_status' => $this->checkIntegrationHealth(),
            'performance_metrics' => $this->gatherPerformanceMetrics()
        ]);
    }

    private function checkSchedulerHealth(): array
    {
        $dueTasks = $this->scheduler->getDueTasks();
        $nextExecution = $this->scheduler->getNextExecutionTime();

        return [
            'due_tasks_count' => count($dueTasks),
            'next_execution' => $nextExecution?->format('Y-m-d H:i:s'),
            // 100 or more due tasks counts as overloaded
            'status' => count($dueTasks) < 100 ? 'healthy' : 'overloaded'
        ];
    }

    private function checkQueueHealth(): array
    {
        $stats = $this->queue->getStats();

        return [
            'queue_size' => $stats['total_size'],
            'priority_distribution' => $stats['priority_breakdown'],
            'processing_rate' => $this->calculateProcessingRate(),
            // 1000 or more queued jobs counts as a backlog
            'status' => $stats['total_size'] < 1000 ? 'healthy' : 'backlog'
        ];
    }

    // checkIntegrationHealth(), gatherPerformanceMetrics() and
    // calculateProcessingRate() are not shown in this excerpt.
}
```

### Performance Metrics

```php
// Real-time Pipeline Performance
final class PipelineMetricsCollector
{
    public function collectMetrics(string $timeframe = '1hour'): array
    {
        return [
            'scheduler_metrics' => [
                'tasks_executed' => $this->getExecutedTasksCount($timeframe),
                'average_execution_time' => $this->getAverageExecutionTime($timeframe),
                'success_rate' => $this->getSchedulerSuccessRate($timeframe)
            ],
            'queue_metrics' => [
                'jobs_processed' => $this->getProcessedJobsCount($timeframe),
                'average_wait_time' => $this->getAverageWaitTime($timeframe),
                'throughput' => $this->calculateThroughput($timeframe)
            ],
            'integration_metrics' => [
                'dispatch_success_rate' => $this->getDispatchSuccessRate($timeframe),
                'end_to_end_latency' => $this->getEndToEndLatency($timeframe),
                'error_rate' => $this->getIntegrationErrorRate($timeframe)
            ]
        ];
    }
}
```

## Troubleshooting

### Common Problems

**1. Jobs are not executed**
```bash
# Diagnose Queue Status
docker exec php php console.php queue:status

# Check Scheduler Tasks
docker exec php php console.php scheduler:status

# Verify Integration
docker exec php php tests/debug/test-scheduler-queue-integration-fixed.php
```

**2. Memory leaks in large jobs**
```php
// Memory-efficient job implementation
final class LargeDataProcessingJob
{
    public function handle(): array
    {
        $processed = 0;

        // Batch processing to prevent memory exhaustion
        $batch = $this->dataSource->getBatch($this->batchSize);

        while (!empty($batch)) {
            $this->processBatch($batch);
            $processed += count($batch);

            // Force garbage collection of reference cycles between batches
            gc_collect_cycles();

            $batch = $this->dataSource->getNextBatch($this->batchSize);
        }

        return ['processed' => $processed];
    }
}
```

**3. Queue Backlog Management**
```php
// Automatic Backlog Resolution
final class BacklogManager
{
    public function resolveBacklog(): void
    {
        $stats = $this->queue->getStats();

        if ($stats['total_size'] > $this->backlogThreshold) {
            // Scale up workers
            $this->workerManager->scaleUp($this->calculateRequiredWorkers($stats));

            // Prioritize critical jobs
            $this->queue->reprioritize(['high_priority_types']);

            // Alert operations team
            $this->alertManager->sendBacklogAlert($stats);
        }
    }
}
```

## Testing

### Integration Tests

```php
// Complete Pipeline Testing
describe('Scheduler Queue Pipeline', function () {
    it('handles full pipeline flow', function () {
        // 1. Setup Scheduler Task
        $schedule = IntervalSchedule::every(Duration::fromSeconds(1));
        $this->scheduler->schedule('test-task', $schedule, function() {
            $job = new TestPipelineJob('pipeline-test');
            $payload = JobPayload::immediate($job);
            $this->queue->push($payload);
            return ['dispatched' => true];
        });

        // 2. Execute Scheduler
        $results = $this->scheduler->executeDueTasks();
        expect($results)->toHaveCount(1);
        expect($results[0]->success)->toBeTrue();

        // 3. Process Queue
        $jobPayload = $this->queue->pop();
        expect($jobPayload)->not->toBeNull();

        $result = $jobPayload->job->handle();
        expect($result['status'])->toBe('completed');

        // 4. Verify End-to-End
        expect($this->queue->size())->toBe(0);
    });
});
```

### Performance Tests

```php
// Pipeline Performance Benchmarks
describe('Pipeline Performance', function () {
    it('processes 1000 jobs within 30 seconds', function () {
        $startTime = microtime(true);

        // Push 1000 jobs directly onto the queue
        for ($i = 0; $i < 1000; $i++) {
            $job = new BenchmarkJob("job-{$i}");
            $payload = JobPayload::immediate($job);
            $this->queue->push($payload);
        }

        // Process all jobs
        while ($this->queue->size() > 0) {
            $jobPayload = $this->queue->pop();
            $jobPayload->job->handle();
        }

        $executionTime = microtime(true) - $startTime;
        expect($executionTime)->toBeLessThan(30.0);
        expect($this->queue->size())->toBe(0);
    });
});
```

## Best Practices

### 1. Job Design
- **Named Classes**: Always use named classes instead of anonymous functions
- **Type Safety**: Use JobPayload value objects for all queue operations
- **Idempotency**: Jobs should be safe to run more than once without additional side effects
- **Error Handling**: Implement graceful degradation on errors

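
The idempotency rule can be sketched with an idempotency key that is checked before the work runs. `SketchIdempotentJob` is a hypothetical class, and the static in-memory array stands in for whatever persistent store (database table, cache) a real job would use.

```php
<?php
declare(strict_types=1);

// Hypothetical job; the in-memory $processed array stands in for a persistent store.
final class SketchIdempotentJob
{
    /** @var array<string, true> keys of work that has already been done */
    private static array $processed = [];

    // Counts how often the real side effect ran (for demonstration only).
    public static int $sideEffects = 0;

    public function __construct(private readonly string $idempotencyKey) {}

    public function handle(): array
    {
        // A retry or duplicate dispatch with the same key becomes a no-op.
        if (isset(self::$processed[$this->idempotencyKey])) {
            return ['status' => 'skipped'];
        }

        self::$sideEffects++; // the actual work, e.g. sending one email
        self::$processed[$this->idempotencyKey] = true;

        return ['status' => 'completed'];
    }
}

$first = (new SketchIdempotentJob('invoice-42'))->handle();
$retry = (new SketchIdempotentJob('invoice-42'))->handle();
```
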
### 2. Scheduler Configuration
- **Reasonable Intervals**: Avoid overly aggressive scheduling intervals
- **Resource Awareness**: Account for CPU and memory limits when designing tasks
- **Monitoring**: Continuously monitor execution times
- **Failover**: Have backup strategies for critical tasks

### 3. Queue Management
- **Priority Classes**: Assign meaningful priorities to the different job types
- **Batch Processing**: Split large data sets into batches
- **Dead Letter Handling**: Automatic recovery of failed jobs
- **Metrics Collection**: Collect performance data for optimization

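
For the batch processing point, a generator is one simple way to split a large data set into fixed-size batches without holding every batch in memory at once. The helper name `inBatches` is hypothetical and not part of the framework.

```php
<?php
declare(strict_types=1);

/**
 * Yields fixed-size batches from any iterable; the last batch may be smaller.
 *
 * @return Generator<int, list<mixed>>
 */
function inBatches(iterable $items, int $batchSize): Generator
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1.');
    }

    $batch = [];
    foreach ($items as $item) {
        $batch[] = $item;
        if (count($batch) === $batchSize) {
            yield $batch;
            $batch = [];
        }
    }

    // Emit the remainder, if any.
    if ($batch !== []) {
        yield $batch;
    }
}

$sizes = [];
foreach (inBatches(range(1, 250), 100) as $batch) {
    $sizes[] = count($batch); // a real job would push or process the batch here
}
```

With 250 items and a batch size of 100, the loop sees two full batches and one batch of 50.
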
### 4. Production Deployment
- **Health Checks**: Regular pipeline health verification
- **Alerting**: Automatic notifications when problems occur
- **Scaling**: Auto-scaling for worker processes
- **Backup**: Disaster recovery strategies for queue data

## Framework Integration

The pipeline consistently uses framework patterns:
- **Value Objects**: Timestamp, Duration, JobPayload, Priority
- **Readonly Classes**: Immutable job definitions
- **Event System**: Integration with the framework's event dispatcher
- **DI Container**: Automatic service resolution
- **Attribute Discovery**: Convention over configuration
- **MCP Integration**: AI-assisted pipeline analysis

## Performance Characteristics

**Typical performance values**:
- **Job Dispatch Latency**: < 50 ms
- **Queue Throughput**: 1000+ jobs/minute (FileQueue)
- **Memory Usage**: < 50 MB for standard jobs
- **Scheduler Precision**: ±1 second for cron-based tasks
- **End-to-End Latency**: < 500 ms for simple jobs

**Scaling characteristics**:
- **Horizontal Scaling**: Multi-worker support
- **Queue Capacity**: 100,000+ jobs (database-backed)
- **Scheduler Load**: 10,000+ concurrent scheduled tasks
- **Memory Efficiency**: Memory use scales linearly with job complexity

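
To put these figures in perspective, the short calculation below derives the per-job time budget implied by 1000 jobs/minute and estimates how many workers would be needed to drain a backlog within a deadline. `requiredWorkers` is a hypothetical helper, and the estimate assumes throughput scales linearly with the number of workers, which this document does not guarantee.

```php
<?php
declare(strict_types=1);

// Workers needed to drain a backlog within a deadline, given per-worker throughput.
// Assumption: throughput scales linearly with the number of workers.
function requiredWorkers(int $backlog, float $jobsPerMinutePerWorker, float $deadlineMinutes): int
{
    return (int) ceil($backlog / ($jobsPerMinutePerWorker * $deadlineMinutes));
}

// 1000 jobs/minute leaves 60,000 ms / 1000 = 60 ms per job on a single worker.
$perJobMs = 60_000 / 1000;

// Draining 100,000 jobs in 30 minutes at 1000 jobs/minute per worker:
// ceil(100000 / 30000) = 4 workers.
$workers = requiredWorkers(100_000, 1000.0, 30.0);
```
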