Skip to content

Commit 976dbd2

Browse files
authored
Merge pull request #527 from lbedner/streaming-worker
Streaming worker
2 parents adb3d71 + 53095fa commit 976dbd2

31 files changed

Lines changed: 2350 additions & 538 deletions

aegis/core/post_gen_tasks.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,11 @@ def is_enabled(key: str) -> bool:
337337
for taskiq_file in queues_dir.glob("*_taskiq.py"):
338338
taskiq_file.unlink()
339339

340-
# arq: Remove TaskIQ pool, API, and registry files
340+
# arq: Remove TaskIQ middleware, pool, API, and registry files
341+
middleware_taskiq = worker_dir / "middleware_taskiq.py"
342+
if middleware_taskiq.exists():
343+
middleware_taskiq.unlink()
344+
341345
pools_taskiq = worker_dir / "pools_taskiq.py"
342346
if pools_taskiq.exists():
343347
pools_taskiq.unlink()

aegis/templates/copier-aegis-project/{{ project_slug }}/app/cli/load_test.py.jinja

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class QueueChoice(str, Enum):
5454
@app.command("run")
5555
def run_load_test(
5656
num_tasks: int = typer.Option(
57-
100, "--tasks", "-n", help="Number of tasks to spawn", min=10, max=10000
57+
100, "--tasks", "-n", help="Number of tasks to spawn", min=1
5858
),
5959
task_type: LoadTestTypes = typer.Option(
6060
LoadTestTypes.CPU_INTENSIVE, "--type", "-t", help="Type of load test to run"
@@ -131,7 +131,7 @@ def run_load_test(
131131
@app.command("cpu")
132132
def quick_cpu_test_cmd(
133133
num_tasks: int = typer.Option(
134-
50, "--tasks", "-n", help="Number of CPU tasks", min=10, max=1000
134+
50, "--tasks", "-n", help="Number of CPU tasks", min=1
135135
),
136136
wait: bool = typer.Option(True, "--wait/--no-wait", help="Wait for completion"),
137137
) -> None:
@@ -178,7 +178,7 @@ def quick_cpu_test_cmd(
178178
@app.command("io")
179179
def quick_io_test_cmd(
180180
num_tasks: int = typer.Option(
181-
100, "--tasks", "-n", help="Number of I/O tasks", min=10, max=1000
181+
100, "--tasks", "-n", help="Number of I/O tasks", min=1
182182
),
183183
wait: bool = typer.Option(True, "--wait/--no-wait", help="Wait for completion"),
184184
) -> None:
@@ -227,7 +227,7 @@ def quick_io_test_cmd(
227227
@app.command("memory")
228228
def quick_memory_test_cmd(
229229
num_tasks: int = typer.Option(
230-
200, "--tasks", "-n", help="Number of memory tasks", min=10, max=1000
230+
200, "--tasks", "-n", help="Number of memory tasks", min=1
231231
),
232232
wait: bool = typer.Option(True, "--wait/--no-wait", help="Wait for completion"),
233233
) -> None:
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
{%- if include_worker %}
2+
"""Server-Sent Events endpoint for real-time worker event streaming."""
3+
4+
import asyncio
5+
import json
6+
from collections.abc import AsyncGenerator
7+
8+
import redis.asyncio as aioredis
9+
from fastapi import APIRouter, Request
10+
from starlette.responses import StreamingResponse
11+
12+
from app.components.worker.events import WORKER_EVENT_STREAM, read_queue_totals
13+
from app.core.config import settings
14+
from app.core.log import logger
15+
16+
router = APIRouter()
17+
18+
# XREAD block timeout in milliseconds (5 seconds)
19+
_XREAD_BLOCK_MS = 5000
20+
21+
# Maximum messages to read per XREAD call — high enough to drain the stream
22+
# faster than workers produce events during load tests
23+
_XREAD_COUNT = 500
24+
25+
26+
27+
@router.get("/worker/stream")
async def worker_event_stream(request: Request) -> StreamingResponse:
    """
    Stream real-time worker events via Server-Sent Events (SSE).

    On connect, sends a single "totals" event built from
    ``read_queue_totals``. It then tails the ``WORKER_EVENT_STREAM`` Redis
    Stream and forwards every entry as its own SSE ``data:`` frame. When no
    entry arrives within ``_XREAD_BLOCK_MS`` a comment-only heartbeat frame
    is sent instead so the connection stays alive.

    Args:
        request: Incoming request, polled to detect client disconnects.

    Returns:
        A ``text/event-stream`` StreamingResponse with caching and proxy
        buffering disabled via response headers.
    """

    async def event_generator() -> AsyncGenerator[str, None]:
        """Yield SSE frames until the client disconnects or an error occurs."""
        redis_url = (
            settings.redis_url_effective
            if hasattr(settings, "redis_url_effective")
            else settings.REDIS_URL
        )
        redis_client: aioredis.Redis = aioredis.from_url(
            redis_url, decode_responses=True
        )
        try:
            # Send initial baseline totals (read from Redis ONCE)
            totals = await read_queue_totals(redis_client)
            yield f"data: {json.dumps({'type': 'totals', 'queues': totals})}\n\n"
            logger.debug(f"SSE: sent initial totals baseline: {totals}")

            # Start from the newest entry only (no replay of old events).
            # The special ID "$" is resolved to a concrete ID exactly once:
            # re-sending "$" after a timed-out XREAD makes Redis re-evaluate
            # "newest entry" on every call, silently skipping anything that
            # was added between two calls. "0-0" is used for an empty or
            # missing stream so that the first entry ever added is delivered.
            newest = await redis_client.xrevrange(WORKER_EVENT_STREAM, count=1)
            last_id = newest[0][0] if newest else "0-0"

            while True:
                # Check if client disconnected
                if await request.is_disconnected():
                    break

                results = await redis_client.xread(
                    {WORKER_EVENT_STREAM: last_id},
                    block=_XREAD_BLOCK_MS,
                    count=_XREAD_COUNT,
                )

                if not results:
                    # No events within the block timeout — send keepalive
                    yield ": heartbeat\n\n"
                    continue

                # Forward every stream event as-is
                msg_count = 0
                for _stream_name, messages in results:
                    # Advance the cursor to the last entry of this batch so
                    # the next XREAD resumes right after it.
                    last_id = messages[-1][0]
                    msg_count += len(messages)
                    for _msg_id, data in messages:
                        yield f"data: {json.dumps(data)}\n\n"

                logger.debug(f"SSE: forwarded {msg_count} events")

                # No sleep needed — XREAD(block=5s) naturally waits
                # when the stream is empty. Events are forwarded instantly.

        except asyncio.CancelledError:
            # Client disconnected — normal SSE lifecycle. Propagate the
            # cancellation instead of swallowing it so the surrounding task
            # is actually cancelled; the finally block still runs cleanup.
            raise
        except Exception as e:
            logger.warning(f"SSE stream error: {e}")
        finally:
            await redis_client.aclose()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )
100+
{%- endif %}

aegis/templates/copier-aegis-project/{{ project_slug }}/app/components/backend/api/models.py.jinja

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,7 @@ class LoadTestRequest(BaseModel):
101101
num_tasks: int = Field(
102102
100,
103103
description="Number of tasks to spawn for the load test",
104-
ge=10,
105-
le=10000
104+
ge=1,
106105
)
107106
task_type: LoadTestTypes = Field(
108107
LoadTestTypes.CPU_INTENSIVE,

aegis/templates/copier-aegis-project/{{ project_slug }}/app/components/backend/api/routing.py.jinja

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ from fastapi import FastAPI
22

33
from app.components.backend.api import health
44
{%- if include_worker %}
5+
from app.components.backend.api import events as worker_events
56
from app.components.backend.api import worker
67
{%- endif %}
78
{%- if scheduler_backend != "memory" %}
@@ -32,6 +33,7 @@ def include_routers(app: FastAPI) -> None:
3233
app.include_router(health.router, prefix="/health", tags=["health"])
3334
{%- if include_worker %}
3435
app.include_router(worker.router, prefix="/api/v1", tags=["worker"])
36+
app.include_router(worker_events.router, prefix="/events", tags=["events"])
3537
{%- endif %}
3638
{%- if scheduler_backend != "memory" %}
3739
app.include_router(scheduler.router, prefix="/api/v1", tags=["scheduler"])

aegis/templates/copier-aegis-project/{{ project_slug }}/app/components/backend/api/worker.py.jinja

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ from app.components.backend.api.models import (
1515
TaskStatusResponse,
1616
)
1717
from app.components.worker.constants import LoadTestTypes
18+
from app.components.worker.events import publish_event
1819
from app.components.worker.pools import get_queue_pool
1920
from app.components.worker.tasks import get_task_by_name, list_available_tasks
2021
from app.core.config import get_default_queue
@@ -93,11 +94,18 @@ async def enqueue_task(task_request: TaskRequest) -> TaskResponse:
9394
if task_request.delay_seconds:
9495
estimated_start = queued_at + timedelta(seconds=task_request.delay_seconds)
9596

96-
await pool.aclose()
97-
9897
if job is None:
98+
await pool.aclose()
9999
raise HTTPException(status_code=500, detail="Failed to enqueue task")
100100

101+
# Publish enqueue event for real-time dashboard updates
102+
await publish_event(
103+
pool, "job.enqueued", task_request.queue_type,
104+
{"job_id": job.job_id, "task": task_request.task_name},
105+
)
106+
107+
await pool.aclose()
108+
101109
logger.info(f"Task enqueued: {job.job_id} ({task_request.task_name})")
102110

103111
return TaskResponse(

aegis/templates/copier-aegis-project/{{ project_slug }}/app/components/backend/startup/database_init.py.jinja

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""
22
Database initialization startup hook.
33

4-
Runs Alembic migrations and {% if database_engine == "sqlite" %}ensures directory structure exists{% else %}verifies connectivity{% endif %}
4+
{% if database_engine == "sqlite" %}Ensures directory structure exists and creates tables{% else %}Runs Alembic migrations and verifies connectivity{% endif %}
55
when the backend starts up (only when database component is included).
66
"""
77
{% if include_database %}
@@ -16,7 +16,7 @@ from app.core.log import logger
1616
from app.models.user import User # noqa: F401
1717
{% endif %}
1818

19-
19+
{% if database_engine == "postgres" %}
2020
def _check_and_stamp_existing_tables() -> None:
2121
"""
2222
Detect when tables for a pending migration already exist.
@@ -155,16 +155,19 @@ def _run_migrations() -> bool:
155155
except Exception as e:
156156
logger.error(f"Failed to run migrations: {e}")
157157
return False
158+
{% endif %}
158159

159160

160161
async def startup_database_init() -> None:
161162
"""
162163
Initialize database and run migrations.
163164

164165
This hook runs when the backend starts to:
165-
1. {% if database_engine == "sqlite" %}Ensure database directory exists{% else %}Verify database connectivity{% endif %}
166+
1. {% if database_engine == "sqlite" %}Ensure database directory exists and create tables{% else %}Verify database connectivity{% endif %}
167+
{% if database_engine == "postgres" %}
166168
2. Run Alembic migrations (idempotent - safe to run multiple times)
167169
3. Verify database schema is ready
170+
{%- endif %}
168171
{%- if include_ai and database_engine == "postgres" %}
169172
4. Sync LLM catalog (AI service, postgres only)
170173
{%- endif %}
@@ -175,33 +178,47 @@ async def startup_database_init() -> None:
175178
from app.core.db import DATABASE_PATH
176179
db_path = Path(DATABASE_PATH)
177180
db_path.parent.mkdir(parents=True, exist_ok=True)
181+
182+
# Create tables via SQLModel (no Alembic needed for SQLite)
183+
from sqlmodel import SQLModel
184+
185+
from app.core.db import engine
186+
SQLModel.metadata.create_all(engine)
187+
logger.info("Database tables created/verified (SQLite)")
178188
{% endif %}
179189

190+
{% if database_engine == "postgres" %}
180191
# Run Alembic migrations (idempotent)
181192
migrations_ok = _run_migrations()
182193

183194
if not migrations_ok:
184195
logger.warning("Migrations failed - database may not be fully initialized")
185196
# Continue anyway to allow debugging
197+
{% endif %}
186198

187199
# Verify database connectivity
188200
try:
189-
from app.core.db import db_session
190-
from sqlmodel import text
191201
from sqlalchemy import inspect
202+
from sqlmodel import text
203+
204+
from app.core.db import db_session
192205

193206
with db_session(autocommit=False) as session:
194207
# Basic connectivity check
195208
session.exec(text("SELECT 1"))
196209

197-
# Verify alembic_version table exists
198210
inspector = inspect(session.connection())
199211
table_names = inspector.get_table_names()
212+
{% if database_engine == "postgres" %}
200213

214+
# Verify alembic_version table exists
201215
if "alembic_version" in table_names:
202216
logger.info(f"Database ready with {len(table_names)} tables")
203217
else:
204218
logger.warning("alembic_version table missing after migrations")
219+
{% else %}
220+
logger.info(f"Database ready with {len(table_names)} tables")
221+
{% endif %}
205222

206223
except Exception as e:
207224
logger.warning(f"Database verification failed: {e}")
@@ -232,4 +249,4 @@ async def startup_database_init() -> None:
232249

233250
# Export the startup hook function
234251
startup_hook = startup_database_init
235-
{% endif %}
252+
{% endif %}

aegis/templates/copier-aegis-project/{{ project_slug }}/app/components/frontend/dashboard/cards/card_utils.py.jinja

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ No inheritance or ABC complexity - just common functionality extracted.
88
from collections.abc import Callable
99

1010
import flet as ft
11+
{%- if include_observability %}
1112
import logfire
13+
{%- endif %}
1214
from app.components.frontend.controls import (
1315
H3Text,
1416
LabelText,
@@ -434,7 +436,9 @@ def create_progress_indicator(
434436
)
435437

436438

439+
{%- if include_observability %}
437440
@logfire.instrument("overseer.modal.create {component_name}")
441+
{%- endif %}
438442
def create_modal_for_component(
439443
component_name: str, component_data: ComponentStatus, page: ft.Page
440444
) -> ft.Container | None:
@@ -526,7 +530,9 @@ def create_modal_for_component(
526530
return None
527531

528532

533+
{%- if include_observability %}
529534
@logfire.instrument("overseer.modal.open {component_name}")
535+
{%- endif %}
530536
def _open_modal(
531537
component_name: str, component_data: ComponentStatus, page: ft.Page
532538
) -> None:
@@ -538,6 +544,10 @@ def _open_modal(
538544
if popup:
539545
modal_cache[component_name] = popup
540546
page.overlay.append(popup)
547+
else:
548+
# Refresh cached modal with latest health check data
549+
if hasattr(popup, "update_data"):
550+
popup.update_data(component_data)
541551
if popup:
542552
popup.show()
543553
page.update()

aegis/templates/copier-aegis-project/{{ project_slug }}/app/components/frontend/dashboard/diagram/diagram_node.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from app.services.system.models import ComponentStatus, ComponentStatusType
1111
from app.services.system.ui import get_component_label
1212

13-
from ..cards.card_utils import create_modal_for_component, get_ai_engine_display
13+
from ..cards.card_utils import _open_modal, get_ai_engine_display
1414

1515
# Component configuration: display names and modal routing
1616
# Subtitles are generated dynamically via get_component_label()
@@ -207,15 +207,7 @@ def _handle_click(self, e: ft.ControlEvent) -> None:
207207
if not e.page:
208208
return
209209

210-
popup = create_modal_for_component(
211-
self._modal_name,
212-
self._component_data,
213-
e.page,
214-
)
215-
if popup:
216-
e.page.overlay.append(popup)
217-
popup.show()
218-
e.page.update()
210+
_open_modal(self._modal_name, self._component_data, e.page)
219211

220212
def update_data(self, component_data: ComponentStatus) -> None:
221213
"""

0 commit comments

Comments
 (0)