queues)) {
            throw new \InvalidArgumentException('Worker must handle at least one queue');
        }

        if ($this->maxJobs <= 0) {
            throw new \InvalidArgumentException('Max jobs must be greater than 0');
        }

        if ($this->currentJobs < 0) {
            throw new \InvalidArgumentException('Current jobs cannot be negative');
        }

        if ($this->currentJobs > $this->maxJobs) {
            throw new \InvalidArgumentException('Current jobs cannot exceed max jobs');
        }
    }

    /**
     * Register a new worker.
     *
     * The worker id is derived from the hostname and process id via
     * WorkerId::forHost(). The new worker is active, and its registration
     * time and last heartbeat are the same instant.
     *
     * @param string $hostname     Host the worker runs on.
     * @param int    $processId    Process id of the worker on that host.
     * @param array  $queues       Queues the worker handles.
     * @param int    $maxJobs      Upper bound for concurrently running jobs.
     * @param array  $capabilities Capabilities, looked up by hasCapability().
     *
     * @return self The newly registered worker.
     */
    public static function register(
        string $hostname,
        int $processId,
        array $queues,
        int $maxJobs = 10,
        array $capabilities = []
    ): self {
        // Read the clock once: two separate reads differ by microseconds and
        // could even fall into different seconds once formatted for storage.
        $now = new \DateTimeImmutable();

        return new self(
            id: WorkerId::forHost($hostname, $processId),
            hostname: $hostname,
            processId: $processId,
            queues: $queues,
            maxJobs: $maxJobs,
            registeredAt: $now,
            lastHeartbeat: $now,
            isActive: true,
            capabilities: $capabilities
        );
    }

    /**
     * Record a heartbeat.
     *
     * Returns a copy of this worker with the last heartbeat set to the current
     * time and the reported CPU usage, memory usage and running job count
     * applied. The copy is always marked active, whatever the current state.
     *
     * @param Percentage $cpuUsage    Reported CPU usage.
     * @param Byte       $memoryUsage Reported memory usage.
     * @param int        $currentJobs Number of jobs currently running.
     *
     * @return self Updated copy; this instance is not modified.
     */
    public function updateHeartbeat(
        Percentage $cpuUsage,
        Byte $memoryUsage,
        int $currentJobs
    ): self {
        return new self(
            id: $this->id,
            hostname: $this->hostname,
            processId: $this->processId,
            queues: $this->queues,
            maxJobs: $this->maxJobs,
            registeredAt: $this->registeredAt,
            lastHeartbeat: new \DateTimeImmutable(),
            isActive: true,
            cpuUsage: $cpuUsage,
            memoryUsage: $memoryUsage,
            currentJobs: $currentJobs,
            capabilities: $this->capabilities,
            version: $this->version
        );
    }

    /**
     * Mark the worker as inactive.
     *
     * @return self Copy with isActive set to false and every other value,
     *              including the last heartbeat, carried over unchanged.
     */
    public function markInactive(): self
    {
        return new self(
            id: $this->id,
            hostname: $this->hostname,
            processId: $this->processId,
            queues: $this->queues,
            maxJobs: $this->maxJobs,
            registeredAt: $this->registeredAt,
            lastHeartbeat: $this->lastHeartbeat,
            isActive: false,
            cpuUsage: $this->cpuUsage,
            memoryUsage: $this->memoryUsage,
            currentJobs: $this->currentJobs,
            capabilities: $this->capabilities,
            version: $this->version
        );
    }

    /**
     * Check whether the worker can take on new jobs.
     *
     * The worker must be active, run fewer jobs than its maximum and be healthy.
     */
    public function isAvailableForJobs(): bool
    {
        return $this->isActive && $this->currentJobs < $this->maxJobs &&
$this->isHealthy();
    }

    /**
     * Check whether this worker handles the given queue.
     *
     * Entries of the queue list that are not QueueName instances are skipped.
     *
     * @param QueueName $queueName Queue to look for.
     *
     * @return bool True when one of the worker's queues equals $queueName.
     */
    public function handlesQueue(QueueName $queueName): bool
    {
        foreach ($this->queues as $candidate) {
            if (! $candidate instanceof QueueName) {
                continue;
            }

            if ($candidate->equals($queueName)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Check whether the worker is healthy.
     *
     * A worker is healthy when it is active, its last heartbeat exists and is
     * at most 60 seconds old, its CPU usage value is at most 90 and its memory
     * usage is at most 2 GB.
     */
    public function isHealthy(): bool
    {
        // Inactive workers and workers without any heartbeat are unhealthy.
        if (! $this->isActive || $this->lastHeartbeat === null) {
            return false;
        }

        // The last heartbeat must not be older than 60 seconds.
        $secondsSinceHeartbeat = time() - $this->lastHeartbeat->getTimestamp();
        if ($secondsSinceHeartbeat > 60) {
            return false;
        }

        // CPU limit.
        if ($this->cpuUsage->getValue() > 90) {
            return false;
        }

        // Memory limit (2 GB).
        $memoryLimitBytes = 2 * 1024 * 1024 * 1024;

        return ! ($this->memoryUsage->toBytes() > $memoryLimitBytes);
    }

    /**
     * Compute the worker load.
     *
     * @return Percentage The larger of the job utilisation
     *                    (current jobs / max jobs * 100) and the CPU usage
     *                    value; 100 when max jobs is 0.
     */
    public function getLoadPercentage(): Percentage
    {
        if ($this->maxJobs === 0) {
            return new Percentage(100);
        }

        $jobUtilisation = ($this->currentJobs / $this->maxJobs) * 100;
        $cpuValue = $this->cpuUsage->getValue();

        // Whichever load is higher wins.
        $highestLoad = max($jobUtilisation, $cpuValue);

        return new Percentage($highestLoad);
    }

    /**
     * Check whether the worker has the given capability (strict comparison).
     */
    public function hasCapability(string $capability): bool
    {
        return in_array($capability, $this->capabilities, strict: true);
    }

    /**
     * Worker information for monitoring.
     *
     * @return array Identity, queue names, job counters, derived health,
     *               availability and load figures, and formatted timestamps.
     */
    public function toMonitoringArray(): array
    {
        return [
            'id' => $this->id->toString(),
            'hostname' => $this->hostname,
            'process_id' => $this->processId,
            'queues' => array_map(static fn (QueueName $name) => $name->toString(), $this->queues),
            'max_jobs' => $this->maxJobs,
            'current_jobs' => $this->currentJobs,
            'is_active' => $this->isActive,
            'is_healthy' => $this->isHealthy(),
            'is_available' => $this->isAvailableForJobs(),
            'load_percentage' => $this->getLoadPercentage()->getValue(),
            'cpu_usage' => $this->cpuUsage->getValue(),
            'memory_usage_mb' => round($this->memoryUsage->toBytes() / 1024 / 1024, 2),
            'registered_at' => $this->registeredAt->format('Y-m-d H:i:s'),
            'last_heartbeat' =>
$this->lastHeartbeat?->format('Y-m-d H:i:s'),
            'capabilities' => $this->capabilities,
            'version' => $this->version,
        ];
    }

    /**
     * Array representation for persistence.
     *
     * Queue names and capabilities are stored as JSON strings, timestamps as
     * 'Y-m-d H:i:s' strings and memory usage in bytes.
     *
     * @return array Flat map in the shape fromArray() reads.
     */
    public function toArray(): array
    {
        return [
            'id' => $this->id->toString(),
            'hostname' => $this->hostname,
            'process_id' => $this->processId,
            'queues' => json_encode(array_map(fn (QueueName $queue) => $queue->toString(), $this->queues)),
            'max_jobs' => $this->maxJobs,
            'current_jobs' => $this->currentJobs,
            'is_active' => $this->isActive,
            'cpu_usage' => $this->cpuUsage->getValue(),
            'memory_usage_bytes' => $this->memoryUsage->toBytes(),
            'registered_at' => $this->registeredAt->format('Y-m-d H:i:s'),
            'last_heartbeat' => $this->lastHeartbeat?->format('Y-m-d H:i:s'),
            'capabilities' => json_encode($this->capabilities),
            'version' => $this->version,
        ];
    }

    /**
     * Create a worker from its persisted array form.
     *
     * Required keys: id, hostname, process_id, queues (JSON array), max_jobs,
     * registered_at, is_active. Optional keys and their fallbacks:
     * last_heartbeat (null), cpu_usage (0), memory_usage_bytes (0),
     * current_jobs (0), capabilities ('[]'), version ('1.0.0').
     *
     * @param array $data Row in the shape produced by toArray().
     *
     * @return self The reconstructed worker.
     *
     * @throws \InvalidArgumentException When queues or capabilities do not
     *                                   decode to a JSON array.
     */
    public static function fromArray(array $data): self
    {
        // json_decode() returns null for malformed JSON; fail with a clear
        // message instead of a TypeError further down.
        $queueStrings = json_decode($data['queues'], true);
        if (! is_array($queueStrings)) {
            throw new \InvalidArgumentException('Worker queues must be a JSON-encoded array');
        }

        $capabilities = json_decode($data['capabilities'] ?? '[]', true);
        if (! is_array($capabilities)) {
            throw new \InvalidArgumentException('Worker capabilities must be a JSON-encoded array');
        }

        // NOTE(review): the persisted queue strings are not parsed back yet.
        // Every entry is restored as QueueName::default(), so the original
        // queue names are lost on a toArray()/fromArray() round trip. The
        // stored format is presumably "type.name" or "tenant.type.name";
        // confirm against QueueName before implementing real parsing.
        $queues = array_map(
            static fn (string $queueString) => QueueName::default(),
            $queueStrings
        );

        // A missing, null or empty heartbeat means the worker never reported one.
        $lastHeartbeat = empty($data['last_heartbeat'])
            ? null
            : new \DateTimeImmutable($data['last_heartbeat']);

        return new self(
            id: WorkerId::fromString($data['id']),
            hostname: $data['hostname'],
            processId: $data['process_id'],
            queues: $queues,
            maxJobs: $data['max_jobs'],
            registeredAt: new \DateTimeImmutable($data['registered_at']),
            lastHeartbeat: $lastHeartbeat,
            isActive: (bool) $data['is_active'],
            cpuUsage: new Percentage($data['cpu_usage'] ?? 0),
            memoryUsage: Byte::fromBytes($data['memory_usage_bytes'] ?? 0),
            currentJobs: $data['current_jobs'] ?? 0,
            capabilities: $capabilities,
            version: $data['version'] ?? '1.0.0'
        );
    }
}