Zentrale Excel-Webhook-Zustellung
This commit is contained in:
@@ -516,7 +516,11 @@ DECLARE
|
|||||||
v_event_key TEXT;
|
v_event_key TEXT;
|
||||||
BEGIN
|
BEGIN
|
||||||
IF TG_OP = 'INSERT' THEN
|
IF TG_OP = 'INSERT' THEN
|
||||||
v_event_key := format('order.imported:%s', NEW.external_ref);
|
v_event_key := format(
|
||||||
|
'order.imported:%s:%s',
|
||||||
|
NEW.external_ref,
|
||||||
|
to_char(NEW.imported_at, 'YYYYMMDDHH24MISSUS')
|
||||||
|
);
|
||||||
PERFORM fn_enqueue_event(
|
PERFORM fn_enqueue_event(
|
||||||
'order.imported',
|
'order.imported',
|
||||||
v_event_key,
|
v_event_key,
|
||||||
@@ -531,6 +535,27 @@ BEGIN
|
|||||||
)
|
)
|
||||||
);
|
);
|
||||||
ELSIF TG_OP = 'UPDATE' THEN
|
ELSIF TG_OP = 'UPDATE' THEN
|
||||||
|
IF OLD.imported_at IS DISTINCT FROM NEW.imported_at THEN
|
||||||
|
v_event_key := format(
|
||||||
|
'order.imported:%s:%s',
|
||||||
|
NEW.external_ref,
|
||||||
|
to_char(NEW.imported_at, 'YYYYMMDDHH24MISSUS')
|
||||||
|
);
|
||||||
|
PERFORM fn_enqueue_event(
|
||||||
|
'order.imported',
|
||||||
|
v_event_key,
|
||||||
|
'sales_order',
|
||||||
|
NEW.id::TEXT,
|
||||||
|
jsonb_build_object(
|
||||||
|
'orderId', NEW.id,
|
||||||
|
'externalRef', NEW.external_ref,
|
||||||
|
'orderStatus', NEW.order_status,
|
||||||
|
'paymentStatus', NEW.payment_status,
|
||||||
|
'occurredAt', NOW()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
END IF;
|
||||||
|
|
||||||
IF OLD.order_status IS DISTINCT FROM NEW.order_status AND NEW.order_status = 'cancelled' THEN
|
IF OLD.order_status IS DISTINCT FROM NEW.order_status AND NEW.order_status = 'cancelled' THEN
|
||||||
v_event_key := format('order.cancelled.full:%s:%s', NEW.external_ref, COALESCE(NEW.cancelled_at, NOW()));
|
v_event_key := format('order.cancelled.full:%s:%s', NEW.external_ref, COALESCE(NEW.cancelled_at, NOW()));
|
||||||
PERFORM fn_enqueue_event(
|
PERFORM fn_enqueue_event(
|
||||||
|
|||||||
@@ -90,7 +90,11 @@ DECLARE
|
|||||||
v_event_key TEXT;
|
v_event_key TEXT;
|
||||||
BEGIN
|
BEGIN
|
||||||
IF TG_OP = 'INSERT' THEN
|
IF TG_OP = 'INSERT' THEN
|
||||||
v_event_key := format('order.imported:%s', NEW.external_ref);
|
v_event_key := format(
|
||||||
|
'order.imported:%s:%s',
|
||||||
|
NEW.external_ref,
|
||||||
|
to_char(NEW.imported_at, 'YYYYMMDDHH24MISSUS')
|
||||||
|
);
|
||||||
PERFORM fn_enqueue_event(
|
PERFORM fn_enqueue_event(
|
||||||
'order.imported',
|
'order.imported',
|
||||||
v_event_key,
|
v_event_key,
|
||||||
@@ -106,6 +110,28 @@ BEGIN
|
|||||||
)
|
)
|
||||||
);
|
);
|
||||||
ELSIF TG_OP = 'UPDATE' THEN
|
ELSIF TG_OP = 'UPDATE' THEN
|
||||||
|
IF OLD.imported_at IS DISTINCT FROM NEW.imported_at THEN
|
||||||
|
v_event_key := format(
|
||||||
|
'order.imported:%s:%s',
|
||||||
|
NEW.external_ref,
|
||||||
|
to_char(NEW.imported_at, 'YYYYMMDDHH24MISSUS')
|
||||||
|
);
|
||||||
|
PERFORM fn_enqueue_event(
|
||||||
|
'order.imported',
|
||||||
|
v_event_key,
|
||||||
|
'sales_order',
|
||||||
|
NEW.id::TEXT,
|
||||||
|
jsonb_build_object(
|
||||||
|
'orderId', NEW.id,
|
||||||
|
'externalRef', NEW.external_ref,
|
||||||
|
'orderSource', NEW.order_source,
|
||||||
|
'orderStatus', NEW.order_status,
|
||||||
|
'paymentStatus', NEW.payment_status,
|
||||||
|
'occurredAt', NOW()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
END IF;
|
||||||
|
|
||||||
IF OLD.order_status IS DISTINCT FROM NEW.order_status AND NEW.order_status = 'cancelled' THEN
|
IF OLD.order_status IS DISTINCT FROM NEW.order_status AND NEW.order_status = 'cancelled' THEN
|
||||||
v_event_key := format('order.cancelled.full:%s:%s', NEW.external_ref, COALESCE(NEW.cancelled_at, NOW()));
|
v_event_key := format('order.cancelled.full:%s:%s', NEW.external_ref, COALESCE(NEW.cancelled_at, NOW()));
|
||||||
PERFORM fn_enqueue_event(
|
PERFORM fn_enqueue_event(
|
||||||
|
|||||||
@@ -166,7 +166,7 @@ try {
|
|||||||
|
|
||||||
$pdo->commit();
|
$pdo->commit();
|
||||||
|
|
||||||
$excelTrigger = trigger_excel_webhook($externalRef, $env);
|
$excelTrigger = dispatch_order_import_webhooks($pdo, $env);
|
||||||
|
|
||||||
json_response(201, [
|
json_response(201, [
|
||||||
'ok' => true,
|
'ok' => true,
|
||||||
|
|||||||
@@ -165,7 +165,7 @@ try {
|
|||||||
$pdo->commit();
|
$pdo->commit();
|
||||||
|
|
||||||
$labelTrigger = trigger_shipping_label_flow($data, $env);
|
$labelTrigger = trigger_shipping_label_flow($data, $env);
|
||||||
$excelTrigger = trigger_excel_webhook($externalRef, $env);
|
$excelTrigger = dispatch_order_import_webhooks($pdo, $env);
|
||||||
|
|
||||||
json_response(200, [
|
json_response(200, [
|
||||||
'ok' => true,
|
'ok' => true,
|
||||||
|
|||||||
@@ -96,6 +96,20 @@ function post_json(string $url, array $payload, array $headers = [], int $timeou
|
|||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function build_n8n_webhook_headers(array $localEnv): array
|
||||||
|
{
|
||||||
|
$headers = [];
|
||||||
|
$secret = env_value('N8N_WEBHOOK_SECRET', $localEnv);
|
||||||
|
if ($secret !== '') {
|
||||||
|
$headers['X-Webhook-Secret'] = $secret;
|
||||||
|
$headers['X-N8N-Secret'] = $secret;
|
||||||
|
$headers['X-API-Key'] = $secret;
|
||||||
|
$headers['Authorization'] = 'Bearer ' . $secret;
|
||||||
|
}
|
||||||
|
|
||||||
|
return $headers;
|
||||||
|
}
|
||||||
|
|
||||||
function trigger_shipping_label_flow(array $order, array $localEnv): array
|
function trigger_shipping_label_flow(array $order, array $localEnv): array
|
||||||
{
|
{
|
||||||
$url = derive_label_webhook_url($localEnv);
|
$url = derive_label_webhook_url($localEnv);
|
||||||
@@ -107,15 +121,7 @@ function trigger_shipping_label_flow(array $order, array $localEnv): array
|
|||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
$headers = [];
|
$headers = build_n8n_webhook_headers($localEnv);
|
||||||
$secret = env_value('N8N_WEBHOOK_SECRET', $localEnv);
|
|
||||||
if ($secret !== '') {
|
|
||||||
$headers['X-Webhook-Secret'] = $secret;
|
|
||||||
$headers['X-N8N-Secret'] = $secret;
|
|
||||||
$headers['X-API-Key'] = $secret;
|
|
||||||
$headers['Authorization'] = 'Bearer ' . $secret;
|
|
||||||
}
|
|
||||||
|
|
||||||
throttle_webhook_channel('label', 10);
|
throttle_webhook_channel('label', 10);
|
||||||
$result = post_json($url, $order, $headers, 20);
|
$result = post_json($url, $order, $headers, 20);
|
||||||
|
|
||||||
@@ -140,15 +146,7 @@ function trigger_excel_webhook(string $externalRef, array $localEnv): array
|
|||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
$headers = [];
|
$headers = build_n8n_webhook_headers($localEnv);
|
||||||
$secret = env_value('N8N_WEBHOOK_SECRET', $localEnv);
|
|
||||||
if ($secret !== '') {
|
|
||||||
$headers['X-Webhook-Secret'] = $secret;
|
|
||||||
$headers['X-N8N-Secret'] = $secret;
|
|
||||||
$headers['X-API-Key'] = $secret;
|
|
||||||
$headers['Authorization'] = 'Bearer ' . $secret;
|
|
||||||
}
|
|
||||||
|
|
||||||
throttle_webhook_channel('excel', 10);
|
throttle_webhook_channel('excel', 10);
|
||||||
$result = post_json($url, ['Bestellnummer' => $externalRef], $headers, 20);
|
$result = post_json($url, ['Bestellnummer' => $externalRef], $headers, 20);
|
||||||
|
|
||||||
@@ -161,3 +159,169 @@ function trigger_excel_webhook(string $externalRef, array $localEnv): array
|
|||||||
'responseBody' => $result['body'],
|
'responseBody' => $result['body'],
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function dispatch_order_import_webhooks(PDO $pdo, array $localEnv, int $limit = 20): array
|
||||||
|
{
|
||||||
|
$url = derive_excel_webhook_url($localEnv);
|
||||||
|
if ($url === '') {
|
||||||
|
return [
|
||||||
|
'enabled' => false,
|
||||||
|
'ok' => false,
|
||||||
|
'processed' => 0,
|
||||||
|
'sent' => 0,
|
||||||
|
'failed' => 0,
|
||||||
|
'deadLetter' => 0,
|
||||||
|
'message' => 'Excel webhook URL not configured',
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
$headers = build_n8n_webhook_headers($localEnv);
|
||||||
|
$limit = max(1, min(100, $limit));
|
||||||
|
|
||||||
|
$summary = [
|
||||||
|
'enabled' => true,
|
||||||
|
'ok' => false,
|
||||||
|
'processed' => 0,
|
||||||
|
'sent' => 0,
|
||||||
|
'failed' => 0,
|
||||||
|
'deadLetter' => 0,
|
||||||
|
'status' => 0,
|
||||||
|
'url' => $url,
|
||||||
|
'message' => 'No outbound order.imported event queued',
|
||||||
|
'responseBody' => '',
|
||||||
|
];
|
||||||
|
|
||||||
|
try {
|
||||||
|
for ($i = 0; $i < $limit; $i++) {
|
||||||
|
$pdo->beginTransaction();
|
||||||
|
|
||||||
|
$stmt = $pdo->query(
|
||||||
|
"SELECT id, payload, attempt_count
|
||||||
|
FROM outbound_webhook_event
|
||||||
|
WHERE event_type = 'order.imported'
|
||||||
|
AND status IN ('pending', 'failed')
|
||||||
|
AND next_attempt_at <= NOW()
|
||||||
|
ORDER BY created_at ASC, id ASC
|
||||||
|
LIMIT 1
|
||||||
|
FOR UPDATE SKIP LOCKED"
|
||||||
|
);
|
||||||
|
$event = $stmt !== false ? $stmt->fetch(PDO::FETCH_ASSOC) : false;
|
||||||
|
if (!is_array($event)) {
|
||||||
|
$pdo->commit();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
$eventId = (int) $event['id'];
|
||||||
|
$attemptCount = max(0, (int) $event['attempt_count']) + 1;
|
||||||
|
$payloadRaw = $event['payload'];
|
||||||
|
$payload = [];
|
||||||
|
if (is_string($payloadRaw) && $payloadRaw !== '') {
|
||||||
|
try {
|
||||||
|
$payload = json_decode($payloadRaw, true, 512, JSON_THROW_ON_ERROR);
|
||||||
|
} catch (JsonException) {
|
||||||
|
$payload = [];
|
||||||
|
}
|
||||||
|
} elseif (is_array($payloadRaw)) {
|
||||||
|
$payload = $payloadRaw;
|
||||||
|
}
|
||||||
|
|
||||||
|
$externalRef = trim((string) ($payload['externalRef'] ?? ''));
|
||||||
|
if ($externalRef === '') {
|
||||||
|
$update = $pdo->prepare(
|
||||||
|
"UPDATE outbound_webhook_event
|
||||||
|
SET status = 'dead_letter',
|
||||||
|
last_attempt_at = NOW(),
|
||||||
|
last_error = :last_error,
|
||||||
|
next_attempt_at = NOW(),
|
||||||
|
attempt_count = :attempt_count
|
||||||
|
WHERE id = :id"
|
||||||
|
);
|
||||||
|
$update->execute([
|
||||||
|
':last_error' => 'Missing externalRef in outbound payload',
|
||||||
|
':attempt_count' => $attemptCount,
|
||||||
|
':id' => $eventId,
|
||||||
|
]);
|
||||||
|
|
||||||
|
$pdo->commit();
|
||||||
|
$summary['processed']++;
|
||||||
|
$summary['failed']++;
|
||||||
|
$summary['deadLetter']++;
|
||||||
|
$summary['ok'] = false;
|
||||||
|
$summary['message'] = 'Outbound payload missing externalRef';
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$result = post_json($url, ['Bestellnummer' => $externalRef], $headers, 20);
|
||||||
|
|
||||||
|
$summary['processed']++;
|
||||||
|
$summary['status'] = $result['status'];
|
||||||
|
$summary['responseBody'] = $result['body'];
|
||||||
|
|
||||||
|
if ($result['ok']) {
|
||||||
|
$update = $pdo->prepare(
|
||||||
|
"UPDATE outbound_webhook_event
|
||||||
|
SET status = 'sent',
|
||||||
|
last_attempt_at = NOW(),
|
||||||
|
last_error = NULL,
|
||||||
|
sent_at = NOW(),
|
||||||
|
next_attempt_at = NOW(),
|
||||||
|
attempt_count = :attempt_count
|
||||||
|
WHERE id = :id"
|
||||||
|
);
|
||||||
|
$update->execute([
|
||||||
|
':attempt_count' => $attemptCount,
|
||||||
|
':id' => $eventId,
|
||||||
|
]);
|
||||||
|
|
||||||
|
$pdo->commit();
|
||||||
|
$summary['sent']++;
|
||||||
|
if ($summary['failed'] === 0) {
|
||||||
|
$summary['ok'] = true;
|
||||||
|
}
|
||||||
|
$summary['message'] = 'Excel webhook triggered';
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$backoffSeconds = min(3600, 60 * (2 ** max(0, $attemptCount - 1)));
|
||||||
|
$status = $attemptCount >= 5 ? 'dead_letter' : 'failed';
|
||||||
|
$nextAttemptAt = (new DateTimeImmutable('now'))
|
||||||
|
->modify('+' . $backoffSeconds . ' seconds')
|
||||||
|
->format('Y-m-d H:i:s');
|
||||||
|
$lastError = $result['error'] !== '' ? $result['error'] : ('HTTP ' . $result['status']);
|
||||||
|
$update = $pdo->prepare(
|
||||||
|
"UPDATE outbound_webhook_event
|
||||||
|
SET status = :status,
|
||||||
|
last_attempt_at = NOW(),
|
||||||
|
last_error = :last_error,
|
||||||
|
next_attempt_at = :next_attempt_at,
|
||||||
|
attempt_count = :attempt_count
|
||||||
|
WHERE id = :id"
|
||||||
|
);
|
||||||
|
$update->execute([
|
||||||
|
':status' => $status,
|
||||||
|
':last_error' => $lastError,
|
||||||
|
':next_attempt_at' => $nextAttemptAt,
|
||||||
|
':attempt_count' => $attemptCount,
|
||||||
|
':id' => $eventId,
|
||||||
|
]);
|
||||||
|
|
||||||
|
$pdo->commit();
|
||||||
|
$summary['failed']++;
|
||||||
|
$summary['ok'] = false;
|
||||||
|
if ($status === 'dead_letter') {
|
||||||
|
$summary['deadLetter']++;
|
||||||
|
}
|
||||||
|
$summary['message'] = $lastError;
|
||||||
|
}
|
||||||
|
|
||||||
|
return $summary;
|
||||||
|
} catch (Throwable $e) {
|
||||||
|
if ($pdo->inTransaction()) {
|
||||||
|
$pdo->rollBack();
|
||||||
|
}
|
||||||
|
|
||||||
|
$summary['ok'] = false;
|
||||||
|
$summary['message'] = $e->getMessage();
|
||||||
|
return $summary;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user