0) { ob_end_flush(); }

/**
 * Writes one Server-Sent Events frame (an "event:" line plus a JSON "data:" line)
 * and flushes it so the client receives it immediately.
 */
$sendEvent = static function (string $eventName, array $payload): void {
    echo 'event: ' . $eventName . "\n";
    echo 'data: ' . json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) . "\n\n";
    flush();
};

/**
 * Reduces snapshot rows to a map of order id => "updated_at" string.
 *
 * Rows without a positive integer id are skipped. The whole map is built
 * before it is returned, so a caller that assigns the result never ends up
 * with a half-populated state when the row source throws.
 *
 * @param iterable<array<string, mixed>> $rows
 * @return array<int, string>
 */
$buildState = static function (iterable $rows): array {
    $map = [];

    foreach ($rows as $row) {
        $orderId = (int) ($row['id'] ?? 0);

        if ($orderId <= 0) {
            continue;
        }

        $map[$orderId] = (string) ($row['updated_at'] ?? '');
    }

    return $map;
};

$lastHeartbeatAt = time();

// Baseline: orders that already exist when the client connects are not announced.
$state = $buildState(get_sales_order_realtime_snapshot($pdo));

echo ": connected\n\n";
flush();

while (!connection_aborted()) {
    try {
        $nextState = $buildState(get_sales_order_realtime_snapshot($pdo));

        // Diff the fresh snapshot against the previous one.
        foreach ($nextState as $orderId => $updatedAt) {
            if (!array_key_exists($orderId, $state)) {
                $sendEvent('bestellungen.changed', [
                    'kind' => 'created',
                    'orderId' => $orderId,
                    'updatedAt' => $updatedAt,
                ]);
                continue;
            }

            if ($state[$orderId] !== $updatedAt) {
                $sendEvent('bestellungen.changed', [
                    'kind' => 'updated',
                    'orderId' => $orderId,
                    'updatedAt' => $updatedAt,
                ]);
            }
        }

        // Orders missing from the new snapshot are dropped without an event.
        $state = $nextState;

        // Emit a comment line after 15 seconds without a heartbeat.
        if ((time() - $lastHeartbeatAt) >= 15) {
            echo ": ping\n\n";
            flush();
            $lastHeartbeatAt = time();
        }

        // Poll interval: 2 seconds.
        usleep(2000000);
    } catch (Throwable $e) {
        echo ": error\n\n";
        flush();

        try {
            $pdo = connect_database($env);

            // Assign only once the snapshot has been read completely. If the
            // read throws, $state keeps its previous contents; clearing it up
            // front would make the next successful poll report every existing
            // order as "created".
            $state = $buildState(get_sales_order_realtime_snapshot($pdo));
        } catch (Throwable $reconnectError) {
            // Keep the connection open and try again on the next loop.
        }

        // Back off for 5 seconds before polling again.
        usleep(5000000);
    }
}