From 57dfae624d51b100f5eb4f250854c047b70761b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mathias=20Gla=CC=88ser?= Date: Tue, 16 Jun 2026 15:16:02 +0200 Subject: [PATCH] Zentrale Excel-Webhook-Zustellung --- db/migrations/0001_phase1_core.sql | 27 ++- db/migrations/0004_phase1_direct_sales.sql | 28 ++- modules/erp/direktverkauf/api/otc-order.php | 2 +- .../erp/import-integration/order-import.php | 2 +- modules/erp/import-integration/service.php | 200 ++++++++++++++++-- 5 files changed, 237 insertions(+), 22 deletions(-) diff --git a/db/migrations/0001_phase1_core.sql b/db/migrations/0001_phase1_core.sql index 0f17af4..0165b3b 100644 --- a/db/migrations/0001_phase1_core.sql +++ b/db/migrations/0001_phase1_core.sql @@ -516,7 +516,11 @@ DECLARE v_event_key TEXT; BEGIN IF TG_OP = 'INSERT' THEN - v_event_key := format('order.imported:%s', NEW.external_ref); + v_event_key := format( + 'order.imported:%s:%s', + NEW.external_ref, + to_char(NEW.imported_at, 'YYYYMMDDHH24MISSUS') + ); PERFORM fn_enqueue_event( 'order.imported', v_event_key, @@ -531,6 +535,27 @@ BEGIN ) ); ELSIF TG_OP = 'UPDATE' THEN + IF OLD.imported_at IS DISTINCT FROM NEW.imported_at THEN + v_event_key := format( + 'order.imported:%s:%s', + NEW.external_ref, + to_char(NEW.imported_at, 'YYYYMMDDHH24MISSUS') + ); + PERFORM fn_enqueue_event( + 'order.imported', + v_event_key, + 'sales_order', + NEW.id::TEXT, + jsonb_build_object( + 'orderId', NEW.id, + 'externalRef', NEW.external_ref, + 'orderStatus', NEW.order_status, + 'paymentStatus', NEW.payment_status, + 'occurredAt', NOW() + ) + ); + END IF; + IF OLD.order_status IS DISTINCT FROM NEW.order_status AND NEW.order_status = 'cancelled' THEN v_event_key := format('order.cancelled.full:%s:%s', NEW.external_ref, COALESCE(NEW.cancelled_at, NOW())); PERFORM fn_enqueue_event( diff --git a/db/migrations/0004_phase1_direct_sales.sql b/db/migrations/0004_phase1_direct_sales.sql index d7cecc8..b4dd96c 100644 --- a/db/migrations/0004_phase1_direct_sales.sql +++ b/db/migrations/0004_phase1_direct_sales.sql @@ -90,7 +90,11 @@ DECLARE v_event_key TEXT; BEGIN IF TG_OP = 'INSERT' THEN - v_event_key := format('order.imported:%s', NEW.external_ref); + v_event_key := format( + 'order.imported:%s:%s', + NEW.external_ref, + to_char(NEW.imported_at, 'YYYYMMDDHH24MISSUS') + ); PERFORM fn_enqueue_event( 'order.imported', v_event_key, @@ -106,6 +110,28 @@ BEGIN ) ); ELSIF TG_OP = 'UPDATE' THEN + IF OLD.imported_at IS DISTINCT FROM NEW.imported_at THEN + v_event_key := format( + 'order.imported:%s:%s', + NEW.external_ref, + to_char(NEW.imported_at, 'YYYYMMDDHH24MISSUS') + ); + PERFORM fn_enqueue_event( + 'order.imported', + v_event_key, + 'sales_order', + NEW.id::TEXT, + jsonb_build_object( + 'orderId', NEW.id, + 'externalRef', NEW.external_ref, + 'orderSource', NEW.order_source, + 'orderStatus', NEW.order_status, + 'paymentStatus', NEW.payment_status, + 'occurredAt', NOW() + ) + ); + END IF; + IF OLD.order_status IS DISTINCT FROM NEW.order_status AND NEW.order_status = 'cancelled' THEN v_event_key := format('order.cancelled.full:%s:%s', NEW.external_ref, COALESCE(NEW.cancelled_at, NOW())); PERFORM fn_enqueue_event( diff --git a/modules/erp/direktverkauf/api/otc-order.php b/modules/erp/direktverkauf/api/otc-order.php index 3276ebe..2ab425f 100644 --- a/modules/erp/direktverkauf/api/otc-order.php +++ b/modules/erp/direktverkauf/api/otc-order.php @@ -166,7 +166,7 @@ try { $pdo->commit(); - $excelTrigger = trigger_excel_webhook($externalRef, $env); + $excelTrigger = dispatch_order_import_webhooks($pdo, $env); json_response(201, [ 'ok' => true, diff --git a/modules/erp/import-integration/order-import.php b/modules/erp/import-integration/order-import.php index 8ea110d..ccefb97 100644 --- a/modules/erp/import-integration/order-import.php +++ b/modules/erp/import-integration/order-import.php @@ -165,7 +165,7 @@ try { $pdo->commit(); $labelTrigger = trigger_shipping_label_flow($data, $env); - $excelTrigger = trigger_excel_webhook($externalRef, $env); + $excelTrigger = dispatch_order_import_webhooks($pdo, $env); json_response(200, [ 'ok' => true, diff --git a/modules/erp/import-integration/service.php b/modules/erp/import-integration/service.php index 039c103..445d1e1 100644 --- a/modules/erp/import-integration/service.php +++ b/modules/erp/import-integration/service.php @@ -96,6 +96,20 @@ function post_json(string $url, array $payload, array $headers = [], int $timeou ]; } +function build_n8n_webhook_headers(array $localEnv): array +{ + $headers = []; + $secret = env_value('N8N_WEBHOOK_SECRET', $localEnv); + if ($secret !== '') { + $headers['X-Webhook-Secret'] = $secret; + $headers['X-N8N-Secret'] = $secret; + $headers['X-API-Key'] = $secret; + $headers['Authorization'] = 'Bearer ' . $secret; + } + + return $headers; +} + function trigger_shipping_label_flow(array $order, array $localEnv): array { $url = derive_label_webhook_url($localEnv); @@ -107,15 +121,7 @@ function trigger_shipping_label_flow(array $order, array $localEnv): array ]; } - $headers = []; - $secret = env_value('N8N_WEBHOOK_SECRET', $localEnv); - if ($secret !== '') { - $headers['X-Webhook-Secret'] = $secret; - $headers['X-N8N-Secret'] = $secret; - $headers['X-API-Key'] = $secret; - $headers['Authorization'] = 'Bearer ' . $secret; - } - + $headers = build_n8n_webhook_headers($localEnv); throttle_webhook_channel('label', 10); $result = post_json($url, $order, $headers, 20); @@ -140,15 +146,7 @@ function trigger_excel_webhook(string $externalRef, array $localEnv): array ]; } - $headers = []; - $secret = env_value('N8N_WEBHOOK_SECRET', $localEnv); - if ($secret !== '') { - $headers['X-Webhook-Secret'] = $secret; - $headers['X-N8N-Secret'] = $secret; - $headers['X-API-Key'] = $secret; - $headers['Authorization'] = 'Bearer ' . $secret; - } - + $headers = build_n8n_webhook_headers($localEnv); throttle_webhook_channel('excel', 10); $result = post_json($url, ['Bestellnummer' => $externalRef], $headers, 20); @@ -161,3 +159,169 @@ function trigger_excel_webhook(string $externalRef, array $localEnv): array 'responseBody' => $result['body'], ]; } + +function dispatch_order_import_webhooks(PDO $pdo, array $localEnv, int $limit = 20): array +{ + $url = derive_excel_webhook_url($localEnv); + if ($url === '') { + return [ + 'enabled' => false, + 'ok' => false, + 'processed' => 0, + 'sent' => 0, + 'failed' => 0, + 'deadLetter' => 0, + 'message' => 'Excel webhook URL not configured', + ]; + } + + $headers = build_n8n_webhook_headers($localEnv); + $limit = max(1, min(100, $limit)); + + $summary = [ + 'enabled' => true, + 'ok' => false, + 'processed' => 0, + 'sent' => 0, + 'failed' => 0, + 'deadLetter' => 0, + 'status' => 0, + 'url' => $url, + 'message' => 'No outbound order.imported event queued', + 'responseBody' => '', + ]; + + try { + for ($i = 0; $i < $limit; $i++) { + $pdo->beginTransaction(); + + $stmt = $pdo->query( + "SELECT id, payload, attempt_count + FROM outbound_webhook_event + WHERE event_type = 'order.imported' + AND status IN ('pending', 'failed') + AND next_attempt_at <= NOW() + ORDER BY created_at ASC, id ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED" + ); + $event = $stmt !== false ? $stmt->fetch(PDO::FETCH_ASSOC) : false; + if (!is_array($event)) { + $pdo->commit(); + break; + } + + $eventId = (int) $event['id']; + $attemptCount = max(0, (int) $event['attempt_count']) + 1; + $payloadRaw = $event['payload']; + $payload = []; + if (is_string($payloadRaw) && $payloadRaw !== '') { + try { + $payload = json_decode($payloadRaw, true, 512, JSON_THROW_ON_ERROR); + } catch (JsonException) { + $payload = []; + } + } elseif (is_array($payloadRaw)) { + $payload = $payloadRaw; + } + + $externalRef = trim((string) ($payload['externalRef'] ?? '')); + if ($externalRef === '') { + $update = $pdo->prepare( + "UPDATE outbound_webhook_event + SET status = 'dead_letter', + last_attempt_at = NOW(), + last_error = :last_error, + next_attempt_at = NOW(), + attempt_count = :attempt_count + WHERE id = :id" + ); + $update->execute([ + ':last_error' => 'Missing externalRef in outbound payload', + ':attempt_count' => $attemptCount, + ':id' => $eventId, + ]); + + $pdo->commit(); + $summary['processed']++; + $summary['failed']++; + $summary['deadLetter']++; + $summary['ok'] = false; + $summary['message'] = 'Outbound payload missing externalRef'; + continue; + } + + $result = post_json($url, ['Bestellnummer' => $externalRef], $headers, 20); + + $summary['processed']++; + $summary['status'] = $result['status']; + $summary['responseBody'] = $result['body']; + + if ($result['ok']) { + $update = $pdo->prepare( + "UPDATE outbound_webhook_event + SET status = 'sent', + last_attempt_at = NOW(), + last_error = NULL, + sent_at = NOW(), + next_attempt_at = NOW(), + attempt_count = :attempt_count + WHERE id = :id" + ); + $update->execute([ + ':attempt_count' => $attemptCount, + ':id' => $eventId, + ]); + + $pdo->commit(); + $summary['sent']++; + if ($summary['failed'] === 0) { + $summary['ok'] = true; + } + $summary['message'] = 'Excel webhook triggered'; + continue; + } + + $backoffSeconds = min(3600, 60 * (2 ** max(0, $attemptCount - 1))); + $status = $attemptCount >= 5 ? 'dead_letter' : 'failed'; + $nextAttemptAt = (new DateTimeImmutable('now')) + ->modify('+' . $backoffSeconds . ' seconds') + ->format('Y-m-d H:i:s'); + $lastError = $result['error'] !== '' ? $result['error'] : ('HTTP ' . $result['status']); + $update = $pdo->prepare( + "UPDATE outbound_webhook_event + SET status = :status, + last_attempt_at = NOW(), + last_error = :last_error, + next_attempt_at = :next_attempt_at, + attempt_count = :attempt_count + WHERE id = :id" + ); + $update->execute([ + ':status' => $status, + ':last_error' => $lastError, + ':next_attempt_at' => $nextAttemptAt, + ':attempt_count' => $attemptCount, + ':id' => $eventId, + ]); + + $pdo->commit(); + $summary['failed']++; + $summary['ok'] = false; + if ($status === 'dead_letter') { + $summary['deadLetter']++; + } + $summary['message'] = $lastError; + } + + return $summary; + } catch (Throwable $e) { + if ($pdo->inTransaction()) { + $pdo->rollBack(); + } + + $summary['ok'] = false; + $summary['message'] = $e->getMessage(); + return $summary; + } +}