diff --git a/modules/events/karrio/server/events/task_definitions/base/__init__.py b/modules/events/karrio/server/events/task_definitions/base/__init__.py index 993dfc127..68312ffa8 100644 --- a/modules/events/karrio/server/events/task_definitions/base/__init__.py +++ b/modules/events/karrio/server/events/task_definitions/base/__init__.py @@ -49,15 +49,22 @@ def background_trackers_update(): from karrio.server.events.task_definitions.base import tracking - try: - with huey_instance.lock_task("background_trackers_update"): - @utils.run_on_all_tenants - def _run(**kwargs): - tracking.update_trackers(schema=kwargs.get("schema")) - - _run() - except TaskLockedException: - logger.info("Tracker update already in progress, skipping duplicate run") + @utils.run_on_all_tenants + def _run(**kwargs): + schema = kwargs.get("schema") + lock_name = tracking.get_scheduler_lock_name(schema) + + try: + with huey_instance.lock_task(lock_name): + tracking.update_trackers(schema=schema) + except TaskLockedException: + logger.info( + "Tracker update already in progress, skipping duplicate run", + schema=schema, + lock_name=lock_name, + ) + + _run() @db_task(retries=2, retry_delay=30) diff --git a/modules/events/karrio/server/events/task_definitions/base/tracking.py b/modules/events/karrio/server/events/task_definitions/base/tracking.py index f26c31a39..140b14248 100644 --- a/modules/events/karrio/server/events/task_definitions/base/tracking.py +++ b/modules/events/karrio/server/events/task_definitions/base/tracking.py @@ -43,6 +43,10 @@ TRACKER_MAX_ACTIVE_DAYS = int(getattr(settings, "TRACKER_MAX_ACTIVE_DAYS", 90)) +def get_scheduler_lock_name(schema: typing.Optional[str] = None) -> str: + return f"background_trackers_update:{schema or 'public'}" + + # ───────────────────────────────────────────────────────────────── # Dispatcher # ───────────────────────────────────────────────────────────────── diff --git a/modules/events/karrio/server/events/task_definitions/base/webhook.py b/modules/events/karrio/server/events/task_definitions/base/webhook.py index 75ee1de0b..c76384758 100644 --- a/modules/events/karrio/server/events/task_definitions/base/webhook.py +++ b/modules/events/karrio/server/events/task_definitions/base/webhook.py @@ -6,13 +6,13 @@ from django.contrib.auth import get_user_model from karrio.core import utils -from karrio.server.core.utils import identity from karrio.server.core.logging import logger from karrio.server.serializers import Context from karrio.server.events import models import karrio.server.events.serializers.event as serializers -NotificationResponse = typing.Tuple[str, requests.Response] +NotificationResponse = typing.Tuple[str, typing.Optional[requests.Response]] User = get_user_model() +WEBHOOK_REQUEST_TIMEOUT = float(getattr(settings, "WEBHOOK_REQUEST_TIMEOUT", 10)) def notify_webhook_subscribers( @@ -54,16 +54,24 @@ def notify_webhook_subscribers( def notify_subscribers(webhooks: typing.List[models.Webhook], payload: dict): def notify_subscriber(webhook: models.Webhook): - response = identity( - lambda: requests.post( + try: + response = requests.post( webhook.url, json=payload, headers={ "Content-type": "application/json", "X-Event-Id": webhook.secret, }, + timeout=WEBHOOK_REQUEST_TIMEOUT, ) - ) + except requests.RequestException as request_error: + logger.warning( + "Webhook notification request failed", + webhook_id=webhook.id, + webhook_url=webhook.url, + error=str(request_error), + ) + return webhook.id, None return webhook.id, response @@ -82,7 +90,7 @@ def update_notified_webhooks( logger.debug("Updating webhook", webhook_id=webhook_id) webhook = next((w for w in webhooks if w.id == webhook_id)) - if response.ok: + if response is not None and response.ok: webhook.last_event_at = event_at webhook.failure_streak_count = 0 else: diff --git a/modules/events/karrio/server/events/tests/test_tracking_tasks.py b/modules/events/karrio/server/events/tests/test_tracking_tasks.py index cbe14586e..22edcfe44 100644 --- a/modules/events/karrio/server/events/tests/test_tracking_tasks.py +++ b/modules/events/karrio/server/events/tests/test_tracking_tasks.py @@ -1,5 +1,6 @@ import json import datetime +from contextlib import nullcontext from time import sleep from unittest.mock import patch, ANY from django.urls import reverse @@ -8,6 +9,7 @@ from karrio.server.core.tests import APITestCase from karrio.server.core.utils import create_carrier_snapshot from karrio.server.manager import models +import karrio.server.events.task_definitions.base as tasks from karrio.server.events.task_definitions.base import tracking @@ -105,6 +107,25 @@ def test_dispatcher_passes_schema(self): for call_args in mock_task.call_args_list: self.assertEqual(call_args.kwargs["schema"], "test_schema") + def test_background_trackers_update_locks_per_schema(self): + def run_on_schema(fn): + def wrapper(*args, **kwargs): + return fn(schema="tenant_a") + + return wrapper + + with patch.object( + tasks.utils, "run_on_all_tenants", side_effect=run_on_schema + ), patch.object( + tasks.huey_instance, "lock_task", return_value=nullcontext() + ) as lock_task, patch( + "karrio.server.events.task_definitions.base.tracking.update_trackers" + ) as update_trackers: + tasks.background_trackers_update.call_local() + + lock_task.assert_called_once_with("background_trackers_update:tenant_a") + update_trackers.assert_called_once_with(schema="tenant_a") + def test_process_carrier_trackers_incremental_save(self): """process_carrier_trackers fetches and saves each batch immediately.""" dhl_tracker = models.Tracking.objects.get( diff --git a/modules/events/karrio/server/events/tests/test_webhooks.py b/modules/events/karrio/server/events/tests/test_webhooks.py index 754bd27bd..266969cd5 100644 --- a/modules/events/karrio/server/events/tests/test_webhooks.py +++ b/modules/events/karrio/server/events/tests/test_webhooks.py @@ -1,6 +1,6 @@ import json from unittest.mock import ANY, patch -from requests import Response +from requests import RequestException, Response from django.urls import reverse from django.utils import timezone @@ -61,11 +61,11 @@ def test_webhook_notify(self): ) with patch( - "karrio.server.events.task_definitions.base.webhook.identity" - ) as mocks: + "karrio.server.events.task_definitions.base.webhook.requests.post" + ) as requests_post: response = Response() response.status_code = 200 - mocks.return_value = response + requests_post.return_value = response notify_webhook_subscribers( event="shipment.purchased", @@ -81,6 +81,8 @@ def test_webhook_notify(self): response_data = json.loads(response.content) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertDictEqual(response_data, WEBHOOK_NOTIFIED_RESPONSE) + _, kwargs = requests_post.call_args + self.assertEqual(kwargs["timeout"], 10) def test_tracker_updated_payload_structure(self): """Webhook payload for tracker_updated contains tracking-specific fields.""" @@ -142,10 +144,12 @@ def test_shipment_purchased_payload_structure(self): def test_webhook_failure_streak_increments_and_auto_disables_after_threshold(self): """Failed deliveries increment failure_streak_count and eventually disable.""" - with patch("karrio.server.events.task_definitions.base.webhook.identity") as mocks: + with patch( + "karrio.server.events.task_definitions.base.webhook.requests.post" + ) as requests_post: failed = Response() failed.status_code = 500 - mocks.return_value = failed + requests_post.return_value = failed # Trigger 6 failed notifications (disable when > 5) for _ in range(6): @@ -161,6 +165,26 @@ def test_webhook_failure_streak_increments_and_auto_disables_after_threshold(sel self.assertTrue(self.webhook.disabled) self.assertIsNone(self.webhook.last_event_at) + def test_webhook_request_exception_increments_failure_streak(self): + with patch( + "karrio.server.events.task_definitions.base.webhook.requests.post", + side_effect=RequestException("timeout"), + ): + notify_webhook_subscribers( + event="shipment.purchased", + data={"shipment": "content"}, + event_at=NOTIFICATION_DATETIME, + ctx=dict( + user_id=self.user.id, + test_mode=True, + ), + ) + + self.webhook.refresh_from_db() + self.assertEqual(self.webhook.failure_streak_count, 1) + self.assertFalse(self.webhook.disabled) + self.assertIsNone(self.webhook.last_event_at) + WEBHOOK_DATA = { "url": "https://api.karrio.io",