Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,22 @@
def background_trackers_update():
from karrio.server.events.task_definitions.base import tracking

try:
with huey_instance.lock_task("background_trackers_update"):
@utils.run_on_all_tenants
def _run(**kwargs):
tracking.update_trackers(schema=kwargs.get("schema"))

_run()
except TaskLockedException:
logger.info("Tracker update already in progress, skipping duplicate run")
@utils.run_on_all_tenants
def _run(**kwargs):
schema = kwargs.get("schema")
lock_name = tracking.get_scheduler_lock_name(schema)

try:
with huey_instance.lock_task(lock_name):
tracking.update_trackers(schema=schema)
except TaskLockedException:
logger.info(
"Tracker update already in progress, skipping duplicate run",
schema=schema,
lock_name=lock_name,
)

_run()


@db_task(retries=2, retry_delay=30)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
TRACKER_MAX_ACTIVE_DAYS = int(getattr(settings, "TRACKER_MAX_ACTIVE_DAYS", 90))


def get_scheduler_lock_name(schema: typing.Optional[str] = None) -> str:
return f"background_trackers_update:{schema or 'public'}"


# ─────────────────────────────────────────────────────────────────
# Dispatcher
# ─────────────────────────────────────────────────────────────────
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
from django.contrib.auth import get_user_model

from karrio.core import utils
from karrio.server.core.utils import identity
from karrio.server.core.logging import logger
from karrio.server.serializers import Context
from karrio.server.events import models
import karrio.server.events.serializers.event as serializers
NotificationResponse = typing.Tuple[str, requests.Response]
NotificationResponse = typing.Tuple[str, typing.Optional[requests.Response]]
User = get_user_model()
WEBHOOK_REQUEST_TIMEOUT = float(getattr(settings, "WEBHOOK_REQUEST_TIMEOUT", 10))


def notify_webhook_subscribers(
Expand Down Expand Up @@ -54,16 +54,24 @@ def notify_webhook_subscribers(

def notify_subscribers(webhooks: typing.List[models.Webhook], payload: dict):
def notify_subscriber(webhook: models.Webhook):
response = identity(
lambda: requests.post(
try:
response = requests.post(
webhook.url,
json=payload,
headers={
"Content-type": "application/json",
"X-Event-Id": webhook.secret,
},
timeout=WEBHOOK_REQUEST_TIMEOUT,
)
)
except requests.RequestException as request_error:
logger.warning(
"Webhook notification request failed",
webhook_id=webhook.id,
webhook_url=webhook.url,
error=str(request_error),
)
return webhook.id, None

return webhook.id, response

Expand All @@ -82,7 +90,7 @@ def update_notified_webhooks(
logger.debug("Updating webhook", webhook_id=webhook_id)

webhook = next((w for w in webhooks if w.id == webhook_id))
if response.ok:
if response is not None and response.ok:
webhook.last_event_at = event_at
webhook.failure_streak_count = 0
else:
Expand Down
21 changes: 21 additions & 0 deletions modules/events/karrio/server/events/tests/test_tracking_tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import datetime
from contextlib import nullcontext
from time import sleep
from unittest.mock import patch, ANY
from django.urls import reverse
Expand All @@ -8,6 +9,7 @@
from karrio.server.core.tests import APITestCase
from karrio.server.core.utils import create_carrier_snapshot
from karrio.server.manager import models
import karrio.server.events.task_definitions.base as tasks
from karrio.server.events.task_definitions.base import tracking


Expand Down Expand Up @@ -105,6 +107,25 @@ def test_dispatcher_passes_schema(self):
for call_args in mock_task.call_args_list:
self.assertEqual(call_args.kwargs["schema"], "test_schema")

def test_background_trackers_update_locks_per_schema(self):
def run_on_schema(fn):
def wrapper(*args, **kwargs):
return fn(schema="tenant_a")

return wrapper

with patch.object(
tasks.utils, "run_on_all_tenants", side_effect=run_on_schema
), patch.object(
tasks.huey_instance, "lock_task", return_value=nullcontext()
) as lock_task, patch(
"karrio.server.events.task_definitions.base.tracking.update_trackers"
) as update_trackers:
tasks.background_trackers_update.call_local()

lock_task.assert_called_once_with("background_trackers_update:tenant_a")
update_trackers.assert_called_once_with(schema="tenant_a")

def test_process_carrier_trackers_incremental_save(self):
"""process_carrier_trackers fetches and saves each batch immediately."""
dhl_tracker = models.Tracking.objects.get(
Expand Down
36 changes: 30 additions & 6 deletions modules/events/karrio/server/events/tests/test_webhooks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
from unittest.mock import ANY, patch
from requests import Response
from requests import RequestException, Response

from django.urls import reverse
from django.utils import timezone
Expand Down Expand Up @@ -61,11 +61,11 @@ def test_webhook_notify(self):
)

with patch(
"karrio.server.events.task_definitions.base.webhook.identity"
) as mocks:
"karrio.server.events.task_definitions.base.webhook.requests.post"
) as requests_post:
response = Response()
response.status_code = 200
mocks.return_value = response
requests_post.return_value = response

notify_webhook_subscribers(
event="shipment.purchased",
Expand All @@ -81,6 +81,8 @@ def test_webhook_notify(self):
response_data = json.loads(response.content)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertDictEqual(response_data, WEBHOOK_NOTIFIED_RESPONSE)
_, kwargs = requests_post.call_args
self.assertEqual(kwargs["timeout"], 10)

def test_tracker_updated_payload_structure(self):
"""Webhook payload for tracker_updated contains tracking-specific fields."""
Expand Down Expand Up @@ -142,10 +144,12 @@ def test_shipment_purchased_payload_structure(self):

def test_webhook_failure_streak_increments_and_auto_disables_after_threshold(self):
"""Failed deliveries increment failure_streak_count and eventually disable."""
with patch("karrio.server.events.task_definitions.base.webhook.identity") as mocks:
with patch(
"karrio.server.events.task_definitions.base.webhook.requests.post"
) as requests_post:
failed = Response()
failed.status_code = 500
mocks.return_value = failed
requests_post.return_value = failed

# Trigger 6 failed notifications (disable when > 5)
for _ in range(6):
Expand All @@ -161,6 +165,26 @@ def test_webhook_failure_streak_increments_and_auto_disables_after_threshold(sel
self.assertTrue(self.webhook.disabled)
self.assertIsNone(self.webhook.last_event_at)

def test_webhook_request_exception_increments_failure_streak(self):
with patch(
"karrio.server.events.task_definitions.base.webhook.requests.post",
side_effect=RequestException("timeout"),
):
notify_webhook_subscribers(
event="shipment.purchased",
data={"shipment": "content"},
event_at=NOTIFICATION_DATETIME,
ctx=dict(
user_id=self.user.id,
test_mode=True,
),
)

self.webhook.refresh_from_db()
self.assertEqual(self.webhook.failure_streak_count, 1)
self.assertFalse(self.webhook.disabled)
self.assertIsNone(self.webhook.last_event_at)


WEBHOOK_DATA = {
"url": "https://api.karrio.io",
Expand Down