Files

116 lines
3.0 KiB
PHP

<?php
declare(strict_types=1);
require_once __DIR__ . '/../../../modules/shared/db.php';
require_once __DIR__ . '/../../../modules/shared/auth/service.php';
require_once __DIR__ . '/../../../modules/erp/bestellungen/service.php';
// Declare the response as a Server-Sent Events stream and ask clients and
// intermediaries not to cache, transform or buffer it.
$streamHeaders = [
    'Content-Type: text/event-stream; charset=utf-8',
    'Cache-Control: no-cache, no-transform',
    'Connection: keep-alive',
    'X-Accel-Buffering: no',
];
foreach ($streamHeaders as $streamHeader) {
    header($streamHeader);
}

// Keep running when the client disconnects (the polling loop further down
// checks connection_aborted() itself) and remove the execution time limit.
ignore_user_abort(true);
set_time_limit(0);

// Turn off gzip / zlib output compression for this response, presumably so
// events are not held back in a compression buffer. Both calls are
// best-effort, hence the error suppression.
if (function_exists('apache_setenv')) {
    @apache_setenv('no-gzip', '1');
}
@ini_set('zlib.output_compression', '0');
// Read the configuration from the .env file three directories above this
// script and open the database connection used for authentication and polling.
$env = expand_env_values(parse_env_file(__DIR__ . '/../../../.env'));
try {
    $pdo = connect_database($env);
} catch (Throwable $e) {
    // Record the cause server-side; the client only gets a generic message.
    error_log('Sales order event stream: database connection failed: ' . $e->getMessage());
    http_response_code(500);
    // The body below is a plain error message, not an event stream, so the
    // content type announced earlier is replaced.
    header('Content-Type: text/plain; charset=utf-8');
    echo 'Database connection failed';
    exit;
}
// Only authenticated users may subscribe to the stream.
auth_bootstrap_session();
$currentUser = auth_current_user($pdo);
if ($currentUser === null) {
    http_response_code(401);
    // The body below is a plain error message, not an event stream, so the
    // content type announced earlier is replaced.
    header('Content-Type: text/plain; charset=utf-8');
    echo 'Unauthorized';
    exit;
}

// Close the session for writing before entering the long-running loop. PHP
// otherwise keeps the session data locked until the script ends, which would
// stall other requests that use the same session.
session_write_close();
// Flush and remove every active output buffer so that events reach the client
// as soon as they are written.
while (ob_get_level() > 0) {
    // ob_end_flush() returns false when the top buffer cannot be removed
    // (for example a non-removable handler such as zlib output compression).
    // In that case the nesting level never drops, so stop instead of looping
    // forever. Output may then still pass through the remaining buffer.
    if (@ob_end_flush() === false) {
        break;
    }
}
/**
 * Writes a single named Server-Sent Events frame and flushes it immediately.
 *
 * The frame consists of an "event:" line carrying the event name, a "data:"
 * line carrying the JSON-encoded payload (slashes and Unicode left unescaped)
 * and a terminating blank line.
 *
 * @param string $eventName Value of the "event:" field.
 * @param array  $payload   Data that is JSON-encoded into the "data:" field.
 */
$sendEvent = static function (string $eventName, array $payload): void {
    $encodedPayload = json_encode($payload, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
    $frame = 'event: ' . $eventName . "\n"
        . 'data: ' . $encodedPayload . "\n\n";

    echo $frame;
    flush();
};
// Timestamp of the last keep-alive; the polling loop compares against it.
$lastHeartbeatAt = time();

// Baseline of known orders: order id => last seen updated_at value. Orders
// already present when the client connects are therefore not reported.
$state = [];
foreach (get_sales_order_realtime_snapshot($pdo) as $row) {
    // Normalise rows exactly like the polling loop does: tolerate missing
    // columns (no PHP warnings leaking into the stream) and ignore rows
    // without a positive id, which the loop never tracks either.
    $orderId = (int) ($row['id'] ?? 0);
    if ($orderId <= 0) {
        continue;
    }
    $state[$orderId] = (string) ($row['updated_at'] ?? '');
}

// A line starting with ":" is an SSE comment; it signals that the stream is
// established without dispatching an event to the client.
echo ": connected\n\n";
flush();
// Poll the order snapshot until the client disconnects. Each pass compares the
// snapshot with the previously seen state and emits a "bestellungen.changed"
// event for every order that is new or whose updated_at value differs.
while (!connection_aborted()) {
    try {
        $snapshot = get_sales_order_realtime_snapshot($pdo);
        $nextState = [];
        foreach ($snapshot as $row) {
            $orderId = (int) ($row['id'] ?? 0);
            if ($orderId <= 0) {
                // Rows without a usable id cannot be tracked.
                continue;
            }
            $updatedAt = (string) ($row['updated_at'] ?? '');
            $nextState[$orderId] = $updatedAt;

            if (!array_key_exists($orderId, $state)) {
                $sendEvent('bestellungen.changed', [
                    'kind' => 'created',
                    'orderId' => $orderId,
                    'updatedAt' => $updatedAt,
                ]);
                continue;
            }
            if ($state[$orderId] !== $updatedAt) {
                $sendEvent('bestellungen.changed', [
                    'kind' => 'updated',
                    'orderId' => $orderId,
                    'updatedAt' => $updatedAt,
                ]);
            }
        }
        // Orders missing from the snapshot simply drop out of the state; no
        // event is emitted for them.
        $state = $nextState;

        // Send an SSE comment at least every 15 seconds as a keep-alive.
        if ((time() - $lastHeartbeatAt) >= 15) {
            echo ": ping\n\n";
            flush();
            $lastHeartbeatAt = time();
        }

        // Poll interval: 2 seconds.
        usleep(2000000);
    } catch (Throwable $e) {
        error_log('Sales order event stream: polling failed: ' . $e->getMessage());
        echo ": error\n\n";
        flush();
        try {
            // Reconnect and take a fresh baseline. The baseline is built in a
            // temporary array and only swapped in once it is complete: if the
            // snapshot fails, the previous state is kept. Otherwise an empty
            // state would make the next successful poll report every existing
            // order as "created".
            $pdo = connect_database($env);
            $freshState = [];
            foreach (get_sales_order_realtime_snapshot($pdo) as $row) {
                $orderId = (int) ($row['id'] ?? 0);
                if ($orderId <= 0) {
                    continue;
                }
                $freshState[$orderId] = (string) ($row['updated_at'] ?? '');
            }
            $state = $freshState;
        } catch (Throwable $reconnectError) {
            // Keep the connection open and try again on the next loop.
            error_log('Sales order event stream: reconnect failed: ' . $reconnectError->getMessage());
        }

        // Back off for 5 seconds before the next attempt.
        usleep(5000000);
    }
}