Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions gateway/.envs/example/django.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,19 @@ HOSTNAME=localhost:8000
API_VERSION=v1
API_KEY=

# FEDERATION
# ------------------------------------------------------------------------------
# Peer sync uses the federation-sync Docker service (same sds-network as gateway;
# service definition lives under /federation). Bootstrap: enable federation, run
# create_federation_sync_api_key, pass the key to federation-sync. Set FEDERATION_SITE_NAME
# (e.g. crc) when enabling federation; use SDS_SITE_FQDN for the public host (RFC [site].fqdn).
# FEDERATION_ENABLED=true # Master switch for export APIs and Redis federation events.
# FEDERATION_SITE_NAME=crc # RFC [site].name (short peer id); set SDS_SITE_FQDN separately for [site].fqdn.
# FEDERATION_EVENTS_CHANNEL=federation:events # Redis pub/sub channel federation-sync subscribes to.
# FEDERATION_SYNC_HEALTH_URL=http://federation-sync:8000/sync/health # Health probe target (federation-sync service).
# FEDERATION_SYNC_USER_EMAIL=federation-sync@internal.local # Service user email for create_federation_sync_api_key.
# FEDERATION_EXPORT_ALLOWED_CIDRS= # Comma-separated CIDRs allowed to call export (default: private Docker ranges).

Comment thread
klpoland marked this conversation as resolved.
# AUTH0
# ------------------------------------------------------------------------------
# Set these from your Auth0 application settings
Expand Down
2 changes: 2 additions & 0 deletions gateway/config/api_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from sds_gateway.api_methods.views.auth_endpoints import ValidateAuthViewSet
from sds_gateway.api_methods.views.capture_endpoints import CaptureViewSet
from sds_gateway.api_methods.views.dataset_endpoints import DatasetViewSet
from sds_gateway.api_methods.views.federation_endpoints import FederationViewSet
from sds_gateway.api_methods.views.file_endpoints import FileViewSet
from sds_gateway.api_methods.views.file_endpoints import check_contents_exist
from sds_gateway.users.api.views import UserViewSet
Expand All @@ -17,6 +18,7 @@
router.register(r"assets/files", FileViewSet, basename="files")
router.register(r"assets/captures", CaptureViewSet, basename="captures")
router.register(r"assets/datasets", DatasetViewSet, basename="datasets")
router.register(r"federation", FederationViewSet, basename="federation")

if settings.VISUALIZATIONS_ENABLED:
router.register(r"visualizations", VisualizationViewSet, basename="visualizations")
Expand Down
61 changes: 60 additions & 1 deletion gateway/config/settings/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Base settings to build other settings files upon."""
# ruff: noqa: ERA001

import ipaddress
import random
import string
from pathlib import Path
Expand All @@ -23,6 +24,8 @@ def __get_random_token(length: int) -> str:
__rng.choice(string.ascii_letters + string.digits) for _ in range(length)
)

def _parse_cidrs(raw: list[str]) -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]:
return [ipaddress.ip_network(item.strip(), strict=False) for item in raw]

env.read_env()

Expand Down Expand Up @@ -610,7 +613,10 @@ def _strip_endpoint_scheme(endpoint_url: str) -> str:
"rest_framework.authentication.SessionAuthentication",
"sds_gateway.api_methods.authentication.APIKeyAuthentication",
),
"DEFAULT_PERMISSION_CLASSES": ("rest_framework.permissions.IsAuthenticated",),
"DEFAULT_PERMISSION_CLASSES": (
"rest_framework.permissions.IsAuthenticated",
"sds_gateway.api_methods.permissions.DisallowFederationSyncKey",
),
"DEFAULT_SCHEMA_CLASS": "drf_spectacular.openapi.AutoSchema",
"DEFAULT_THROTTLE_RATES": {
"vis_stream": VIS_STREAM_THROTTLE_RATE,
Expand Down Expand Up @@ -709,6 +715,59 @@ def _strip_endpoint_scheme(endpoint_url: str) -> str:
SDS_PROGRAMMATIC_SITE_NAME: str = env.str("SDS_PROGRAMMATIC_SITE_NAME", default="sds")
SDS_SITE_FQDN: str = env.str("SDS_SITE_FQDN", default="localhost")

# Federation peer short name (RFC [site].name, e.g. crc, haystack); not SDS_PROGRAMMATIC_SITE_NAME.
FEDERATION_SITE_NAME: str = env.str("FEDERATION_SITE_NAME", default="").strip()
# Master switch: when False, federation export and Redis events are inactive.
FEDERATION_ENABLED: bool = env.bool("FEDERATION_ENABLED", default=False)
FEDERATION_EVENTS_CHANNEL: str = env.str(
"FEDERATION_EVENTS_CHANNEL",
default="federation:events",
)
FEDERATION_SYNC_USER_EMAIL: str = env.str(
"FEDERATION_SYNC_USER_EMAIL",
default="federation-sync@internal.local",
)
FEDERATION_SYNC_HEALTH_URL: str = env.str(
"FEDERATION_SYNC_HEALTH_URL",
default="http://federation-sync:8000/sync/health",
)
FEDERATION_SYNC_HEALTH_PROBE_TIMEOUT: float = env.float(
"FEDERATION_SYNC_HEALTH_PROBE_TIMEOUT",
default=2.0,
)
FEDERATION_SKIP_SYNC_HEALTH_PROBE: bool = env.bool(
"FEDERATION_SKIP_SYNC_HEALTH_PROBE",
default=False,
)
FEDERATION_SKIP_SYNC_API_KEY_CHECK: bool = env.bool(
"FEDERATION_SKIP_SYNC_API_KEY_CHECK",
default=False,
)
FEDERATION_SKIP_REDIS_PROBE: bool = env.bool(
"FEDERATION_SKIP_REDIS_PROBE",
default=False,
)
# Set at startup / periodic recheck by federation.availability.
FEDERATION_OPERATIONAL: bool = False
FEDERATION_OPERATIONAL_REASON: str = ""
# Tests may set via override_settings without running probes.
FEDERATION_OPERATIONAL_OVERRIDE: bool | None = None
# Export API: internal Docker/private networks (sync → django on sds-network).
FEDERATION_EXPORT_ALLOWED_CIDRS: list[
ipaddress.IPv4Network | ipaddress.IPv6Network
] = _parse_cidrs(
env.list(
"FEDERATION_EXPORT_ALLOWED_CIDRS",
default=[
"127.0.0.1/32",
"::1/128",
"10.0.0.0/8",
"172.16.0.0/12",
"192.168.0.0/16",
],
),
)

Comment thread
klpoland marked this conversation as resolved.
# ADMIN_CONSOLE_ENV is used to visually distinguish between different environments
# (production, staging, local) in the admin console and error emails. It does not affect
# any functionality and it is meant to prevent changes in production meant for testing
Expand Down
4 changes: 4 additions & 0 deletions gateway/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@

# https://deptry.com/usage/#per-rule-ignores
[tool.deptry.per_rule_ignores]
DEP001 = [
# optional monorepo sibling; contract tests add federation/ to sys.path
"sds_federation",
]
DEP002 = [
# packages that are installed but not imported
"argon2-cffi", # used by django for argon2 password hashing
Expand Down
33 changes: 20 additions & 13 deletions gateway/scripts/fallow-cross-file-dupes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,31 @@
set -euo pipefail
cd "$(dirname "$0")/.."

# Use first available: vpx > bunx > npx
if command -v pnpx &>/dev/null; then
RUNNER=pnpx
elif command -v vpx &>/dev/null; then
RUNNER=vpx
elif command -v bunx &>/dev/null; then
RUNNER=bunx
elif command -v npx &>/dev/null; then
RUNNER=npx
LOCAL_FALLOW="node_modules/.bin/fallow"
if [[ -x "${LOCAL_FALLOW}" ]]; then
"${LOCAL_FALLOW}" dupes --format json -q | jq -e '
[ (.clone_groups // .dupes.clone_groups // [])[]
| select((.instances | map(.file) | unique | length) > 1)
] | length == 0
' >/dev/null
else
echo "Error: neither vpx, bunx, nor npx found in PATH" >&2
exit 1
fi
# Use first available: vpx > bunx > npx (avoid pnpx; global pnpm may need newer Node)
if command -v vpx &>/dev/null; then
RUNNER=(vpx)
elif command -v bunx &>/dev/null; then
RUNNER=(bunx)
elif command -v npx &>/dev/null; then
RUNNER=(npx)
else
echo "Error: neither local fallow, vpx, bunx, nor npx found" >&2
exit 1
fi

"${RUNNER}" fallow dupes --format json -q | jq -e '
"${RUNNER[@]}" fallow dupes --format json -q | jq -e '
[ (.clone_groups // .dupes.clone_groups // [])[]
| select((.instances | map(.file) | unique | length) > 1)
] | length == 0
' >/dev/null
fi

echo "No cross-file clone groups detected."
6 changes: 6 additions & 0 deletions gateway/sds_gateway/api_methods/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ class ApiMethodsConfig(AppConfig):
# pattern to import application modules here in ready()
# ruff: noqa: PLC0415
def ready(self) -> None:
import sds_gateway.api_methods.federation.signals
import sds_gateway.api_methods.schema # noqa: F401
from sds_gateway.api_methods.federation.availability import (
initialize_federation_operational_state,
)

initialize_federation_operational_state()

silence_unwanted_logs()

Expand Down
4 changes: 2 additions & 2 deletions gateway/sds_gateway/api_methods/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
class APIKeyAuthentication(BaseAuthentication):
keyword = "Api-Key"

def authenticate(self, request) -> tuple[User, bool]:
def authenticate(self, request) -> tuple[User, UserAPIKey]:
"""Authenticates the user with their API key.
Args:
request: Contains the API key in the Authorization header.
Expand Down Expand Up @@ -47,7 +47,7 @@ def authenticate(self, request) -> tuple[User, bool]:
raise AuthenticationFailed(msg) from err

user = api_key_obj.user
return (user, True)
return (user, api_key_obj)

def authenticate_header(self, request) -> str:
return self.keyword
Empty file.
174 changes: 174 additions & 0 deletions gateway/sds_gateway/api_methods/federation/availability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""Federation operational status: config, sync health, Redis, sync API key."""

from __future__ import annotations

import ipaddress
import json
import time
import urllib.error
import urllib.request
from typing import Any

from django.conf import settings
from loguru import logger as log

from sds_gateway.api_methods.models import KeySources
from sds_gateway.api_methods.tasks import get_redis_client
from sds_gateway.users.models import UserAPIKey

_HTTP_OK = 200
_RECHECK_INTERVAL_SECONDS = 60.0
_last_evaluated_at: float = 0.0
_cached_operational: bool = False
_cached_reason: str = "not evaluated"


def _setting(name: str, *, default: Any = None) -> Any:
return getattr(settings, name, default)


def _export_allowed_networks() -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]:
"""Networks from settings (parsed at startup); tests may override with strings."""
raw = _setting("FEDERATION_EXPORT_ALLOWED_CIDRS", default=[])
networks: list[ipaddress.IPv4Network | ipaddress.IPv6Network] = []
for item in raw:
if isinstance(item, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
networks.append(item)
else:
networks.append(ipaddress.ip_network(str(item).strip(), strict=False))
return networks


def federation_client_ip(request) -> str | None:
"""Client IP for export access control (direct internal connections only)."""
remote = request.META.get("REMOTE_ADDR")
if remote:
return str(remote).strip()
return None


def is_client_ip_allowed_for_federation_export(request) -> bool:
cidrs = _export_allowed_networks()
if not cidrs:
return False
client_ip = federation_client_ip(request)
if not client_ip:
return False
try:
addr = ipaddress.ip_address(client_ip)
except ValueError:
return False
return any(addr in network for network in cidrs)


def _sync_health_ok() -> tuple[bool, str]: # noqa: PLR0911
if _setting("FEDERATION_SKIP_SYNC_HEALTH_PROBE", default=False):
return True, "health probe skipped"
url = (_setting("FEDERATION_SYNC_HEALTH_URL") or "").strip()
if not url:
return False, "FEDERATION_SYNC_HEALTH_URL is not set"
if not url.startswith(("http://", "https://")):
return False, "FEDERATION_SYNC_HEALTH_URL must be http(s)"
timeout = float(
_setting("FEDERATION_SYNC_HEALTH_PROBE_TIMEOUT", default=2.0),
)
request = urllib.request.Request(url, method="GET") # noqa: S310
try:
with urllib.request.urlopen(request, timeout=timeout) as response: # noqa: S310
if response.status != _HTTP_OK:
return False, f"sync health returned HTTP {response.status}"
body = response.read().decode("utf-8", errors="replace")
except urllib.error.URLError as exc:
return False, f"sync health probe failed: {exc.reason}"
except TimeoutError:
return False, "sync health probe timed out"
if not body.strip():
return True, "sync health returned 200"

try:
payload = json.loads(body)
except json.JSONDecodeError:
return True, "sync health returned 200"
Comment thread
klpoland marked this conversation as resolved.

if isinstance(payload, dict):
if payload.get("status") == "ok":
return True, "sync health ok"
status_value = payload.get("status")
return False, f"sync health status is not ok: {status_value!r}"

return True, "sync health returned 200"


def _sync_api_key_present() -> tuple[bool, str]:
if _setting("FEDERATION_SKIP_SYNC_API_KEY_CHECK", default=False):
return True, "sync API key check skipped"
exists = UserAPIKey.objects.filter(source=KeySources.FederationSync).exists()
if not exists:
return False, "no FederationSync API key in database"
return True, "FederationSync API key present"


def _redis_ok() -> tuple[bool, str]:
if not _setting("FEDERATION_ENABLED", default=False):
return True, "redis not required (federation disabled)"
if _setting("FEDERATION_SKIP_REDIS_PROBE", default=False):
return True, "redis probe skipped"

try:
client = get_redis_client()
client.ping()
except Exception as exc: # noqa: BLE001
return False, f"redis ping failed: {exc}"
return True, "redis ok"


def evaluate_federation_operational() -> tuple[bool, str]:
if not _setting("FEDERATION_ENABLED", default=False):
return False, "FEDERATION_ENABLED is False"

site_name = (_setting("FEDERATION_SITE_NAME", default="") or "").strip()
if not site_name:
return False, "FEDERATION_SITE_NAME must be set when federation is enabled"

for check in (_sync_api_key_present, _sync_health_ok, _redis_ok):
ok, reason = check()
if not ok:
return False, reason
return True, "federation operational"


def refresh_federation_operational_state(*, force: bool = False) -> tuple[bool, str]:
global _cached_operational, _cached_reason, _last_evaluated_at # noqa: PLW0603

now = time.monotonic()
if (
not force
and _last_evaluated_at
and (now - _last_evaluated_at) < _RECHECK_INTERVAL_SECONDS
):
return _cached_operational, _cached_reason

operational, reason = evaluate_federation_operational()
_cached_operational = operational
_cached_reason = reason
_last_evaluated_at = now
settings.FEDERATION_OPERATIONAL = operational
settings.FEDERATION_OPERATIONAL_REASON = reason
return operational, reason


def initialize_federation_operational_state() -> None:
operational, reason = refresh_federation_operational_state(force=True)
if operational:
log.info("Federation is operational: {}", reason)
else:
log.warning("Federation disabled: {}", reason)


def is_federation_operational() -> bool:
if _setting("FEDERATION_OPERATIONAL_OVERRIDE", default=None) is not None:
return bool(_setting("FEDERATION_OPERATIONAL_OVERRIDE"))
if not _setting("FEDERATION_ENABLED", default=False):
return False
operational, _reason = refresh_federation_operational_state()
return operational
Loading
Loading