Files
michaelschiemer/src/Framework/CircuitBreaker/CircuitBreaker.php
Michael Schiemer 55a330b223 Enable Discovery debug logging for production troubleshooting
- Add DISCOVERY_LOG_LEVEL=debug
- Add DISCOVERY_SHOW_PROGRESS=true
- Temporary changes for debugging InitializerProcessor fixes on production
2025-08-11 20:13:26 +02:00

505 lines
18 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Framework\CircuitBreaker;
use App\Framework\Cache\Cache;
use App\Framework\Cache\CacheItem;
use App\Framework\Cache\CacheKey;
use App\Framework\CircuitBreaker\Events\CircuitBreakerClosed;
use App\Framework\CircuitBreaker\Events\CircuitBreakerEventPublisher;
use App\Framework\CircuitBreaker\Events\CircuitBreakerHalfOpened;
use App\Framework\CircuitBreaker\Events\CircuitBreakerOpened;
use App\Framework\CircuitBreaker\Registry\ServiceRegistry;
use App\Framework\Core\ValueObjects\Duration;
use App\Framework\Core\ValueObjects\Timestamp;
use App\Framework\DateTime\Clock;
use App\Framework\Logging\Logger;
use Throwable;
/**
 * Circuit breaker pattern implementation.
 *
 * Protects the system from repeated failures by temporarily rejecting requests
 * for a service once a configured number of failures has been recorded.
 *
 * States:
 * - CLOSED: normal operation, all requests pass through
 * - OPEN: the service is marked as failing, all requests are rejected
 * - HALF_OPEN: probe phase, a limited number of requests pass through
 *
 * All bookkeeping lives in the injected cache under keys that start with
 * "circuit_breaker_<namespace>:". Counters are updated with a plain
 * read-then-write, so updates are not atomic across concurrent processes.
 */
final readonly class CircuitBreaker implements CircuitBreakerInterface
{
    private const int DEFAULT_FAILURE_THRESHOLD = 5;

    private const int DEFAULT_RECOVERY_TIMEOUT = 60; // seconds

    private const int DEFAULT_HALF_OPEN_MAX_ATTEMPTS = 3;

    private const int DEFAULT_SUCCESS_THRESHOLD = 3;

    /** Maximum number of bytes of the "reason" text persisted with an OPEN state. */
    private const int MAX_REASON_LENGTH = 1000;

    private string $cachePrefix;

    public function __construct(
        private Cache $cache,
        private Clock $clock,
        private ?Logger $logger = null,
        private ?CircuitBreakerEventPublisher $eventPublisher = null,
        private ?ServiceRegistry $serviceRegistry = null,
        private string $namespace = 'default'
    ) {
        $this->cachePrefix = 'circuit_breaker_' . $this->namespace . ':';
    }

    /**
     * Decides whether a request to the given service may proceed.
     *
     * The decision is based on the state persisted in the cache (not on the
     * derived state returned by getState()), so that an OPEN circuit whose
     * recovery timeout has elapsed is actually moved to HALF_OPEN here: the
     * probe counters are reset and the half-opened event is published.
     *
     * @throws CircuitBreakerException when the circuit is open, or when the
     *                                 half-open attempt budget is exhausted
     */
    public function check(string $service, ?CircuitBreakerConfig $config = null): void
    {
        $this->registerService($service);
        $config ??= $this->getDefaultConfig();

        $storedState = $this->readStoredState($service);

        if ($storedState === CircuitState::OPEN) {
            if ($this->isRecoveryTimeoutExpired($service, $config)) {
                // Recovery timeout elapsed: let this request through as the first probe.
                $this->transitionToHalfOpen($service, $config);
                $this->log('info', "Circuit breaker for '{$service}' transitioned to HALF_OPEN");

                return;
            }

            // Still open and the timeout has not elapsed yet.
            throw new CircuitBreakerException(
                service: $service,
                state: CircuitState::OPEN,
                failureCount: $this->readCounter($this->getFailureCountKey($service)),
                retryAfterSeconds: $this->calculateRetryAfter($service, $config)
            );
        }

        if ($storedState === CircuitState::HALF_OPEN) {
            $attempts = $this->readCounter($this->getHalfOpenAttemptsKey($service));

            if ($attempts >= $config->halfOpenMaxAttempts) {
                // Probe budget used up without closing the circuit: back to OPEN.
                $this->transitionToOpen($service, 'Max half-open attempts exceeded', $config);

                throw new CircuitBreakerException(
                    service: $service,
                    state: CircuitState::OPEN,
                    failureCount: $this->readCounter($this->getFailureCountKey($service)),
                    retryAfterSeconds: $config->recoveryTimeout->toCacheSeconds()
                );
            }

            // Allow the request and count it against the probe budget.
            $this->incrementCounter($this->getHalfOpenAttemptsKey($service), $config);
        }
    }

    /**
     * Records a successful call.
     *
     * In HALF_OPEN the success counter is incremented and the circuit is closed
     * once the success threshold is reached. In CLOSED the counters are cleared.
     * In OPEN nothing happens: a late success (e.g. from a request that started
     * before the circuit opened) must not bypass the recovery timeout.
     */
    public function recordSuccess(string $service, ?CircuitBreakerConfig $config = null): void
    {
        $config ??= $this->getDefaultConfig();
        $state = $this->getState($service, $config);

        if ($state === CircuitState::HALF_OPEN) {
            $successCount = $this->incrementCounter($this->getSuccessCountKey($service), $config);

            if ($successCount >= $config->successThreshold) {
                $this->transitionToClosed($service, $config);
                $this->log('info', "Circuit breaker for '{$service}' transitioned to CLOSED after {$successCount} successes");
            }

            return;
        }

        if ($state === CircuitState::CLOSED) {
            $this->resetMetrics($service);
        }
    }

    /**
     * Records a failed call.
     *
     * Exceptions rejected by the config's failure predicate are ignored. A
     * failure in HALF_OPEN reopens the circuit immediately; a failure in CLOSED
     * increments the failure counter and opens the circuit at the threshold.
     */
    public function recordFailure(string $service, Throwable $exception, ?CircuitBreakerConfig $config = null): void
    {
        $config ??= $this->getDefaultConfig();

        if (! $config->shouldTriggerOnException($exception, $service)) {
            $this->log('debug', "Exception ignored by failure predicate for service '{$service}': " . get_class($exception));

            return;
        }

        $state = $this->getState($service, $config);

        if ($state === CircuitState::HALF_OPEN) {
            $this->transitionToOpen($service, 'Failure in HALF_OPEN state: ' . $exception->getMessage(), $config);

            return;
        }

        if ($state === CircuitState::CLOSED) {
            $failureCount = $this->incrementFailureCount($service, $config);

            if ($failureCount >= $config->failureThreshold) {
                $this->transitionToOpen($service, 'Failure threshold exceeded: ' . $exception->getMessage(), $config);
                $this->log('warning', "Circuit breaker for '{$service}' opened after {$failureCount} failures");
            }
        }
    }

    /**
     * Runs an operation under circuit breaker protection.
     *
     * Calls check() first, then records the outcome of the operation. Any
     * throwable raised by the operation is recorded as a failure and rethrown.
     *
     * @template T
     * @param callable(): T $operation
     * @return T
     * @throws Throwable
     */
    public function execute(string $service, callable $operation, ?CircuitBreakerConfig $config = null): mixed
    {
        $this->check($service, $config);

        try {
            $result = $operation();
            $this->recordSuccess($service, $config);

            return $result;
        } catch (Throwable $e) {
            $this->recordFailure($service, $e, $config);

            throw $e;
        }
    }

    /**
     * Returns the effective state of the circuit for a service.
     *
     * A missing or malformed cache entry counts as CLOSED. A persisted OPEN
     * state whose recovery timeout has elapsed is reported as HALF_OPEN even
     * though the cache entry has not been rewritten yet.
     */
    public function getState(string $service, ?CircuitBreakerConfig $config = null): CircuitState
    {
        $config ??= $this->getDefaultConfig();
        $storedState = $this->readStoredState($service);

        if ($storedState === null) {
            return CircuitState::CLOSED;
        }

        if ($storedState === CircuitState::OPEN && $this->isRecoveryTimeoutExpired($service, $config)) {
            return CircuitState::HALF_OPEN;
        }

        return $storedState;
    }

    /**
     * Returns the metrics for a service; the state is derived with the default config.
     */
    public function getMetrics(string $service): CircuitBreakerMetrics
    {
        return $this->buildMetrics($service, $this->getState($service));
    }

    /**
     * Removes every cache entry kept for a single service.
     */
    public function reset(string $service): void
    {
        $keys = [
            $this->getStateKey($service),
            $this->getFailureCountKey($service),
            $this->getSuccessCountKey($service),
            $this->getHalfOpenAttemptsKey($service),
            $this->getLastFailureTimeKey($service),
            $this->getOpenedAtKey($service),
        ];

        foreach ($keys as $key) {
            $this->cache->forget($key);
        }

        $this->log('info', "Circuit breaker for '{$service}' has been reset");
    }

    /**
     * Intended to reset all circuit breakers of this namespace.
     *
     * Not implemented: it would require deleting every cache key with this
     * instance's prefix, which depends on the cache backend. The method only
     * logs a warning so that operators are not told a reset happened.
     */
    public function resetAll(): void
    {
        $this->log('warning', 'resetAll() is not implemented for this cache backend; no circuit breakers were reset');
    }

    /**
     * Persists the OPEN state together with the opening time and a (truncated)
     * reason, then publishes the opened event if a publisher is configured.
     */
    private function transitionToOpen(string $service, string $reason = '', ?CircuitBreakerConfig $config = null): void
    {
        $config ??= $this->getDefaultConfig();
        $previousState = $this->getState($service, $config);
        $now = Timestamp::fromClock($this->clock);
        $ttl = $config->metricsRetentionTime;

        // Cap the persisted reason; exception messages can be arbitrarily long.
        $limitedReason = strlen($reason) > self::MAX_REASON_LENGTH
            ? substr($reason, 0, self::MAX_REASON_LENGTH) . '... (truncated for memory safety)'
            : $reason;

        $this->cache->set(CacheItem::forSet($this->getStateKey($service), [
            'state' => CircuitState::OPEN->value,
            'opened_at' => $now->toFloat(),
            'reason' => $limitedReason,
        ], $ttl));
        $this->cache->set(CacheItem::forSet($this->getOpenedAtKey($service), $now->toFloat(), $ttl));
        $this->cache->set(CacheItem::forSet($this->getLastFailureTimeKey($service), $now->toFloat(), $ttl));

        if ($this->eventPublisher !== null) {
            // NOTE(review): "namespace" receives the cache prefix, not $this->namespace — confirm this is intended.
            $this->eventPublisher->publish(new CircuitBreakerOpened(
                service: $service,
                namespace: $this->cachePrefix,
                previousState: $previousState,
                metrics: $this->buildMetrics($service, CircuitState::OPEN),
                reason: $reason,
                occurredAt: $now
            ));
        }
    }

    /**
     * Persists the HALF_OPEN state, clears the probe counters and publishes the
     * half-opened event if a publisher is configured.
     */
    private function transitionToHalfOpen(string $service, ?CircuitBreakerConfig $config = null): void
    {
        $config ??= $this->getDefaultConfig();
        // Use the persisted state: the derived state would already read HALF_OPEN here.
        $previousState = $this->readStoredState($service) ?? CircuitState::CLOSED;
        $now = Timestamp::fromClock($this->clock);

        $this->cache->set(CacheItem::forSet($this->getStateKey($service), [
            'state' => CircuitState::HALF_OPEN->value,
            'transitioned_at' => $now->toFloat(),
        ], $config->metricsRetentionTime));

        // Start the probe phase with fresh attempt and success counters.
        $this->cache->forget($this->getHalfOpenAttemptsKey($service));
        $this->cache->forget($this->getSuccessCountKey($service));

        if ($this->eventPublisher !== null) {
            // NOTE(review): "namespace" receives the cache prefix, not $this->namespace — confirm this is intended.
            $this->eventPublisher->publish(new CircuitBreakerHalfOpened(
                service: $service,
                namespace: $this->cachePrefix,
                previousState: $previousState,
                metrics: $this->buildMetrics($service, CircuitState::HALF_OPEN),
                reason: 'Recovery timeout expired',
                occurredAt: $now
            ));
        }
    }

    /**
     * Clears state and counters (which makes the circuit CLOSED) and publishes
     * the closed event with the metrics captured before the reset.
     */
    private function transitionToClosed(string $service, ?CircuitBreakerConfig $config = null): void
    {
        $config ??= $this->getDefaultConfig();
        $previousState = $this->getState($service, $config);
        $metrics = $this->buildMetrics($service, $previousState);
        $now = Timestamp::fromClock($this->clock);

        $this->resetMetrics($service);
        $this->log('info', "Circuit breaker for '{$service}' is now CLOSED");

        if ($this->eventPublisher !== null) {
            // NOTE(review): "namespace" receives the cache prefix, not $this->namespace — confirm this is intended.
            $this->eventPublisher->publish(new CircuitBreakerClosed(
                service: $service,
                namespace: $this->cachePrefix,
                previousState: $previousState,
                metrics: $metrics,
                reason: 'Sufficient successful attempts',
                occurredAt: $now
            ));
        }
    }

    /**
     * Increments the failure counter, refreshes the last-failure time and
     * returns the new counter value.
     */
    private function incrementFailureCount(string $service, ?CircuitBreakerConfig $config = null): int
    {
        $config ??= $this->getDefaultConfig();
        $count = $this->incrementCounter($this->getFailureCountKey($service), $config);

        $now = Timestamp::fromClock($this->clock);
        $this->cache->set(CacheItem::forSet(
            $this->getLastFailureTimeKey($service),
            $now->toFloat(),
            $config->metricsRetentionTime
        ));

        return $count;
    }

    /**
     * Adds one to the counter stored under $key (read-then-write, not atomic)
     * and returns the new value.
     */
    private function incrementCounter(CacheKey $key, CircuitBreakerConfig $config): int
    {
        $count = $this->readCounter($key) + 1;
        $this->cache->set(CacheItem::forSet($key, $count, $config->metricsRetentionTime));

        return $count;
    }

    /**
     * Reads an integer counter from the cache; a miss or a non-numeric value yields 0.
     */
    private function readCounter(CacheKey $key): int
    {
        $item = $this->cache->get($key);

        if (! $item->isHit) {
            return 0;
        }

        $value = $item->value;

        if (is_int($value)) {
            return $value;
        }

        // Some cache backends hand numbers back as strings; anything else is treated as "no count".
        return is_numeric($value) ? (int) $value : 0;
    }

    /**
     * Reads a float timestamp from the cache; a miss or a non-numeric value yields null.
     */
    private function readTimestamp(CacheKey $key): ?Timestamp
    {
        $item = $this->cache->get($key);

        if (! $item->isHit || ! is_numeric($item->value)) {
            return null;
        }

        return Timestamp::fromFloat((float) $item->value);
    }

    /**
     * Returns the state persisted in the cache, or null when there is no entry
     * or the entry cannot be mapped to a CircuitState (a warning is logged then).
     */
    private function readStoredState(string $service): ?CircuitState
    {
        $item = $this->cache->get($this->getStateKey($service));

        if (! $item->isHit) {
            return null;
        }

        $raw = is_array($item->value) ? ($item->value['state'] ?? null) : null;

        if ($raw !== null) {
            // Strict comparison against the cases avoids the ValueError/TypeError of CircuitState::from().
            foreach (CircuitState::cases() as $case) {
                if ($case->value === $raw) {
                    return $case;
                }
            }
        }

        $this->log('warning', "Invalid state data format in cache for service '{$service}'");

        return null;
    }

    /**
     * Assembles a metrics snapshot from the cache for an already known state.
     *
     * Deliberately does not call getState(): getState() relies on the recovery
     * timeout check, which in turn needs a metrics snapshot, so going through
     * getState() here would recurse without end while the circuit is OPEN.
     */
    private function buildMetrics(string $service, CircuitState $state): CircuitBreakerMetrics
    {
        return new CircuitBreakerMetrics(
            state: $state,
            failureCount: $this->readCounter($this->getFailureCountKey($service)),
            successCount: $this->readCounter($this->getSuccessCountKey($service)),
            halfOpenAttempts: $this->readCounter($this->getHalfOpenAttemptsKey($service)),
            lastFailureTime: $this->readTimestamp($this->getLastFailureTimeKey($service)),
            openedAt: $this->readTimestamp($this->getOpenedAtKey($service)),
        );
    }

    /**
     * Tells whether the recovery timeout of a persisted OPEN state has elapsed.
     * Only called while the persisted state is OPEN.
     */
    private function isRecoveryTimeoutExpired(string $service, CircuitBreakerConfig $config): bool
    {
        return $this->buildMetrics($service, CircuitState::OPEN)
            ->hasRecoveryTimeoutExpired($config->recoveryTimeout);
    }

    /**
     * Returns the retry-after value reported to callers rejected by an OPEN
     * circuit, as computed by the metrics object and converted with toCacheSeconds().
     */
    private function calculateRetryAfter(string $service, CircuitBreakerConfig $config): int
    {
        return $this->buildMetrics($service, CircuitState::OPEN)
            ->getRetryAfterDuration($config->recoveryTimeout)
            ->toCacheSeconds();
    }

    /**
     * Forgets the state entry and the three counters. The last-failure and
     * opened-at entries are left in place.
     */
    private function resetMetrics(string $service): void
    {
        $this->cache->forget($this->getStateKey($service));
        $this->cache->forget($this->getFailureCountKey($service));
        $this->cache->forget($this->getSuccessCountKey($service));
        $this->cache->forget($this->getHalfOpenAttemptsKey($service));
    }

    /**
     * Returns the config built from the DEFAULT_* constants, created once and memoised.
     */
    private function getDefaultConfig(): CircuitBreakerConfig
    {
        static $defaultConfig = null;

        $defaultConfig ??= new CircuitBreakerConfig(
            failureThreshold: self::DEFAULT_FAILURE_THRESHOLD,
            recoveryTimeout: Duration::fromSeconds(self::DEFAULT_RECOVERY_TIMEOUT),
            halfOpenMaxAttempts: self::DEFAULT_HALF_OPEN_MAX_ATTEMPTS,
            successThreshold: self::DEFAULT_SUCCESS_THRESHOLD
        );

        return $defaultConfig;
    }

    // Cache keys

    private function getStateKey(string $service): CacheKey
    {
        return CacheKey::fromString($this->cachePrefix . $service . ':state');
    }

    private function getFailureCountKey(string $service): CacheKey
    {
        return CacheKey::fromString($this->cachePrefix . $service . ':failures');
    }

    private function getSuccessCountKey(string $service): CacheKey
    {
        return CacheKey::fromString($this->cachePrefix . $service . ':successes');
    }

    private function getHalfOpenAttemptsKey(string $service): CacheKey
    {
        return CacheKey::fromString($this->cachePrefix . $service . ':half_open_attempts');
    }

    private function getLastFailureTimeKey(string $service): CacheKey
    {
        return CacheKey::fromString($this->cachePrefix . $service . ':last_failure');
    }

    private function getOpenedAtKey(string $service): CacheKey
    {
        return CacheKey::fromString($this->cachePrefix . $service . ':opened_at');
    }

    /**
     * Registers the service with the optional service registry under this namespace.
     */
    private function registerService(string $service): void
    {
        if ($this->serviceRegistry !== null) {
            $this->serviceRegistry->registerService($service, $this->namespace);
        }
    }

    /**
     * Forwards a message to the optional logger; unknown levels are logged as info.
     */
    private function log(string $level, string $message): void
    {
        if ($this->logger === null) {
            return;
        }

        match ($level) {
            'debug' => $this->logger->debug($message),
            'warning' => $this->logger->warning($message),
            'error' => $this->logger->error($message),
            default => $this->logger->info($message),
        };
    }
}