Skip to content
This repository was archived by the owner on Jun 3, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/danger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ concurrency:
jobs:
danger:
name: Danger Review
if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
runs-on: ubuntu-latest
timeout-minutes: 10

Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/deploy-staging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ jobs:
# ── 1. Deploy PR branch to staging EC2 ─────────────────────────────────────
deploy:
name: Deploy to staging EC2
if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
runs-on: ubuntu-latest
timeout-minutes: 20
environment:
Expand Down Expand Up @@ -133,6 +134,7 @@ jobs:
# ── 2. Smoke tests against live staging ────────────────────────────────────
smoke-test:
name: Smoke test staging
if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
runs-on: ubuntu-latest
needs: deploy
timeout-minutes: 10
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/security-scan.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
cache: pip

- name: Install bandit
run: pip install bandit[toml]
run: pip install bandit[toml] bandit-sarif-formatter

- name: Run Bandit
run: |
Expand All @@ -41,6 +41,7 @@ jobs:
continue-on-error: true

- name: Upload Bandit SARIF
if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: bandit-results.sarif
Expand Down
43 changes: 7 additions & 36 deletions src/api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@
import hashlib
import hmac
import logging
import time
from collections import defaultdict
from typing import TYPE_CHECKING, Optional

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt

from src.config import settings
from src.database.control_plane_store import control_plane_store
from src.database.api_key_store import APIKeyStore
from src.database.user_store import UserStore
from src.pipelines.ingest import IngestPipeline
Expand Down Expand Up @@ -288,48 +287,20 @@ async def require_user(current_user: Optional[dict] = Depends(get_current_user))


# ═══════════════════════════════════════════════════════════════════════════
# Sliding-window rate limiter (in-process, per-key)
# Sliding-window rate limiter
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _SlidingWindowRateLimiter class and its instance _rate_limiter appear to be redundant now that rate limiting logic has been moved to ControlPlaneStore. Consider removing them and updating the associated tests to avoid maintaining dead code.

# ═══════════════════════════════════════════════════════════════════════════

class _SlidingWindowRateLimiter:
"""Thread-safe sliding-window counter keyed by API identity."""

def __init__(self, max_requests: int, window_seconds: int = 60):
self.max_requests = max_requests
self.window = window_seconds
self._hits: dict[str, list[float]] = defaultdict(list)
self._lock = asyncio.Lock()

async def check(self, key: str) -> tuple[bool, int]:
"""Return (allowed, remaining) for *key*."""
now = time.monotonic()
cutoff = now - self.window

async with self._lock:
timestamps = self._hits[key]
self._hits[key] = [t for t in timestamps if t > cutoff]

if len(self._hits[key]) >= self.max_requests:
return False, 0

self._hits[key].append(now)
remaining = self.max_requests - len(self._hits[key])
return True, remaining


_rate_limiter = _SlidingWindowRateLimiter(
max_requests=settings.rate_limit,
window_seconds=60,
)


async def enforce_rate_limit(
request: Request,
user: dict = Depends(require_api_key),
) -> dict:
"""Raise 429 if the caller has exceeded their per-minute quota."""
identity = user.get("id", "anonymous")
allowed, remaining = await _rate_limiter.check(identity)
allowed, remaining = await control_plane_store.check_rate_limit(
identity,
max_requests=settings.rate_limit,
window_seconds=60,
)

request.state.rate_limit_remaining = remaining

Expand Down
33 changes: 15 additions & 18 deletions src/api/routes/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from src.config import settings
from src.config.analytics import analytics
from src.database.control_plane_store import control_plane_store

logger = logging.getLogger("xmem.api.admin")

Expand All @@ -44,7 +45,6 @@
# ═══════════════════════════════════════════════════════════════════════════

_admin_collection = None
_admin_sessions: Dict[str, Dict[str, Any]] = {} # token → {user, expires}


def _get_admin_collection():
Expand Down Expand Up @@ -79,23 +79,22 @@ class AdminLoginRequest(BaseModel):
password: str


def _verify_admin_token(request: Request) -> Dict[str, Any]:
async def _verify_admin_token(request: Request) -> Dict[str, Any]:
"""Validate admin session token from cookie or Authorization header."""
token = request.cookies.get("xmem_admin_token")
if not token:
auth = request.headers.get("Authorization", "")
if auth.startswith("Bearer "):
token = auth[7:]

if not token or token not in _admin_sessions:
if not token:
raise HTTPException(status_code=401, detail="Not authenticated")

session = _admin_sessions[token]
if datetime.now(timezone.utc) > session["expires"]:
del _admin_sessions[token]
user = await control_plane_store.get_admin_session_async(token)
if not user:
raise HTTPException(status_code=401, detail="Session expired")

return session["user"]
return user


# ═══════════════════════════════════════════════════════════════════════════
Expand All @@ -114,11 +113,11 @@ async def admin_login(req: AdminLoginRequest):
raise HTTPException(status_code=401, detail="Invalid credentials")

# Generate session token
token = hashlib.sha256(f"{req.username}{time.time()}".encode()).hexdigest()
_admin_sessions[token] = {
"user": {"username": user["username"], "role": user.get("role", "admin")},
"expires": datetime.now(timezone.utc) + timedelta(hours=24),
}
session = await control_plane_store.create_admin_session_async(
user={"username": user["username"], "role": user.get("role", "admin")},
ttl_seconds=24 * 60 * 60,
)
token = session["token"]

response = JSONResponse({"status": "ok", "token": token, "username": user["username"]})
response.set_cookie(
Expand All @@ -134,8 +133,8 @@ async def admin_login(req: AdminLoginRequest):
@router.post("/api/logout")
async def admin_logout(request: Request):
token = request.cookies.get("xmem_admin_token")
if token and token in _admin_sessions:
del _admin_sessions[token]
if token:
await control_plane_store.delete_admin_session_async(token)
response = JSONResponse({"status": "ok"})
response.delete_cookie("xmem_admin_token")
return response
Expand Down Expand Up @@ -219,7 +218,7 @@ async def ws_live_logs(websocket: WebSocket):

# Validate auth token from query param
token = websocket.query_params.get("token", "")
if token not in _admin_sessions:
if not token or not (await control_plane_store.get_admin_session_async(token)):
await websocket.close(code=4001, reason="Not authenticated")
return

Expand Down Expand Up @@ -313,7 +312,7 @@ async def _journal_stream():

if not line:
# journalctl exited — send error event and stop
yield f"event: error\ndata: journalctl process exited\n\n"
yield "event: error\ndata: journalctl process exited\n\n"
break

text = line.decode("utf-8", errors="replace").rstrip("\n")
Expand Down Expand Up @@ -385,8 +384,6 @@ async def analytics_summary(request: Request, user: dict = Depends(_verify_admin
now = datetime.now(timezone.utc)
last_24h = now - timedelta(hours=24)
last_7d = now - timedelta(days=7)
last_30d = now - timedelta(days=30)

# API call stats (last 24h)
api_calls_24h = list(collection.aggregate([
{"$match": {"event": "api_call", "ts": {"$gte": last_24h}}},
Expand Down
Loading
Loading