Files
michaelschiemer/src/Framework/Database/AsyncDatabaseAdapter.php
Michael Schiemer 55a330b223 Enable Discovery debug logging for production troubleshooting
- Add DISCOVERY_LOG_LEVEL=debug
- Add DISCOVERY_SHOW_PROGRESS=true
- Temporary changes for debugging InitializerProcessor fixes on production
2025-08-11 20:13:26 +02:00

336 lines
10 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Framework\Database;
use App\Framework\Async\AsyncPromise;
use App\Framework\Async\AsyncService;
use App\Framework\Core\ValueObjects\Duration;
/**
 * Async adapter for the database layer (fluent API style).
 *
 * Provides async operations as property access: $db->async->queryMultiple()
 *
 * Every operation is delegated to the injected AsyncService; this class only
 * builds the closures that call the wrapped connection.
 *
 * A "query definition", as accepted by queryMultiple(), aggregate() and
 * readAhead(), is either a plain SQL string or an array of the form
 * ['sql' => string, 'params' => array, 'method' => string]. Only 'sql' is
 * required. 'method' may be 'queryOne', 'queryColumn' or 'queryScalar';
 * any other value (or none) selects query().
 */
final readonly class AsyncDatabaseAdapter
{
    public function __construct(
        private ConnectionInterface $connection,
        private AsyncService $asyncService,
    ) {
    }

    /**
     * Wrap ConnectionInterface::query() in a promise created by the async service.
     *
     * @param array $parameters Values bound to the statement's placeholders.
     */
    public function query(string $sql, array $parameters = []): AsyncPromise
    {
        return $this->asyncService->promise(
            fn () => $this->connection->query($sql, $parameters)
        );
    }

    /**
     * Wrap ConnectionInterface::queryOne() in a promise created by the async service.
     *
     * @param array $parameters Values bound to the statement's placeholders.
     */
    public function queryOne(string $sql, array $parameters = []): AsyncPromise
    {
        return $this->asyncService->promise(
            fn () => $this->connection->queryOne($sql, $parameters)
        );
    }

    /**
     * Wrap ConnectionInterface::queryScalar() in a promise created by the async service.
     *
     * @param array $parameters Values bound to the statement's placeholders.
     */
    public function queryScalar(string $sql, array $parameters = []): AsyncPromise
    {
        return $this->asyncService->promise(
            fn () => $this->connection->queryScalar($sql, $parameters)
        );
    }

    /**
     * Wrap ConnectionInterface::queryColumn() in a promise created by the async service.
     *
     * @param array $parameters Values bound to the statement's placeholders.
     */
    public function queryColumn(string $sql, array $parameters = []): AsyncPromise
    {
        return $this->asyncService->promise(
            fn () => $this->connection->queryColumn($sql, $parameters)
        );
    }

    /**
     * Wrap ConnectionInterface::execute() in a promise created by the async service.
     *
     * @param array $parameters Values bound to the statement's placeholders.
     */
    public function execute(string $sql, array $parameters = []): AsyncPromise
    {
        return $this->asyncService->promise(
            fn () => $this->connection->execute($sql, $parameters)
        );
    }

    /**
     * Run several queries through AsyncService::parallel() and await the result.
     *
     * Each definition is always executed with query(); a 'method' entry is
     * ignored here (use aggregate() or readAhead() to honour it).
     *
     * @param array $queries Query definitions; the keys are reused as operation keys.
     *
     * @return array Whatever the awaited parallel operation yields
     *               (presumably keyed like $queries - confirm against AsyncService).
     *
     * @throws \InvalidArgumentException If a definition has no usable SQL string.
     */
    public function queryMultiple(array $queries): array
    {
        $operations = [];
        foreach ($queries as $key => $queryData) {
            // Definitions are validated eagerly, before anything is scheduled.
            [$sql, $params] = $this->normalizeQuery($queryData);
            $operations[$key] = fn () => $this->connection->query($sql, $params);
        }

        return $this->asyncService->parallel($operations)->await();
    }

    /**
     * Run several named queries and report success or failure per query.
     *
     * Each operation returns ['success' => bool, 'data' => mixed, 'error' => ?string].
     * Exceptions (including an invalid definition) are converted into a failed
     * entry instead of propagating, so one failing query does not throw here.
     *
     * @param array $namedQueries Query definitions keyed by name.
     *
     * @return array Whatever the awaited parallel operation yields.
     */
    public function aggregate(array $namedQueries): array
    {
        $operations = [];
        foreach ($namedQueries as $name => $queryData) {
            $operations[$name] = function () use ($queryData): array {
                try {
                    return [
                        'success' => true,
                        'data' => $this->runQuery($queryData),
                        'error' => null,
                    ];
                } catch (\Exception $e) {
                    return [
                        'success' => false,
                        'data' => null,
                        'error' => $e->getMessage(),
                    ];
                }
            };
        }

        return $this->asyncService->parallel($operations)->await();
    }

    /**
     * Insert rows in chunks, one multi-row INSERT statement per chunk.
     *
     * Row values are bound positionally in the order given by array_values($row),
     * so each row must list its values in the same order as $columns.
     *
     * @param string $table     Table name; quoted as a backtick identifier.
     * @param array  $columns   Column names; quoted as backtick identifiers.
     * @param array  $rows      List of rows, each an array with one value per column.
     * @param int    $batchSize Maximum number of rows per INSERT statement (>= 1).
     *
     * @return array Whatever the awaited parallel operation yields; the operations
     *               are keyed "batch_0", "batch_1", ... An empty $rows returns [].
     *
     * @throws \InvalidArgumentException If $columns is empty, $batchSize is below 1,
     *                                   or a row's width differs from the column count.
     */
    public function batchInsert(string $table, array $columns, array $rows, int $batchSize = 100): array
    {
        if ($rows === []) {
            return [];
        }
        if ($columns === []) {
            throw new \InvalidArgumentException('batchInsert() requires at least one column.');
        }
        if ($batchSize < 1) {
            throw new \InvalidArgumentException('batchInsert() requires a batch size of at least 1.');
        }

        $columnCount = count($columns);
        foreach ($rows as $index => $row) {
            // A width mismatch can never bind correctly; fail before any batch is sent.
            if (count($row) !== $columnCount) {
                throw new \InvalidArgumentException(
                    "batchInsert() row {$index} has " . count($row) . " values, expected {$columnCount}."
                );
            }
        }

        $quotedTable = $this->quoteIdentifier($table);
        $columnList = implode(
            ', ',
            array_map(fn ($column): string => $this->quoteIdentifier((string) $column), $columns)
        );
        $rowPlaceholder = '(' . implode(',', array_fill(0, $columnCount, '?')) . ')';

        $operations = [];
        foreach (array_chunk($rows, $batchSize) as $i => $batch) {
            $operations["batch_$i"] = function () use ($quotedTable, $columnList, $rowPlaceholder, $batch) {
                $values = implode(',', array_fill(0, count($batch), $rowPlaceholder));
                $sql = "INSERT INTO {$quotedTable} ({$columnList}) VALUES {$values}";

                // Flatten all row values in one pass instead of re-merging per row.
                $params = array_merge(...array_map(array_values(...), $batch));

                return $this->connection->execute($sql, $params);
            };
        }

        return $this->asyncService->parallel($operations)->await();
    }

    /**
     * Update rows in chunks; each chunk issues one UPDATE statement per row.
     *
     * Every update is an array of column => value pairs that also contains the
     * key column, which is used for the WHERE clause and excluded from SET.
     * Updates that contain nothing but the key column are skipped.
     *
     * @param string $table     Table name; quoted as a backtick identifier.
     * @param array  $updates   List of column => value arrays, each including $keyColumn.
     * @param string $keyColumn Column used to identify the row to update.
     * @param int    $batchSize Maximum number of updates per operation (>= 1).
     *
     * @return array Whatever the awaited parallel operation yields; each operation
     *               (keyed "batch_N") returns the sum of execute() results for its
     *               chunk. An empty $updates returns [].
     *
     * @throws \InvalidArgumentException If $batchSize is below 1 or an update lacks $keyColumn.
     */
    public function batchUpdate(string $table, array $updates, string $keyColumn = 'id', int $batchSize = 50): array
    {
        if ($updates === []) {
            return [];
        }
        if ($batchSize < 1) {
            throw new \InvalidArgumentException('batchUpdate() requires a batch size of at least 1.');
        }
        foreach ($updates as $index => $update) {
            // Without the key the WHERE clause would compare against NULL; reject
            // the whole call before any row has been modified.
            if (! array_key_exists($keyColumn, $update)) {
                throw new \InvalidArgumentException(
                    "batchUpdate() entry {$index} is missing the key column \"{$keyColumn}\"."
                );
            }
        }

        $quotedTable = $this->quoteIdentifier($table);
        $quotedKey = $this->quoteIdentifier($keyColumn);

        $operations = [];
        foreach (array_chunk($updates, $batchSize) as $i => $batch) {
            $operations["batch_$i"] = function () use ($quotedTable, $quotedKey, $keyColumn, $batch) {
                $affected = 0;
                foreach ($batch as $update) {
                    $keyValue = $update[$keyColumn];
                    unset($update[$keyColumn]);

                    if ($update === []) {
                        // Nothing to SET; an UPDATE without assignments is invalid SQL.
                        continue;
                    }

                    $setParts = [];
                    $params = [];
                    foreach ($update as $column => $value) {
                        $setParts[] = $this->quoteIdentifier((string) $column) . ' = ?';
                        $params[] = $value;
                    }
                    $params[] = $keyValue;

                    $sql = "UPDATE {$quotedTable} SET " . implode(', ', $setParts) . " WHERE {$quotedKey} = ?";
                    $affected += $this->connection->execute($sql, $params);
                }

                return $affected;
            };
        }

        return $this->asyncService->parallel($operations)->await();
    }

    /**
     * Run several queries, each wrapped in AsyncService::withTimeout().
     *
     * @param array         $queries Query definitions; the keys are reused as operation keys.
     * @param Duration|null $timeout Per-query timeout; defaults to 10 seconds.
     *
     * @return array Whatever the awaited parallel operation yields.
     */
    public function readAhead(array $queries, ?Duration $timeout = null): array
    {
        $timeout ??= Duration::fromSeconds(10);

        $operations = [];
        foreach ($queries as $key => $queryData) {
            $operations[$key] = fn () => $this->asyncService->withTimeout(
                fn () => $this->runQuery($queryData),
                $timeout
            );
        }

        return $this->asyncService->parallel($operations)->await();
    }

    /**
     * Run the same query against every read connection of a ReadWriteConnection.
     *
     * When the wrapped connection is not a ReadWriteConnection, or exposes at
     * most one read connection, the query runs once on the wrapped connection
     * and the result is returned as a single-element list (index 0). Otherwise
     * the operations are keyed "replica_N", N being the read connection's key.
     *
     * @param array $parameters Values bound to the statement's placeholders.
     *
     * @return array Single-element list, or whatever the awaited parallel operation yields.
     */
    public function queryReplicas(string $sql, array $parameters = []): array
    {
        if (! ($this->connection instanceof ReadWriteConnection)) {
            // Fallback to single query
            return [$this->connection->query($sql, $parameters)];
        }

        $readConnections = $this->connection->getReadConnections();
        if (count($readConnections) <= 1) {
            return [$this->connection->query($sql, $parameters)];
        }

        $operations = [];
        foreach ($readConnections as $i => $readConnection) {
            $operations["replica_$i"] = fn () => $readConnection->query($sql, $parameters);
        }

        return $this->asyncService->parallel($operations)->await();
    }

    /**
     * Run a callback inside a transaction, wrapped in AsyncService::withTimeout().
     *
     * The transaction is committed when the callback returns and rolled back
     * when it throws anything, including \Error subclasses such as TypeError,
     * so a failing callback does not leave the transaction open.
     *
     * NOTE(review): whether an expired timeout also rolls the transaction back
     * depends on how AsyncService::withTimeout() interrupts the closure - confirm.
     *
     * @param callable      $callback Receives the wrapped connection; its return value is returned.
     * @param Duration|null $timeout  Timeout for the whole transaction; defaults to 30 seconds.
     *
     * @throws \Throwable Whatever the callback or the connection throws, after rollback.
     */
    public function transactionWithTimeout(callable $callback, ?Duration $timeout = null): mixed
    {
        $timeout ??= Duration::fromSeconds(30);

        return $this->asyncService->withTimeout(function () use ($callback) {
            $this->connection->beginTransaction();

            try {
                $result = $callback($this->connection);
                $this->connection->commit();

                return $result;
            } catch (\Throwable $e) {
                // \Throwable rather than \Exception: an \Error must roll back too.
                $this->connection->rollback();

                throw $e;
            }
        }, $timeout);
    }

    /**
     * Collect row count and size figures for several tables.
     *
     * Each operation returns ['row_count', 'size_mb', 'data_size', 'index_size'],
     * or ['error' => message] when an exception occurs. The size figures are read
     * from information_schema.TABLES and are therefore MySQL specific.
     *
     * @param array $tables Table names; each name is also used as its operation key.
     *
     * @return array Whatever the awaited parallel operation yields.
     */
    public function getTableStats(array $tables): array
    {
        $operations = [];
        foreach ($tables as $table) {
            $operations[$table] = function () use ($table): array {
                try {
                    // The name cannot be bound as a parameter here, so it is
                    // escaped as an identifier instead.
                    $rowCount = $this->connection->queryScalar(
                        'SELECT COUNT(*) FROM ' . $this->quoteIdentifier((string) $table)
                    );

                    // Table size (MySQL specific)
                    $sizeResult = $this->connection->queryOne(
                        "SELECT
ROUND(((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024), 2) AS size_mb,
DATA_LENGTH,
INDEX_LENGTH
FROM information_schema.TABLES
WHERE table_schema = DATABASE() AND table_name = ?",
                        [$table]
                    );

                    return [
                        'row_count' => $rowCount,
                        'size_mb' => $sizeResult['size_mb'] ?? 0,
                        'data_size' => $sizeResult['DATA_LENGTH'] ?? 0,
                        'index_size' => $sizeResult['INDEX_LENGTH'] ?? 0,
                    ];
                } catch (\Exception $e) {
                    return ['error' => $e->getMessage()];
                }
            };
        }

        return $this->asyncService->parallel($operations)->await();
    }

    /**
     * Report that async support is enabled, together with the async service's own statistics.
     *
     * @return array{async_enabled: bool, async_stats: mixed}
     */
    public function getStats(): array
    {
        return [
            'async_enabled' => true,
            'async_stats' => $this->asyncService->getStats(),
        ];
    }

    /**
     * Split a query definition into its SQL, parameters and method name.
     *
     * @param mixed $queryData Plain SQL string, or an array with a string 'sql'
     *                         entry and optional 'params' and 'method' entries.
     *
     * @return array{0: string, 1: mixed, 2: mixed} [sql, params, method]; params
     *               defaults to [] and method to 'query'.
     *
     * @throws \InvalidArgumentException If no SQL string can be found.
     */
    private function normalizeQuery(mixed $queryData): array
    {
        if (is_string($queryData)) {
            return [$queryData, [], 'query'];
        }

        if (! is_array($queryData) || ! is_string($queryData['sql'] ?? null)) {
            throw new \InvalidArgumentException(
                'A query definition must be an SQL string or an array with a string "sql" entry.'
            );
        }

        return [
            $queryData['sql'],
            $queryData['params'] ?? [],
            $queryData['method'] ?? 'query',
        ];
    }

    /**
     * Execute a query definition with the connection method it names.
     *
     * Unknown method names fall back to query().
     *
     * @throws \InvalidArgumentException If the definition has no usable SQL string.
     */
    private function runQuery(mixed $queryData): mixed
    {
        [$sql, $params, $method] = $this->normalizeQuery($queryData);

        return match ($method) {
            'queryOne' => $this->connection->queryOne($sql, $params),
            'queryColumn' => $this->connection->queryColumn($sql, $params),
            'queryScalar' => $this->connection->queryScalar($sql, $params),
            default => $this->connection->query($sql, $params),
        };
    }

    /**
     * Quote a table or column name as a backtick-delimited identifier.
     *
     * Embedded backticks are doubled so the name cannot terminate the quoting.
     */
    private function quoteIdentifier(string $identifier): string
    {
        return '`' . str_replace('`', '``', $identifier) . '`';
    }
}