diff --git a/gateway/.envs/example/django.env b/gateway/.envs/example/django.env index 1287a26fa..f306a3f6b 100644 --- a/gateway/.envs/example/django.env +++ b/gateway/.envs/example/django.env @@ -8,6 +8,19 @@ HOSTNAME=localhost:8000 API_VERSION=v1 API_KEY= +# FEDERATION +# ------------------------------------------------------------------------------ +# Peer sync uses the federation-sync Docker service (same sds-network as gateway; +# service definition lives under /federation). Bootstrap: enable federation, run +# create_federation_sync_api_key, pass the key to federation-sync. Set FEDERATION_SITE_NAME +# (e.g. crc) when enabling federation; use SDS_SITE_FQDN for the public host (RFC [site].fqdn). +# FEDERATION_ENABLED=true # Master switch for export APIs and Redis federation events. +# FEDERATION_SITE_NAME=crc # RFC [site].name (short peer id); set SDS_SITE_FQDN separately for [site].fqdn. +# FEDERATION_EVENTS_CHANNEL=federation:events # Redis pub/sub channel federation-sync subscribes to. +# FEDERATION_SYNC_HEALTH_URL=http://federation-sync:8000/sync/health # Health probe target (federation-sync service). +# FEDERATION_SYNC_USER_EMAIL=federation-sync@internal.local # Service user email for create_federation_sync_api_key. +# FEDERATION_EXPORT_ALLOWED_CIDRS= # Comma-separated CIDRs allowed to call export (default: private Docker ranges). + # AUTH0 # ------------------------------------------------------------------------------ # Set these from your Auth0 application settings diff --git a/gateway/config/api_router.py b/gateway/config/api_router.py index cc9a80f44..b181398ea 100644 --- a/gateway/config/api_router.py +++ b/gateway/config/api_router.py @@ -5,6 +5,7 @@ from sds_gateway.api_methods.views.auth_endpoints import ValidateAuthViewSet from sds_gateway.api_methods.views.capture_endpoints import CaptureViewSet from sds_gateway.api_methods.views.dataset_endpoints import DatasetViewSet +from sds_gateway.api_methods.views.federation_endpoints import FederationViewSet from sds_gateway.api_methods.views.file_endpoints import FileViewSet from sds_gateway.api_methods.views.file_endpoints import check_contents_exist from sds_gateway.users.api.views import UserViewSet @@ -17,6 +18,7 @@ router.register(r"assets/files", FileViewSet, basename="files") router.register(r"assets/captures", CaptureViewSet, basename="captures") router.register(r"assets/datasets", DatasetViewSet, basename="datasets") +router.register(r"federation", FederationViewSet, basename="federation") if settings.VISUALIZATIONS_ENABLED: router.register(r"visualizations", VisualizationViewSet, basename="visualizations") diff --git a/gateway/config/settings/base.py b/gateway/config/settings/base.py index 87fb85ab4..e254b1f61 100644 --- a/gateway/config/settings/base.py +++ b/gateway/config/settings/base.py @@ -1,6 +1,7 @@ """Base settings to build other settings files upon.""" # ruff: noqa: ERA001 +import ipaddress import random import string from pathlib import Path @@ -23,6 +24,8 @@ def __get_random_token(length: int) -> str: __rng.choice(string.ascii_letters + string.digits) for _ in range(length) ) +def _parse_cidrs(raw: list[str]) -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]: + return [ipaddress.ip_network(item.strip(), strict=False) for item in raw] env.read_env() @@ -610,7 +613,10 @@ def _strip_endpoint_scheme(endpoint_url: str) -> str: "rest_framework.authentication.SessionAuthentication", "sds_gateway.api_methods.authentication.APIKeyAuthentication", ), - "DEFAULT_PERMISSION_CLASSES": ("rest_framework.permissions.IsAuthenticated",), + "DEFAULT_PERMISSION_CLASSES": ( + "rest_framework.permissions.IsAuthenticated", + "sds_gateway.api_methods.permissions.DisallowFederationSyncKey", + ), "DEFAULT_SCHEMA_CLASS": "drf_spectacular.openapi.AutoSchema", "DEFAULT_THROTTLE_RATES": { "vis_stream": VIS_STREAM_THROTTLE_RATE, @@ -709,6 +715,59 @@ def _strip_endpoint_scheme(endpoint_url: str) -> str: SDS_PROGRAMMATIC_SITE_NAME: str = env.str("SDS_PROGRAMMATIC_SITE_NAME", default="sds") SDS_SITE_FQDN: str = env.str("SDS_SITE_FQDN", default="localhost") +# Federation peer short name (RFC [site].name, e.g. crc, haystack); not SDS_PROGRAMMATIC_SITE_NAME. +FEDERATION_SITE_NAME: str = env.str("FEDERATION_SITE_NAME", default="").strip() +# Master switch: when False, federation export and Redis events are inactive. +FEDERATION_ENABLED: bool = env.bool("FEDERATION_ENABLED", default=False) +FEDERATION_EVENTS_CHANNEL: str = env.str( + "FEDERATION_EVENTS_CHANNEL", + default="federation:events", +) +FEDERATION_SYNC_USER_EMAIL: str = env.str( + "FEDERATION_SYNC_USER_EMAIL", + default="federation-sync@internal.local", +) +FEDERATION_SYNC_HEALTH_URL: str = env.str( + "FEDERATION_SYNC_HEALTH_URL", + default="http://federation-sync:8000/sync/health", +) +FEDERATION_SYNC_HEALTH_PROBE_TIMEOUT: float = env.float( + "FEDERATION_SYNC_HEALTH_PROBE_TIMEOUT", + default=2.0, +) +FEDERATION_SKIP_SYNC_HEALTH_PROBE: bool = env.bool( + "FEDERATION_SKIP_SYNC_HEALTH_PROBE", + default=False, +) +FEDERATION_SKIP_SYNC_API_KEY_CHECK: bool = env.bool( + "FEDERATION_SKIP_SYNC_API_KEY_CHECK", + default=False, +) +FEDERATION_SKIP_REDIS_PROBE: bool = env.bool( + "FEDERATION_SKIP_REDIS_PROBE", + default=False, +) +# Set at startup / periodic recheck by federation.availability. +FEDERATION_OPERATIONAL: bool = False +FEDERATION_OPERATIONAL_REASON: str = "" +# Tests may set via override_settings without running probes. +FEDERATION_OPERATIONAL_OVERRIDE: bool | None = None +# Export API: internal Docker/private networks (sync → django on sds-network). +FEDERATION_EXPORT_ALLOWED_CIDRS: list[ + ipaddress.IPv4Network | ipaddress.IPv6Network +] = _parse_cidrs( + env.list( + "FEDERATION_EXPORT_ALLOWED_CIDRS", + default=[ + "127.0.0.1/32", + "::1/128", + "10.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + ], + ), +) + # ADMIN_CONSOLE_ENV is used to visually distinguish between different environments # (production, staging, local) in the admin console and error emails. It does not affect # any functionality and it is meant to prevent changes in production meant for testing diff --git a/gateway/pyproject.toml b/gateway/pyproject.toml index 4ed42b8b8..a88f1bdee 100644 --- a/gateway/pyproject.toml +++ b/gateway/pyproject.toml @@ -147,6 +147,10 @@ # https://deptry.com/usage/#per-rule-ignores [tool.deptry.per_rule_ignores] + DEP001 = [ + # optional monorepo sibling; contract tests add federation/ to sys.path + "sds_federation", + ] DEP002 = [ # packages that are installed but not imported "argon2-cffi", # used by django for argon2 password hashing diff --git a/gateway/scripts/fallow-cross-file-dupes.sh b/gateway/scripts/fallow-cross-file-dupes.sh index 5d2e91c14..c1ac45594 100755 --- a/gateway/scripts/fallow-cross-file-dupes.sh +++ b/gateway/scripts/fallow-cross-file-dupes.sh @@ -2,24 +2,31 @@ set -euo pipefail cd "$(dirname "$0")/.." -# Use first available: vpx > bunx > npx -if command -v pnpx &>/dev/null; then - RUNNER=pnpx -elif command -v vpx &>/dev/null; then - RUNNER=vpx -elif command -v bunx &>/dev/null; then - RUNNER=bunx -elif command -v npx &>/dev/null; then - RUNNER=npx +LOCAL_FALLOW="node_modules/.bin/fallow" +if [[ -x "${LOCAL_FALLOW}" ]]; then + "${LOCAL_FALLOW}" dupes --format json -q | jq -e ' + [ (.clone_groups // .dupes.clone_groups // [])[] + | select((.instances | map(.file) | unique | length) > 1) + ] | length == 0 +' >/dev/null else - echo "Error: neither vpx, bunx, nor npx found in PATH" >&2 - exit 1 -fi + # Use first available: vpx > bunx > npx (avoid pnpx; global pnpm may need newer Node) + if command -v vpx &>/dev/null; then + RUNNER=(vpx) + elif command -v bunx &>/dev/null; then + RUNNER=(bunx) + elif command -v npx &>/dev/null; then + RUNNER=(npx) + else + echo "Error: neither local fallow, vpx, bunx, nor npx found" >&2 + exit 1 + fi -"${RUNNER}" fallow dupes --format json -q | jq -e ' + "${RUNNER[@]}" fallow dupes --format json -q | jq -e ' [ (.clone_groups // .dupes.clone_groups // [])[] | select((.instances | map(.file) | unique | length) > 1) ] | length == 0 ' >/dev/null +fi echo "No cross-file clone groups detected." diff --git a/gateway/sds_gateway/api_methods/apps.py b/gateway/sds_gateway/api_methods/apps.py index 73073e45a..3d189bbef 100644 --- a/gateway/sds_gateway/api_methods/apps.py +++ b/gateway/sds_gateway/api_methods/apps.py @@ -12,7 +12,13 @@ class ApiMethodsConfig(AppConfig): # pattern to import application modules here in ready() # ruff: noqa: PLC0415 def ready(self) -> None: + import sds_gateway.api_methods.federation.signals import sds_gateway.api_methods.schema # noqa: F401 + from sds_gateway.api_methods.federation.availability import ( + initialize_federation_operational_state, + ) + + initialize_federation_operational_state() silence_unwanted_logs() diff --git a/gateway/sds_gateway/api_methods/authentication.py b/gateway/sds_gateway/api_methods/authentication.py index 2a210478f..60e831970 100644 --- a/gateway/sds_gateway/api_methods/authentication.py +++ b/gateway/sds_gateway/api_methods/authentication.py @@ -13,7 +13,7 @@ class APIKeyAuthentication(BaseAuthentication): keyword = "Api-Key" - def authenticate(self, request) -> tuple[User, bool]: + def authenticate(self, request) -> tuple[User, UserAPIKey]: """Authenticates the user with their API key. Args: request: Contains the API key in the Authorization header. @@ -47,7 +47,7 @@ def authenticate(self, request) -> tuple[User, bool]: raise AuthenticationFailed(msg) from err user = api_key_obj.user - return (user, True) + return (user, api_key_obj) def authenticate_header(self, request) -> str: return self.keyword diff --git a/gateway/sds_gateway/api_methods/federation/__init__.py b/gateway/sds_gateway/api_methods/federation/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gateway/sds_gateway/api_methods/federation/availability.py b/gateway/sds_gateway/api_methods/federation/availability.py new file mode 100644 index 000000000..75cd6fd39 --- /dev/null +++ b/gateway/sds_gateway/api_methods/federation/availability.py @@ -0,0 +1,174 @@ +"""Federation operational status: config, sync health, Redis, sync API key.""" + +from __future__ import annotations + +import ipaddress +import json +import time +import urllib.error +import urllib.request +from typing import Any + +from django.conf import settings +from loguru import logger as log + +from sds_gateway.api_methods.models import KeySources +from sds_gateway.api_methods.tasks import get_redis_client +from sds_gateway.users.models import UserAPIKey + +_HTTP_OK = 200 +_RECHECK_INTERVAL_SECONDS = 60.0 +_last_evaluated_at: float = 0.0 +_cached_operational: bool = False +_cached_reason: str = "not evaluated" + + +def _setting(name: str, *, default: Any = None) -> Any: + return getattr(settings, name, default) + + +def _export_allowed_networks() -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]: + """Networks from settings (parsed at startup); tests may override with strings.""" + raw = _setting("FEDERATION_EXPORT_ALLOWED_CIDRS", default=[]) + networks: list[ipaddress.IPv4Network | ipaddress.IPv6Network] = [] + for item in raw: + if isinstance(item, (ipaddress.IPv4Network, ipaddress.IPv6Network)): + networks.append(item) + else: + networks.append(ipaddress.ip_network(str(item).strip(), strict=False)) + return networks + + +def federation_client_ip(request) -> str | None: + """Client IP for export access control (direct internal connections only).""" + remote = request.META.get("REMOTE_ADDR") + if remote: + return str(remote).strip() + return None + + +def is_client_ip_allowed_for_federation_export(request) -> bool: + cidrs = _export_allowed_networks() + if not cidrs: + return False + client_ip = federation_client_ip(request) + if not client_ip: + return False + try: + addr = ipaddress.ip_address(client_ip) + except ValueError: + return False + return any(addr in network for network in cidrs) + + +def _sync_health_ok() -> tuple[bool, str]: # noqa: PLR0911 + if _setting("FEDERATION_SKIP_SYNC_HEALTH_PROBE", default=False): + return True, "health probe skipped" + url = (_setting("FEDERATION_SYNC_HEALTH_URL") or "").strip() + if not url: + return False, "FEDERATION_SYNC_HEALTH_URL is not set" + if not url.startswith(("http://", "https://")): + return False, "FEDERATION_SYNC_HEALTH_URL must be http(s)" + timeout = float( + _setting("FEDERATION_SYNC_HEALTH_PROBE_TIMEOUT", default=2.0), + ) + request = urllib.request.Request(url, method="GET") # noqa: S310 + try: + with urllib.request.urlopen(request, timeout=timeout) as response: # noqa: S310 + if response.status != _HTTP_OK: + return False, f"sync health returned HTTP {response.status}" + body = response.read().decode("utf-8", errors="replace") + except urllib.error.URLError as exc: + return False, f"sync health probe failed: {exc.reason}" + except TimeoutError: + return False, "sync health probe timed out" + if not body.strip(): + return True, "sync health returned 200" + + try: + payload = json.loads(body) + except json.JSONDecodeError: + return True, "sync health returned 200" + + if isinstance(payload, dict): + if payload.get("status") == "ok": + return True, "sync health ok" + status_value = payload.get("status") + return False, f"sync health status is not ok: {status_value!r}" + + return True, "sync health returned 200" + + +def _sync_api_key_present() -> tuple[bool, str]: + if _setting("FEDERATION_SKIP_SYNC_API_KEY_CHECK", default=False): + return True, "sync API key check skipped" + exists = UserAPIKey.objects.filter(source=KeySources.FederationSync).exists() + if not exists: + return False, "no FederationSync API key in database" + return True, "FederationSync API key present" + + +def _redis_ok() -> tuple[bool, str]: + if not _setting("FEDERATION_ENABLED", default=False): + return True, "redis not required (federation disabled)" + if _setting("FEDERATION_SKIP_REDIS_PROBE", default=False): + return True, "redis probe skipped" + + try: + client = get_redis_client() + client.ping() + except Exception as exc: # noqa: BLE001 + return False, f"redis ping failed: {exc}" + return True, "redis ok" + + +def evaluate_federation_operational() -> tuple[bool, str]: + if not _setting("FEDERATION_ENABLED", default=False): + return False, "FEDERATION_ENABLED is False" + + site_name = (_setting("FEDERATION_SITE_NAME", default="") or "").strip() + if not site_name: + return False, "FEDERATION_SITE_NAME must be set when federation is enabled" + + for check in (_sync_api_key_present, _sync_health_ok, _redis_ok): + ok, reason = check() + if not ok: + return False, reason + return True, "federation operational" + + +def refresh_federation_operational_state(*, force: bool = False) -> tuple[bool, str]: + global _cached_operational, _cached_reason, _last_evaluated_at # noqa: PLW0603 + + now = time.monotonic() + if ( + not force + and _last_evaluated_at + and (now - _last_evaluated_at) < _RECHECK_INTERVAL_SECONDS + ): + return _cached_operational, _cached_reason + + operational, reason = evaluate_federation_operational() + _cached_operational = operational + _cached_reason = reason + _last_evaluated_at = now + settings.FEDERATION_OPERATIONAL = operational + settings.FEDERATION_OPERATIONAL_REASON = reason + return operational, reason + + +def initialize_federation_operational_state() -> None: + operational, reason = refresh_federation_operational_state(force=True) + if operational: + log.info("Federation is operational: {}", reason) + else: + log.warning("Federation disabled: {}", reason) + + +def is_federation_operational() -> bool: + if _setting("FEDERATION_OPERATIONAL_OVERRIDE", default=None) is not None: + return bool(_setting("FEDERATION_OPERATIONAL_OVERRIDE")) + if not _setting("FEDERATION_ENABLED", default=False): + return False + operational, _reason = refresh_federation_operational_state() + return operational diff --git a/gateway/sds_gateway/api_methods/federation/events.py b/gateway/sds_gateway/api_methods/federation/events.py new file mode 100644 index 000000000..1fc815847 --- /dev/null +++ b/gateway/sds_gateway/api_methods/federation/events.py @@ -0,0 +1,53 @@ +"""Publish federation change notifications to Redis.""" + +from __future__ import annotations + +import json +from datetime import UTC +from datetime import datetime +from typing import TYPE_CHECKING +from typing import Any + +from django.conf import settings +from loguru import logger as log + +from sds_gateway.api_methods.federation.availability import is_federation_operational +from sds_gateway.api_methods.tasks import get_redis_client + +if TYPE_CHECKING: + from uuid import UUID + + from sds_gateway.api_methods.models import ItemType + +class FederationEventType(StrEnum): + CREATED = "created" + UPDATED = "updated" + DELETED = "deleted" + + def __str__(self) -> str: + return self.value + + +def publish_federation_event( + *, + event_type: FederationEventType, + item_type: ItemType, + uuid: UUID, + timestamp: datetime | None = None, +) -> None: + """Notify the local federation sync service via Redis pub/sub.""" + if not is_federation_operational(): + log.debug("Federation not operational, skipping Redis publish") + return + channel = getattr(settings, "FEDERATION_EVENTS_CHANNEL", "federation:events") + payload: dict[str, Any] = { + "event_type": event_type, + "item_type": item_type.value, + "uuid": str(uuid), + "timestamp": (timestamp or datetime.now(UTC)).isoformat(), + } + try: + client = get_redis_client() + client.publish(channel, json.dumps(payload)) + except Exception as err: # noqa: BLE001 + log.warning("Failed to publish federation event: {}", err) diff --git a/gateway/sds_gateway/api_methods/federation/export_contract.py b/gateway/sds_gateway/api_methods/federation/export_contract.py new file mode 100644 index 000000000..0a7d65df1 --- /dev/null +++ b/gateway/sds_gateway/api_methods/federation/export_contract.py @@ -0,0 +1,32 @@ +"""Helpers to keep gateway export serializers aligned with sync Pydantic models.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING +from typing import Any + +if TYPE_CHECKING: + from pydantic import BaseModel + from rest_framework.serializers import BaseSerializer + + +def serializer_output_field_names(serializer: BaseSerializer[Any]) -> set[str]: + return set(serializer.fields.keys()) + + +def assert_field_names_match( + serializer: BaseSerializer[Any], + pydantic_model: BaseModel, + *, + label: str, +) -> None: + expected = set(pydantic_model.model_fields.keys()) + actual = serializer_output_field_names(serializer) + if expected != actual: + missing = expected - actual + extra = actual - expected + msg = ( + f"{label} field mismatch: missing={sorted(missing)!r} " + f"extra={sorted(extra)!r}" + ) + raise AssertionError(msg) diff --git a/gateway/sds_gateway/api_methods/federation/permissions.py b/gateway/sds_gateway/api_methods/federation/permissions.py new file mode 100644 index 000000000..2936c2e62 --- /dev/null +++ b/gateway/sds_gateway/api_methods/federation/permissions.py @@ -0,0 +1,39 @@ +"""Permissions specific to federation export endpoints.""" + +from django.conf import settings +from rest_framework.exceptions import APIException +from rest_framework.permissions import BasePermission + +from sds_gateway.api_methods.federation.availability import ( + is_client_ip_allowed_for_federation_export, +) +from sds_gateway.api_methods.federation.availability import is_federation_operational + + +class FederationNotOperational(APIException): + status_code = 503 + default_detail = "Federation is not configured or the sync service is unavailable." + default_code = "federation_unavailable" + + +class IsFederationOperational(BasePermission): + """Deny export when federation failed startup / health checks.""" + + message = FederationNotOperational.default_detail + + def has_permission(self, request, view) -> bool: + if not is_federation_operational(): + detail = getattr(settings, "FEDERATION_OPERATIONAL_REASON", "") or ( + FederationNotOperational.default_detail + ) + raise FederationNotOperational(detail=detail) + return True + + +class IsFederationInternalExportClient(BasePermission): + """Restrict export to internal network clients (sync service on sds-network).""" + + message = "Federation export is only available to internal clients." + + def has_permission(self, request, view) -> bool: + return is_client_ip_allowed_for_federation_export(request) diff --git a/gateway/sds_gateway/api_methods/federation/signals.py b/gateway/sds_gateway/api_methods/federation/signals.py new file mode 100644 index 000000000..c5cc06e04 --- /dev/null +++ b/gateway/sds_gateway/api_methods/federation/signals.py @@ -0,0 +1,76 @@ +"""Django signals that publish federation Redis events.""" + +from __future__ import annotations + +from django.conf import settings +from django.db.models.signals import post_save +from django.dispatch import receiver +from loguru import logger as log + +from sds_gateway.api_methods.federation.availability import is_federation_operational +from sds_gateway.api_methods.federation.events import publish_federation_event +from sds_gateway.api_methods.helpers.compile_federated_data import ( + is_federation_exportable_capture, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( + is_federation_exportable_dataset, +) +from sds_gateway.api_methods.models import Capture +from sds_gateway.api_methods.models import Dataset +from sds_gateway.api_methods.models import ItemType + + +def _event_type(*, created: bool, exportable: bool) -> str: + if not exportable: + return "deleted" + return "created" if created else "updated" + + +def _skip_signal() -> bool: + if not getattr(settings, "FEDERATION_ENABLED", False): + log.debug("FEDERATION_ENABLED is False, skipping federation signal") + return True + + if not is_federation_operational(): + log.debug("Federation not operational, skipping signal") + return True + + return False + + +@receiver(post_save, sender=Dataset) +def federation_dataset_changed( + sender: type[Dataset], + instance: Dataset, + created: bool, # noqa: FBT001 + **kwargs, +) -> None: + if _skip_signal(): + return + + exportable = is_federation_exportable_dataset(instance) + publish_federation_event( + event_type=_event_type(created=created, exportable=exportable), + item_type=ItemType.DATASET, + uuid=instance.uuid, + timestamp=instance.updated_at, + ) + + +@receiver(post_save, sender=Capture) +def federation_capture_changed( + sender: type[Capture], + instance: Capture, + created: bool, # noqa: FBT001 + **kwargs, +) -> None: + if _skip_signal(): + return + + exportable = is_federation_exportable_capture(instance) + publish_federation_event( + event_type=_event_type(created=created, exportable=exportable), + item_type=ItemType.CAPTURE, + uuid=instance.uuid, + timestamp=instance.updated_at, + ) diff --git a/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py b/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py new file mode 100644 index 000000000..53be29f53 --- /dev/null +++ b/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py @@ -0,0 +1,75 @@ +"""Build federation export payloads (RFC fed-* index shape).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING +from typing import Any + +from django.conf import settings + +from sds_gateway.api_methods.models import Capture +from sds_gateway.api_methods.models import Dataset +from sds_gateway.api_methods.models import DatasetStatus +from sds_gateway.api_methods.serializers.capture_serializers import ( + CaptureFederationSerializer, +) +from sds_gateway.api_methods.serializers.dataset_serializers import ( + DatasetFederationSerializer, +) + +if TYPE_CHECKING: + from django.db.models import QuerySet + + +def federation_site_name() -> str: + return getattr(settings, "FEDERATION_SITE_NAME", "").strip() + + +def public_datasets_queryset() -> QuerySet[Dataset]: + return ( + Dataset.objects.filter( + status=DatasetStatus.FINAL, + is_public=True, + is_deleted=False, + ) + .prefetch_related("keywords", "owner") + .order_by("-updated_at") + ) + + +def public_captures_queryset() -> QuerySet[Capture]: + return Capture.objects.filter(is_public=True, is_deleted=False).order_by( + "-updated_at", + ) + + +def is_federation_exportable_dataset(dataset: Dataset) -> bool: + return ( + not dataset.is_deleted + and dataset.is_public + and dataset.status == DatasetStatus.FINAL + ) + + +def is_federation_exportable_capture(capture: Capture) -> bool: + return not capture.is_deleted and capture.is_public + + +def compile_federated_dataset_doc(dataset: Dataset) -> dict[str, Any]: + """Serialize a public dataset for federation sync / OpenSearch.""" + site_name = federation_site_name() + federation_data = DatasetFederationSerializer( + dataset, + context={"site_name": site_name}, + ) + return federation_data.data + + +def compile_federated_capture_doc(capture: Capture) -> dict[str, Any]: + """Serialize a public capture for federation sync / OpenSearch.""" + site_name = federation_site_name() + federation_data = CaptureFederationSerializer( + capture, + context={"site_name": site_name}, + ) + return federation_data.data diff --git a/gateway/sds_gateway/api_methods/models.py b/gateway/sds_gateway/api_methods/models.py index 8c56b3c87..322fef020 100644 --- a/gateway/sds_gateway/api_methods/models.py +++ b/gateway/sds_gateway/api_methods/models.py @@ -88,6 +88,7 @@ class KeySources(StrEnum): SDSWebUI = "sds_web_ui" SVIBackend = "svi_backend" SVIWebUI = "svi_web_ui" + FederationSync = "federation_sync" class ItemType(StrEnum): diff --git a/gateway/sds_gateway/api_methods/permissions.py b/gateway/sds_gateway/api_methods/permissions.py new file mode 100644 index 000000000..2fe7bf2eb --- /dev/null +++ b/gateway/sds_gateway/api_methods/permissions.py @@ -0,0 +1,31 @@ +"""DRF permissions for API key scoping.""" + +from rest_framework.permissions import BasePermission + +from sds_gateway.api_methods.models import KeySources +from sds_gateway.users.models import UserAPIKey + + +class IsFederationSyncKey(BasePermission): + """Allow only federation sync service API keys.""" + + message = "Federation sync API key required." + + def has_permission(self, request, view) -> bool: + key = request.auth + return isinstance(key, UserAPIKey) and key.source == KeySources.FederationSync + + +class DisallowFederationSyncKey(BasePermission): + """Block federation sync keys from non-federation API routes. + + Applied globally via REST_FRAMEWORK DEFAULT_PERMISSION_CLASSES. + """ + + message = "This API key is restricted to federation export endpoints." + + def has_permission(self, request, view) -> bool: + key = request.auth + return not ( + isinstance(key, UserAPIKey) and key.source == KeySources.FederationSync + ) diff --git a/gateway/sds_gateway/api_methods/serializers/capture_serializers.py b/gateway/sds_gateway/api_methods/serializers/capture_serializers.py index fd5c4cb4d..168178013 100644 --- a/gateway/sds_gateway/api_methods/serializers/capture_serializers.py +++ b/gateway/sds_gateway/api_methods/serializers/capture_serializers.py @@ -883,3 +883,55 @@ def serialize_capture_or_composite( # Serialize as single capture serializer = CaptureGetSerializer(capture_data["capture"], context=context) return serializer.data + + +class CaptureFederationSerializer(serializers.ModelSerializer[Capture]): + """Public-safe capture payload for federation export (sync / OpenSearch).""" + + site_name = serializers.SerializerMethodField() + file_count = serializers.SerializerMethodField() + size = serializers.SerializerMethodField() + capture_props = serializers.SerializerMethodField() + dataset_ids = serializers.SerializerMethodField() + created_at = serializers.DateTimeField( + format="%Y-%m-%d %H:%M:%S%z", + read_only=True, + ) + updated_at = serializers.DateTimeField( + format="%Y-%m-%d %H:%M:%S%z", + read_only=True, + ) + + class Meta: + model = Capture + fields = [ + "uuid", + "name", + "capture_type", + "channel", + "scan_group", + "top_level_dir", + "created_at", + "updated_at", + "site_name", + "file_count", + "size", + "capture_props", + "dataset_ids", + ] + + def get_site_name(self, obj: Capture) -> str: + return str((self.context or {})["site_name"]) + + def get_file_count(self, obj: Capture) -> int: + return int(obj.get_files_summary()["total_count"]) + + def get_size(self, obj: Capture) -> int: + return int(obj.get_files_summary()["total_size"]) + + def get_capture_props(self, obj: Capture) -> dict[str, Any]: + return obj.get_opensearch_metadata() or {} + + def get_dataset_ids(self, obj: Capture) -> list[str]: + qs = get_capture_datasets(obj, include_deleted=False) + return [str(dataset.uuid) for dataset in qs] diff --git a/gateway/sds_gateway/api_methods/serializers/dataset_serializers.py b/gateway/sds_gateway/api_methods/serializers/dataset_serializers.py index 9f57ceffa..519a4bf79 100644 --- a/gateway/sds_gateway/api_methods/serializers/dataset_serializers.py +++ b/gateway/sds_gateway/api_methods/serializers/dataset_serializers.py @@ -334,6 +334,46 @@ class Meta: ] +class DatasetFederationSerializer(DatasetPublicSerializer): + """Serializer for dataset data for federation export.""" + + site_name = serializers.SerializerMethodField() + updated_at = serializers.DateTimeField( + format=READABLE_ISO_DATE_TIME, + read_only=True, + ) + size = serializers.SerializerMethodField() + capture_count = serializers.SerializerMethodField() + capture_file_count = serializers.SerializerMethodField() + artifact_file_count = serializers.SerializerMethodField() + + class Meta(DatasetPublicSerializer.Meta): + fields = [ + *DatasetPublicSerializer.Meta.fields, + "updated_at", + "site_name", + "size", + "capture_count", + "capture_file_count", + "artifact_file_count", + ] + + def get_site_name(self, obj: Dataset) -> str: + return str((self.context or {})["site_name"]) + + def get_size(self, obj: Dataset) -> int: + return int(obj.get_dataset_file_statistics()["total_size"]) + + def get_capture_count(self, obj: Dataset) -> int: + return obj.captures.filter(is_deleted=False).count() + + def get_capture_file_count(self, obj: Dataset) -> int: + return int(obj.get_dataset_file_statistics()["captures"]) + + def get_artifact_file_count(self, obj: Dataset) -> int: + return int(obj.get_dataset_file_statistics()["artifacts"]) + + def get_dataset_serializer( dataset: Dataset, *, diff --git a/gateway/sds_gateway/api_methods/tests/test_authenticate.py b/gateway/sds_gateway/api_methods/tests/test_authenticate.py index f070fc251..7c9620213 100644 --- a/gateway/sds_gateway/api_methods/tests/test_authenticate.py +++ b/gateway/sds_gateway/api_methods/tests/test_authenticate.py @@ -34,7 +34,7 @@ def test_auth_valid_key(self) -> None: user, auth = self.auth.authenticate(request) # Verify that the user is authenticated - assert auth is True + assert isinstance(auth, UserAPIKey) assert user is not None assert user.email == "test@example.com" diff --git a/gateway/sds_gateway/api_methods/tests/test_federation_export.py b/gateway/sds_gateway/api_methods/tests/test_federation_export.py new file mode 100644 index 000000000..9dfe2a864 --- /dev/null +++ b/gateway/sds_gateway/api_methods/tests/test_federation_export.py @@ -0,0 +1,118 @@ +"""Tests for federation export endpoints and API key scoping.""" + +from django.contrib.auth import get_user_model +from django.test import override_settings +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APITestCase + +from sds_gateway.api_methods.models import DatasetStatus +from sds_gateway.api_methods.models import KeySources +from sds_gateway.api_methods.tests.factories import CaptureFactory +from sds_gateway.api_methods.tests.factories import DatasetFactory +from sds_gateway.users.models import UserAPIKey + +User = get_user_model() + + +@override_settings( + FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", + FEDERATION_OPERATIONAL_OVERRIDE=True, + FEDERATION_EXPORT_ALLOWED_CIDRS=["0.0.0.0/0", "::/0"], +) +class FederationExportAPITest(APITestCase): + def setUp(self) -> None: + self.owner = User.objects.create(email="owner@example.com", is_approved=True) + self.sync_user = User.objects.create( + email="sync@internal.local", + is_approved=True, + ) + _obj, self.sync_key = UserAPIKey.objects.create_key( + name="sync", + user=self.sync_user, + source=KeySources.FederationSync, + ) + _obj, self.user_key = UserAPIKey.objects.create_key( + name="regular", + user=self.owner, + source=KeySources.SDSWebUI, + ) + self.public_dataset = DatasetFactory( + owner=self.owner, + is_public=True, + status=DatasetStatus.FINAL, + keywords=None, + ) + self.private_dataset = DatasetFactory( + owner=self.owner, + is_public=False, + status=DatasetStatus.DRAFT, + keywords=None, + ) + self.public_capture = CaptureFactory(owner=self.owner, is_public=True) + self.list_datasets_url = reverse( + "api:federation-export-datasets-list", + ) + self.detail_dataset_url = reverse( + "api:federation-export-dataset-detail", + kwargs={"pk": str(self.public_dataset.uuid)}, + ) + self.list_captures_url = reverse( + "api:federation-export-captures-list", + ) + + def _auth(self, key: str) -> dict[str, str]: + return {"HTTP_AUTHORIZATION": f"Api-Key: {key}"} + + def test_sync_key_can_list_public_datasets(self) -> None: + response = self.client.get( + self.list_datasets_url, + REMOTE_ADDR="127.0.0.1", + **self._auth(self.sync_key), + ) + assert response.status_code == status.HTTP_200_OK + uuids = {row["uuid"] for row in response.json()} + assert str(self.public_dataset.uuid) in uuids + assert str(self.private_dataset.uuid) not in uuids + + def test_sync_key_can_retrieve_public_dataset(self) -> None: + response = self.client.get( + self.detail_dataset_url, + REMOTE_ADDR="127.0.0.1", + **self._auth(self.sync_key), + ) + assert response.status_code == status.HTTP_200_OK + assert response.json()["uuid"] == str(self.public_dataset.uuid) + assert response.json()["site_name"] == "crc" + + def test_regular_key_denied_on_export(self) -> None: + response = self.client.get( + self.list_datasets_url, + REMOTE_ADDR="127.0.0.1", + **self._auth(self.user_key), + ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_sync_key_denied_on_dataset_assets_api(self) -> None: + url = reverse( + "api:datasets-detail", + kwargs={"pk": str(self.public_dataset.uuid)}, + ) + response = self.client.get(url, **self._auth(self.sync_key)) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_sync_key_denied_on_capture_list(self) -> None: + url = reverse("api:captures-list") + response = self.client.get(url, **self._auth(self.sync_key)) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_export_captures_list(self) -> None: + response = self.client.get( + self.list_captures_url, + REMOTE_ADDR="127.0.0.1", + **self._auth(self.sync_key), + ) + assert response.status_code == status.HTTP_200_OK + uuids = {row["uuid"] for row in response.json()} + assert str(self.public_capture.uuid) in uuids diff --git a/gateway/sds_gateway/api_methods/tests/test_federation_export_contract.py b/gateway/sds_gateway/api_methods/tests/test_federation_export_contract.py new file mode 100644 index 000000000..910e0731c --- /dev/null +++ b/gateway/sds_gateway/api_methods/tests/test_federation_export_contract.py @@ -0,0 +1,94 @@ +"""Contract tests: gateway federation export JSON ↔ sync Pydantic models.""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest +from django.contrib.auth import get_user_model + +from sds_gateway.api_methods.federation.export_contract import assert_field_names_match +from sds_gateway.api_methods.helpers.compile_federated_data import ( + compile_federated_capture_doc, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( + compile_federated_dataset_doc, +) +from sds_gateway.api_methods.models import DatasetStatus +from sds_gateway.api_methods.serializers.capture_serializers import ( + CaptureFederationSerializer, +) +from sds_gateway.api_methods.serializers.dataset_serializers import ( + DatasetFederationSerializer, +) +from sds_gateway.api_methods.tests.factories import CaptureFactory +from sds_gateway.api_methods.tests.factories import DatasetFactory + +_repo_root = Path(__file__).resolve().parents[4] +_federation_root = _repo_root / "federation" +if _federation_root.is_dir(): + sys.path.insert(0, str(_federation_root)) + +pytest.importorskip("sds_federation") + +from sds_federation.schemas.webhooks import FederatedCaptureDoc # noqa: E402 +from sds_federation.schemas.webhooks import FederatedDatasetDoc # noqa: E402 + +User = get_user_model() + + +@pytest.mark.django_db +def test_dataset_export_field_names_match_pydantic() -> None: + owner = User.objects.create(email="owner@example.com", is_approved=True) + dataset = DatasetFactory( + owner=owner, + is_public=True, + status=DatasetStatus.FINAL, + keywords=None, + ) + serializer = DatasetFederationSerializer( + dataset, + context={"site_name": "crc"}, + ) + assert_field_names_match( + serializer, + FederatedDatasetDoc, + label="DatasetFederationSerializer", + ) + + +@pytest.mark.django_db +def test_capture_export_field_names_match_pydantic() -> None: + owner = User.objects.create(email="cap-owner@example.com", is_approved=True) + capture = CaptureFactory(owner=owner, is_public=True) + serializer = CaptureFederationSerializer( + capture, + context={"site_name": "crc"}, + ) + assert_field_names_match( + serializer, + FederatedCaptureDoc, + label="CaptureFederationSerializer", + ) + + +@pytest.mark.django_db +def test_compile_federated_dataset_doc_validates_against_pydantic() -> None: + owner = User.objects.create(email="d@example.com", is_approved=True) + dataset = DatasetFactory( + owner=owner, + is_public=True, + status=DatasetStatus.FINAL, + keywords=None, + ) + payload = compile_federated_dataset_doc(dataset) + FederatedDatasetDoc.model_validate(payload) + + +@pytest.mark.django_db +def test_compile_federated_capture_doc_validates_against_pydantic() -> None: + owner = User.objects.create(email="c@example.com", is_approved=True) + capture = CaptureFactory(owner=owner, is_public=True) + payload = compile_federated_capture_doc(capture) + FederatedCaptureDoc.model_validate(payload) diff --git a/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py b/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py new file mode 100644 index 000000000..d52ef7b49 --- /dev/null +++ b/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py @@ -0,0 +1,206 @@ +"""Tests for federation operational checks and export access control.""" + +from __future__ import annotations + +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest +from django.conf import settings +from django.contrib.auth import get_user_model +from django.test import RequestFactory +from django.test import override_settings +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APITestCase + +from sds_gateway.api_methods.federation.availability import ( + evaluate_federation_operational, +) +from sds_gateway.api_methods.federation.availability import ( + is_client_ip_allowed_for_federation_export, +) +from sds_gateway.api_methods.federation.availability import ( + refresh_federation_operational_state, +) +from sds_gateway.api_methods.models import DatasetStatus +from sds_gateway.api_methods.models import KeySources +from sds_gateway.api_methods.tests.factories import DatasetFactory +from sds_gateway.users.models import UserAPIKey + +User = get_user_model() + +pytestmark = pytest.mark.django_db + + +class TestFederationAvailability: + @override_settings(FEDERATION_ENABLED=False) + def test_disabled_when_master_switch_off(self) -> None: + ok, reason = evaluate_federation_operational() + assert ok is False + assert "FEDERATION_ENABLED" in reason + + @override_settings( + FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", + FEDERATION_SKIP_SYNC_API_KEY_CHECK=True, + FEDERATION_SKIP_SYNC_HEALTH_PROBE=True, + FEDERATION_SKIP_REDIS_PROBE=True, + ) + def test_operational_when_probes_skipped(self) -> None: + ok, _reason = evaluate_federation_operational() + assert ok is True + + @override_settings( + FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", + FEDERATION_SKIP_SYNC_HEALTH_PROBE=True, + FEDERATION_SKIP_REDIS_PROBE=True, + ) + def test_fails_without_sync_api_key(self) -> None: + ok, reason = evaluate_federation_operational() + assert ok is False + assert "FederationSync" in reason + + @override_settings( + FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", + FEDERATION_SKIP_SYNC_API_KEY_CHECK=True, + FEDERATION_SYNC_HEALTH_URL="http://sync.test/health", + FEDERATION_SKIP_REDIS_PROBE=True, + ) + @patch("sds_gateway.api_methods.federation.availability.urllib.request.urlopen") + def test_health_probe_success(self, mock_urlopen: MagicMock) -> None: + response = MagicMock() + response.status = 200 + response.read.return_value = b'{"status":"ok"}' + response.__enter__.return_value = response + response.__exit__.return_value = None + mock_urlopen.return_value = response + + ok, _reason = evaluate_federation_operational() + assert ok is True + + @override_settings( + FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", + FEDERATION_SKIP_SYNC_API_KEY_CHECK=True, + FEDERATION_SYNC_HEALTH_URL="http://sync.test/health", + FEDERATION_SKIP_REDIS_PROBE=True, + ) + @patch("sds_gateway.api_methods.federation.availability.urllib.request.urlopen") + def test_health_probe_fails_when_json_status_not_ok( + self, + mock_urlopen: MagicMock, + ) -> None: + response = MagicMock() + response.status = 200 + response.read.return_value = b'{"status":"degraded"}' + response.__enter__.return_value = response + response.__exit__.return_value = None + mock_urlopen.return_value = response + + ok, reason = evaluate_federation_operational() + assert ok is False + assert "not ok" in reason + + @override_settings( + FEDERATION_EXPORT_ALLOWED_CIDRS=["10.0.0.0/8"], + ) + def test_client_ip_allowlist(self) -> None: + factory = RequestFactory() + allowed = factory.get("/", REMOTE_ADDR="10.1.2.3") + denied = factory.get("/", REMOTE_ADDR="203.0.113.8") + assert is_client_ip_allowed_for_federation_export(allowed) is True + assert is_client_ip_allowed_for_federation_export(denied) is False + + @override_settings( + FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="", + FEDERATION_SKIP_SYNC_API_KEY_CHECK=True, + FEDERATION_SKIP_SYNC_HEALTH_PROBE=True, + FEDERATION_SKIP_REDIS_PROBE=True, + ) + def test_fails_without_site_name(self) -> None: + ok, reason = evaluate_federation_operational() + assert ok is False + assert "FEDERATION_SITE_NAME" in reason + + @override_settings( + FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", + FEDERATION_SKIP_SYNC_API_KEY_CHECK=True, + FEDERATION_SKIP_SYNC_HEALTH_PROBE=True, + FEDERATION_SKIP_REDIS_PROBE=True, + ) + def test_refresh_sets_settings_flags(self) -> None: + operational, reason = refresh_federation_operational_state(force=True) + + assert operational is True + assert settings.FEDERATION_OPERATIONAL is True + assert reason + + +@override_settings( + FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", + FEDERATION_OPERATIONAL_OVERRIDE=True, + FEDERATION_EXPORT_ALLOWED_CIDRS=["0.0.0.0/0", "::/0"], +) +class FederationExportAccessControlTest(APITestCase): + """Export API with operational + network permissions enabled for tests.""" + + def setUp(self) -> None: + self.sync_user = User.objects.create( + email="sync@internal.local", + is_approved=True, + ) + _obj, self.sync_key = UserAPIKey.objects.create_key( + name="sync", + user=self.sync_user, + source=KeySources.FederationSync, + ) + owner = User.objects.create(email="owner@example.com", is_approved=True) + self.public_dataset = DatasetFactory( + owner=owner, + is_public=True, + status=DatasetStatus.FINAL, + keywords=None, + ) + self.list_datasets_url = reverse("api:federation-export-datasets-list") + + def _auth(self, key: str) -> dict[str, str]: + return {"HTTP_AUTHORIZATION": f"Api-Key: {key}"} + + def test_sync_key_allowed_from_loopback(self) -> None: + response = self.client.get( + self.list_datasets_url, + REMOTE_ADDR="127.0.0.1", + **self._auth(self.sync_key), + ) + assert response.status_code == status.HTTP_200_OK + + @override_settings( + FEDERATION_EXPORT_ALLOWED_CIDRS=["10.0.0.0/8"], + FEDERATION_OPERATIONAL_OVERRIDE=True, + ) + def test_sync_key_denied_from_public_ip(self) -> None: + response = self.client.get( + self.list_datasets_url, + REMOTE_ADDR="203.0.113.1", + **self._auth(self.sync_key), + ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + @override_settings( + FEDERATION_OPERATIONAL_OVERRIDE=False, + FEDERATION_OPERATIONAL_REASON="sync health probe failed", + FEDERATION_EXPORT_ALLOWED_CIDRS=["0.0.0.0/0"], + ) + def test_export_returns_503_when_not_operational(self) -> None: + response = self.client.get( + self.list_datasets_url, + REMOTE_ADDR="127.0.0.1", + **self._auth(self.sync_key), + ) + assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE diff --git a/gateway/sds_gateway/api_methods/views/capture_endpoints.py b/gateway/sds_gateway/api_methods/views/capture_endpoints.py index 94f23808e..9e2ef28a6 100644 --- a/gateway/sds_gateway/api_methods/views/capture_endpoints.py +++ b/gateway/sds_gateway/api_methods/views/capture_endpoints.py @@ -27,7 +27,6 @@ from rest_framework.authentication import SessionAuthentication from rest_framework.decorators import action from rest_framework.pagination import PageNumberPagination -from rest_framework.permissions import IsAuthenticated from rest_framework.request import Request from rest_framework.response import Response @@ -208,7 +207,6 @@ class CapturePagination(PageNumberPagination): class CaptureViewSet(viewsets.ViewSet): authentication_classes = [SessionAuthentication, APIKeyAuthentication] - permission_classes = [IsAuthenticated] def _validate_and_index_metadata( self, diff --git a/gateway/sds_gateway/api_methods/views/dataset_endpoints.py b/gateway/sds_gateway/api_methods/views/dataset_endpoints.py index 0785ece85..2ef78566f 100644 --- a/gateway/sds_gateway/api_methods/views/dataset_endpoints.py +++ b/gateway/sds_gateway/api_methods/views/dataset_endpoints.py @@ -11,7 +11,6 @@ from rest_framework import status from rest_framework.authentication import SessionAuthentication from rest_framework.decorators import action -from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework.viewsets import ViewSet @@ -56,8 +55,7 @@ def _truthy_query_param(raw: str | None) -> bool: class DatasetViewSet(ViewSet): - authentication_classes = [SessionAuthentication, APIKeyAuthentication] - permission_classes = [IsAuthenticated] + authentication_classes = [APIKeyAuthentication, SessionAuthentication] def _get_file_objects( self, diff --git a/gateway/sds_gateway/api_methods/views/federation_endpoints.py b/gateway/sds_gateway/api_methods/views/federation_endpoints.py new file mode 100644 index 000000000..d9a215a26 --- /dev/null +++ b/gateway/sds_gateway/api_methods/views/federation_endpoints.py @@ -0,0 +1,103 @@ +"""Federation export API (sync service → gateway, Postgres public metadata).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from django.shortcuts import get_object_or_404 +from drf_spectacular.utils import extend_schema +from rest_framework.decorators import action +from rest_framework.response import Response +from rest_framework.viewsets import ViewSet + +if TYPE_CHECKING: + from rest_framework.request import Request + +from sds_gateway.api_methods.authentication import APIKeyAuthentication +from sds_gateway.api_methods.federation.permissions import ( + IsFederationInternalExportClient, +) +from sds_gateway.api_methods.federation.permissions import IsFederationOperational +from sds_gateway.api_methods.helpers.compile_federated_data import ( + compile_federated_capture_doc, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( + compile_federated_dataset_doc, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( + is_federation_exportable_capture, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( + is_federation_exportable_dataset, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( + public_captures_queryset, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( + public_datasets_queryset, +) +from sds_gateway.api_methods.models import Capture +from sds_gateway.api_methods.models import Dataset +from sds_gateway.api_methods.permissions import IsFederationSyncKey + + +@extend_schema(exclude=True) +class FederationViewSet(ViewSet): + """Internal export endpoints for the federation sync service.""" + + authentication_classes = [APIKeyAuthentication] + permission_classes = [ + IsFederationSyncKey, + IsFederationOperational, + IsFederationInternalExportClient, + ] + + @action(detail=False, methods=["get"], url_path="export/datasets") + def export_datasets_list(self, request: Request) -> Response: + """List all public finalized datasets for federation bootstrap.""" + datasets = public_datasets_queryset() + return Response( + [compile_federated_dataset_doc(dataset) for dataset in datasets], + ) + + @action( + detail=False, + methods=["get"], + url_path=r"export/datasets/(?P[^/.]+)", + ) + def export_dataset_detail( + self, request: Request, pk: str | None = None + ) -> Response: + """Return one public dataset for sync after a local Redis event.""" + dataset = get_object_or_404(Dataset, pk=pk, is_deleted=False) + if not is_federation_exportable_dataset(dataset): + return Response( + {"detail": "Dataset is not available for federation export."}, + status=404, + ) + return Response(compile_federated_dataset_doc(dataset)) + + @action(detail=False, methods=["get"], url_path="export/captures") + def export_captures_list(self, request: Request) -> Response: + """List all public captures for federation bootstrap.""" + captures = public_captures_queryset() + return Response( + [compile_federated_capture_doc(capture) for capture in captures], + ) + + @action( + detail=False, + methods=["get"], + url_path=r"export/captures/(?P[^/.]+)", + ) + def export_capture_detail( + self, request: Request, pk: str | None = None + ) -> Response: + """Return one public capture for sync after a local Redis event.""" + capture = get_object_or_404(Capture, pk=pk, is_deleted=False) + if not is_federation_exportable_capture(capture): + return Response( + {"detail": "Capture is not available for federation export."}, + status=404, + ) + return Response(compile_federated_capture_doc(capture)) diff --git a/gateway/sds_gateway/api_methods/views/file_endpoints.py b/gateway/sds_gateway/api_methods/views/file_endpoints.py index 0a441566b..1e9308df8 100644 --- a/gateway/sds_gateway/api_methods/views/file_endpoints.py +++ b/gateway/sds_gateway/api_methods/views/file_endpoints.py @@ -24,7 +24,6 @@ from rest_framework import status from rest_framework.decorators import action from rest_framework.pagination import PageNumberPagination -from rest_framework.permissions import IsAuthenticated from rest_framework.request import Request from rest_framework.response import Response from rest_framework.views import APIView @@ -65,7 +64,6 @@ class FilePagination(PageNumberPagination): class FileViewSet(ViewSet): authentication_classes = [APIKeyAuthentication] - permission_classes = [IsAuthenticated] @staticmethod def _paginated_list_response( @@ -698,7 +696,6 @@ def download_file(self, request: Request, pk: str | None = None) -> HttpResponse class CheckFileContentsExistView(APIView): authentication_classes = [APIKeyAuthentication] - permission_classes = [IsAuthenticated] @extend_schema( request=FilePostSerializer, diff --git a/gateway/sds_gateway/users/management/commands/create_federation_sync_api_key.py b/gateway/sds_gateway/users/management/commands/create_federation_sync_api_key.py new file mode 100644 index 000000000..2c7ede4e2 --- /dev/null +++ b/gateway/sds_gateway/users/management/commands/create_federation_sync_api_key.py @@ -0,0 +1,46 @@ +"""Create the federation sync service user and API key.""" + +from django.conf import settings +from django.contrib.auth import get_user_model +from django.core.management.base import BaseCommand +from loguru import logger as log + +from sds_gateway.api_methods.models import KeySources +from sds_gateway.users.models import UserAPIKey + + +class Command(BaseCommand): + """Provision federation-sync@internal user and a FederationSync API key.""" + + help = "Create federation sync service user and API key (prints raw key once)." + + def handle(self, *args, **options) -> None: + user_model = get_user_model() + email = settings.FEDERATION_SYNC_USER_EMAIL + + user, created = user_model.objects.get_or_create( + email=email, + defaults={ + "is_active": True, + "is_approved": True, + }, + ) + if created: + user.set_unusable_password() + user.save(update_fields=["password"]) + log.info("Created federation sync user {}", email) + else: + log.info("Using existing federation sync user {}", email) + + UserAPIKey.objects.filter( + user=user, + source=KeySources.FederationSync, + ).delete() + + _obj, raw_key = UserAPIKey.objects.create_key( + name="federation-sync", + user=user, + source=KeySources.FederationSync, + description="Federation sync service (export endpoints only)", + ) + self.stdout.write(self.style.SUCCESS(f"Federation sync API key: {raw_key}")) diff --git a/gateway/sds_gateway/users/management/commands/create_test_files.py b/gateway/sds_gateway/users/management/commands/create_test_files.py index 9c6f0b553..b85b28e0d 100644 --- a/gateway/sds_gateway/users/management/commands/create_test_files.py +++ b/gateway/sds_gateway/users/management/commands/create_test_files.py @@ -10,9 +10,9 @@ from django.db import transaction from django.utils import timezone -from sds_gateway.captures.models import Capture -from sds_gateway.files.models import File -from sds_gateway.minio_client import MinioClient +from sds_gateway.api_methods.models import Capture +from sds_gateway.api_methods.models import File +from sds_gateway.api_methods.utils.minio_client import get_minio_client User = get_user_model() # This is the only User import we need @@ -69,7 +69,7 @@ def _create_test_files(self, user): directory_path.mkdir(parents=True, exist_ok=True) # Create files - minio_client = MinioClient() + minio_client = get_minio_client() created_files = [] try: diff --git a/gateway/sds_gateway/users/migrations/0012_alter_userapikey_source_federation_sync.py b/gateway/sds_gateway/users/migrations/0012_alter_userapikey_source_federation_sync.py new file mode 100644 index 000000000..084b10c0b --- /dev/null +++ b/gateway/sds_gateway/users/migrations/0012_alter_userapikey_source_federation_sync.py @@ -0,0 +1,38 @@ +# Generated manually for FederationSync API key source + +from django.db import migrations, models + +import sds_gateway.api_methods.models + + +class Migration(migrations.Migration): + + dependencies = [ + ("users", "0011_user_orcid_id_alter_user_is_approved"), + ] + + operations = [ + migrations.AlterField( + model_name="userapikey", + name="source", + field=models.CharField( + choices=[ + (sds_gateway.api_methods.models.KeySources["SDSWebUI"], "SDS Web UI"), + ( + sds_gateway.api_methods.models.KeySources["SVIBackend"], + "SVI Backend", + ), + ( + sds_gateway.api_methods.models.KeySources["SVIWebUI"], + "SVI Web UI", + ), + ( + sds_gateway.api_methods.models.KeySources["FederationSync"], + "Federation Sync", + ), + ], + default=sds_gateway.api_methods.models.KeySources["SDSWebUI"], + max_length=255, + ), + ), + ] diff --git a/gateway/sds_gateway/users/migrations/max_migration.txt b/gateway/sds_gateway/users/migrations/max_migration.txt index fda121cc0..7707335c4 100644 --- a/gateway/sds_gateway/users/migrations/max_migration.txt +++ b/gateway/sds_gateway/users/migrations/max_migration.txt @@ -1 +1 @@ -0011_user_orcid_id_alter_user_is_approved +0012_alter_userapikey_source_federation_sync \ No newline at end of file diff --git a/gateway/sds_gateway/users/models.py b/gateway/sds_gateway/users/models.py index b1a31d0db..f5b23370f 100644 --- a/gateway/sds_gateway/users/models.py +++ b/gateway/sds_gateway/users/models.py @@ -64,6 +64,7 @@ class UserAPIKey(AbstractAPIKey): (KeySources.SDSWebUI, "SDS Web UI"), (KeySources.SVIBackend, "SVI Backend"), (KeySources.SVIWebUI, "SVI Web UI"), + (KeySources.FederationSync, "Federation Sync"), ] user = cast( "User",