- Add DISCOVERY_LOG_LEVEL=debug - Add DISCOVERY_SHOW_PROGRESS=true - Temporary changes for debugging InitializerProcessor fixes on production
160 lines
4.3 KiB
PHP
160 lines
4.3 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace App\Framework\SlidingWindow;
|
|
|
|
use App\Framework\Cache\Cache;
|
|
use App\Framework\Cache\CacheKey;
|
|
use App\Framework\Core\ValueObjects\Duration;
|
|
use App\Framework\Core\ValueObjects\Timestamp;
|
|
use App\Framework\SlidingWindow\Aggregator\SlidingWindowAggregator;
|
|
|
|
/**
 * Cache-based sliding window implementation for persistence.
 *
 * All entries of one window live under a single cache key as a list of
 * ['value' => mixed, 'timestamp' => float] rows. Every public read or write
 * drops entries older than the window and stores the survivors again with
 * the configured TTL.
 *
 * NOTE(review): load -> prune -> store is a plain read-modify-write sequence
 * with no locking visible here; concurrent writers that share an identifier
 * may overwrite each other's entries — confirm whether that is acceptable.
 */
final readonly class CacheBasedSlidingWindow implements SlidingWindow
{
    private const string CACHE_PREFIX = 'sliding_window:';

    /** TTL passed to the cache whenever the entry list is stored. */
    private Duration $cacheTtl;

    /**
     * @param Cache                   $cache      Backend used to persist the window entries
     * @param Duration                $windowSize Length of the sliding window
     * @param string                  $identifier Suffix of the cache key; distinguishes windows
     * @param SlidingWindowAggregator $aggregator Produces the aggregated data reported by getStats()
     * @param Duration|null           $cacheTtl   TTL of the stored entry list; defaults to one hour
     */
    public function __construct(
        private Cache $cache,
        private Duration $windowSize,
        private string $identifier,
        private SlidingWindowAggregator $aggregator,
        ?Duration $cacheTtl = null,
    ) {
        // Deliberately not a promoted property: promotion would already
        // initialise the readonly property (with null), and a later `??=`
        // would then fail with "Cannot modify readonly property".
        $this->cacheTtl = $cacheTtl ?? Duration::fromHours(1);
    }

    /**
     * Appends a value to the window and prunes expired entries.
     *
     * Pruning is done relative to the current time, not to $timestamp, so an
     * entry recorded with a timestamp already outside the window is dropped
     * immediately.
     */
    public function record(mixed $value, Timestamp $timestamp): void
    {
        $entries = $this->getEntriesFromCache();
        $entries[] = new WindowEntry($value, $timestamp);

        // Single write: append and prune in memory, then persist once.
        $this->storeEntriesToCache($this->filterValid($entries, Timestamp::now()));
    }

    /**
     * Returns the values of all entries that are still inside the window.
     *
     * @param Timestamp|null $now Reference time; defaults to the current time
     *
     * @return list<mixed>
     */
    public function getValues(?Timestamp $now = null): array
    {
        return array_map(
            static fn (WindowEntry $entry): mixed => $entry->value,
            $this->pruneAndLoad($now ?? Timestamp::now()),
        );
    }

    /**
     * Builds statistics (count, bounds, aggregated data) for the current window.
     *
     * @param Timestamp|null $now Reference time; defaults to the current time
     */
    public function getStats(?Timestamp $now = null): SlidingWindowStats
    {
        $now ??= Timestamp::now();
        $validEntries = $this->pruneAndLoad($now);

        // Same arithmetic as the cutoff used for filtering. If the window
        // would start at or before the epoch, fall back to $now as before.
        // NOTE(review): this replaces `$now->diff($this->windowSize) > Duration::zero()`,
        // which handed a Duration to Timestamp::diff() and compared objects
        // with `>` — confirm the intended fallback.
        $windowStartSeconds = $now->toFloat() - $this->windowSize->toSeconds();
        $windowStart = $windowStartSeconds > 0.0
            ? Timestamp::fromFloat($windowStartSeconds)
            : $now;

        $aggregatedData = [];
        if ($validEntries !== []) {
            $result = $this->aggregator->aggregate(...$validEntries);

            // method_exists() throws for non-object/non-string arguments and
            // treats strings as class names, so guard with is_object() first.
            $aggregatedData = is_object($result) && method_exists($result, 'toArray')
                ? $result->toArray()
                : [$this->aggregator->getIdentifier() => $result];
        }

        return new SlidingWindowStats(
            totalCount: count($validEntries),
            windowSize: $this->windowSize,
            windowStart: $windowStart,
            windowEnd: $now,
            aggregatedData: $aggregatedData,
        );
    }

    /**
     * Removes the stored entry list for this window from the cache.
     */
    public function clear(): void
    {
        $this->cache->forget($this->getCacheKey());
    }

    public function getWindowSize(): Duration
    {
        return $this->windowSize;
    }

    public function getIdentifier(): string
    {
        return $this->identifier;
    }

    /**
     * Loads and rebuilds the stored entries.
     *
     * A cache miss or a non-array payload yields an empty list; individual
     * rows that do not have the expected shape are skipped.
     *
     * @return list<WindowEntry>
     */
    private function getEntriesFromCache(): array
    {
        $cacheItem = $this->cache->get($this->getCacheKey());

        if (! $cacheItem->isHit || ! is_array($cacheItem->value)) {
            return [];
        }

        $entries = [];
        foreach ($cacheItem->value as $data) {
            if (! is_array($data) || ! array_key_exists('value', $data)) {
                continue;
            }

            $timestamp = $data['timestamp'] ?? null;
            if (! is_int($timestamp) && ! is_float($timestamp)) {
                continue;
            }

            $entries[] = new WindowEntry($data['value'], Timestamp::fromFloat((float) $timestamp));
        }

        return $entries;
    }

    /**
     * Persists the entries as a list of plain value/timestamp rows.
     *
     * @param array<WindowEntry> $entries
     */
    private function storeEntriesToCache(array $entries): void
    {
        $serializedEntries = array_map(
            static fn (WindowEntry $entry): array => [
                'value' => $entry->value,
                'timestamp' => $entry->timestamp->toFloat(),
            ],
            array_values($entries),
        );

        $this->cache->set($this->getCacheKey(), $serializedEntries, $this->cacheTtl);
    }

    /**
     * Loads the entries that are inside the window ending at $now.
     *
     * @return list<WindowEntry>
     */
    private function getValidEntries(Timestamp $now): array
    {
        return $this->filterValid($this->getEntriesFromCache(), $now);
    }

    /**
     * Keeps only the entries inside the window ending at $now.
     *
     * The result is re-indexed because array_filter() preserves keys, which
     * would otherwise leave gaps after entries were dropped.
     *
     * @param array<WindowEntry> $entries
     *
     * @return list<WindowEntry>
     */
    private function filterValid(array $entries, Timestamp $now): array
    {
        $cutoffTime = Timestamp::fromFloat($now->toFloat() - $this->windowSize->toSeconds());

        return array_values(array_filter(
            $entries,
            static fn (WindowEntry $entry) => $entry->isWithinWindow($cutoffTime),
        ));
    }

    /**
     * Loads the entries, drops expired ones, writes the survivors back and
     * returns them — one cache read and one cache write per call.
     *
     * @return list<WindowEntry>
     */
    private function pruneAndLoad(Timestamp $now): array
    {
        $validEntries = $this->getValidEntries($now);
        $this->storeEntriesToCache($validEntries);

        return $validEntries;
    }

    private function getCacheKey(): CacheKey
    {
        return CacheKey::fromString(self::CACHE_PREFIX . $this->identifier);
    }
}
|