Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/kernelbot/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ def get_db():
return backend_instance.db


async def get_runner_queue_status(
gpu_type: str,
req: ProcessedSubmissionRequest | None = None,
) -> dict[str, Any]:
if not backend_instance:
raise HTTPException(status_code=500, detail="Bot instance not initialized")

config = None
if req is not None and req.task is not None:
config = {"lang": req.task.lang.value}

status = await backend_instance.get_runner_queue_status(gpu_type, config)
return status.to_dict()


async def get_submission_runner_queue_status(submission: dict) -> dict[str, Any] | None:
runs = submission.get("runs") or []
if not runs:
return None
return await get_runner_queue_status(runs[0]["runner"])


async def validate_cli_header(
x_popcorn_cli_id: Optional[str] = Header(None, alias="X-Popcorn-Cli-Id"),
db_context=Depends(get_db),
Expand Down Expand Up @@ -567,11 +589,13 @@ async def run_submission_async(
sub_id, job_status_id = await enqueue_background_job(
req, submission_mode_enum, backend_instance, background_submission_manager
)
runner_queue = await get_runner_queue_status(req.gpus[0], req)

return JSONResponse(
status_code=202,
content={
"details": {"id": sub_id, "job_status_id": job_status_id},
"runner_queue": runner_queue,
"status": "accepted",
},
)
Expand Down Expand Up @@ -874,6 +898,12 @@ async def get_gpus(
raise HTTPException(status_code=500, detail=f"Error fetching GPU data: {e}") from e


@app.get("/runner_queue/{gpu_type}")
async def get_runner_queue(gpu_type: str) -> dict:
await simple_rate_limit()
return await get_runner_queue_status(gpu_type)


@app.get("/submissions/{leaderboard_name}/{gpu_name}")
async def get_submissions(
leaderboard_name: str,
Expand Down Expand Up @@ -985,6 +1015,7 @@ async def get_user_submission(

# RunItem is a TypedDict (already a dict), select fields to expose
run_fields = ("start_time", "end_time", "mode", "secret", "runner", "score", "passed")
runner_queue = await get_submission_runner_queue_status(submission)
return {
"id": submission["submission_id"],
"leaderboard_id": submission["leaderboard_id"],
Expand All @@ -1000,6 +1031,7 @@ async def get_user_submission(
"error": submission.get("job_error"),
"last_heartbeat": submission.get("job_last_heartbeat"),
},
"runner_queue": runner_queue,
}
except HTTPException:
raise
Expand Down
27 changes: 26 additions & 1 deletion src/libkernelbot/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
enforce_submission_precheck,
should_precheck_submission,
)
from libkernelbot.launchers import Launcher
from libkernelbot.launchers import Launcher, RunnerQueueStatus
from libkernelbot.leaderboard_db import LeaderboardDB
from libkernelbot.report import (
MultiProgressReporter,
Expand Down Expand Up @@ -52,6 +52,31 @@ def register_launcher(self, launcher: Launcher):
for gpu in launcher.gpus:
self.launcher_map[gpu.value] = launcher

async def get_runner_queue_status(
self, gpu_name: str, config: dict | None = None
) -> RunnerQueueStatus:
gpu = get_gpu_by_name(gpu_name)
if gpu is None:
return RunnerQueueStatus(
runner="unknown",
gpu=gpu_name,
queued_jobs=None,
status="unavailable",
error="unknown gpu",
)

launcher = self.launcher_map.get(gpu.value)
if launcher is None:
return RunnerQueueStatus(
runner=gpu.runner,
gpu=gpu.name,
queued_jobs=None,
status="unavailable",
error="runner is not registered",
)

return await launcher.get_queue_status(gpu, config)

async def submit_full(
self,
req: ProcessedSubmissionRequest,
Expand Down
4 changes: 2 additions & 2 deletions src/libkernelbot/launchers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .github import GitHubLauncher
from .launcher import Launcher
from .launcher import Launcher, RunnerQueueStatus
from .modal import ModalLauncher

__all__ = [Launcher, GitHubLauncher, ModalLauncher]
__all__ = [Launcher, RunnerQueueStatus, GitHubLauncher, ModalLauncher]
62 changes: 46 additions & 16 deletions src/libkernelbot/launchers/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
)
from libkernelbot.utils import KernelBotError, setup_logging

from .launcher import Launcher
from .launcher import Launcher, RunnerQueueStatus

logger = setup_logging()

Expand All @@ -66,6 +66,21 @@ def __init__(self, repo: str, token: str, branch: str):
self._token_idx = 0
self.branch = branch

@staticmethod
def _workflow_for_gpu(gpu_type: GPU) -> tuple[str, str | None]:
if gpu_type.value in ["MI300", "MI250", "MI300x8", "MI355X"]:
return "amd_workflow.yml", {
"MI300": "amdgpu-mi300-x86-64",
"MI250": "amdgpu-mi250-x86-64",
"MI300x8": "amdgpu-mi300-8-x86-64",
"MI355X": "arc-runner-set",
}[gpu_type.value]
if gpu_type.value == "B200_Nebius":
return "helion_workflow.yml", None
if gpu_type.value == "NVIDIA":
return "nvidia_workflow.yml", None
raise ValueError(f"Invalid GPU type: {gpu_type.value}")

@staticmethod
def _load_github_tokens(fallback_token: str) -> list[str]:
primary = (os.getenv("GITHUB_TOKEN") or fallback_token).strip()
Expand Down Expand Up @@ -94,29 +109,16 @@ async def run_submission( # noqa: C901
self, config: dict, gpu_type: GPU, status: RunProgressReporter
) -> FullResult:
gpu_vendor = None
runner_name = None
selected_workflow, runner_name = self._workflow_for_gpu(gpu_type)
if gpu_type.value in ["MI300", "MI250", "MI300x8", "MI355X"]:
selected_workflow = "amd_workflow.yml"
runner_name = {
"MI300": "amdgpu-mi300-x86-64",
"MI250": "amdgpu-mi250-x86-64",
"MI300x8": "amdgpu-mi300-8-x86-64",
"MI355X": "arc-runner-set",
}[gpu_type.value]
gpu_vendor = "AMD"
requirements = AMD_REQUIREMENTS
elif gpu_type.value == "B200_Nebius":
selected_workflow = "helion_workflow.yml"
runner_name = None
gpu_vendor = "NVIDIA"
requirements = NVIDIA_REQUIREMENTS
elif gpu_type.value == "NVIDIA":
selected_workflow = "nvidia_workflow.yml"
runner_name = None
gpu_vendor = "NVIDIA"
requirements = NVIDIA_REQUIREMENTS
else:
raise ValueError(f"Invalid GPU type: {gpu_type.value}")

lang = config["lang"]
if lang == "cu" and gpu_vendor == "AMD":
Expand Down Expand Up @@ -199,6 +201,35 @@ async def run_submission( # noqa: C901
system = SystemInfo(**data.get("system", {}))
return FullResult(success=True, error="", runs=runs, system=system)

async def get_queue_status(
self, gpu_type: GPU, config: dict | None = None
) -> RunnerQueueStatus:
try:
selected_workflow, _ = self._workflow_for_gpu(gpu_type)
run = GitHubRun(self.repo, self._next_token(), self.branch, selected_workflow)
workflow = await run.get_workflow()
queued_runs = await asyncio.to_thread(
workflow.get_runs,
event="workflow_dispatch",
status="queued",
)
queued_jobs = await asyncio.to_thread(lambda: queued_runs.totalCount)
except Exception as e:
logger.warning("Could not get GitHub queue stats for %s", gpu_type.name, exc_info=e)
return RunnerQueueStatus(
runner=self.name,
gpu=gpu_type.name,
queued_jobs=None,
status="unavailable",
error=str(e),
)

return RunnerQueueStatus(
runner=self.name,
gpu=gpu_type.name,
queued_jobs=queued_jobs,
)

async def wait_callback(self, run: "GitHubRun", status: RunProgressReporter):
await status.update(
f"⏳ Workflow [{run.run_id}](<{run.html_url}>): {run.status} "
Expand Down Expand Up @@ -246,7 +277,6 @@ def patched_create_dispatch(
return status == 200 or status == 204



class GitHubRun:
def __init__(self, repo: str, token: str, branch: str, workflow_file: str):
gh = Github(auth=Auth.Token(token))
Expand Down
28 changes: 27 additions & 1 deletion src/libkernelbot/launchers/launcher.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,40 @@
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Type
from typing import Any, Type

from libkernelbot.consts import GPU
from libkernelbot.report import RunProgressReporter


@dataclass
class RunnerQueueStatus:
runner: str
gpu: str
queued_jobs: int | None
running_jobs: int | None = None
available_runners: int | None = None
status: str = "available"
error: str | None = None

def to_dict(self) -> dict[str, Any]:
return asdict(self)


class Launcher:
def __init__(self, name: str, gpus: Type[Enum]):
self.name = name
self.gpus = gpus

async def run_submission(self, config: dict, gpu_type: GPU, status: RunProgressReporter):
raise NotImplementedError()

async def get_queue_status(
self, gpu_type: GPU, config: dict | None = None
) -> RunnerQueueStatus:
return RunnerQueueStatus(
runner=self.name,
gpu=gpu_type.name,
queued_jobs=None,
status="unavailable",
error="queue status is not supported for this runner",
)
39 changes: 36 additions & 3 deletions src/libkernelbot/launchers/modal.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from libkernelbot.run_eval import FullResult
from libkernelbot.utils import setup_logging

from .launcher import Launcher
from .launcher import Launcher, RunnerQueueStatus

logger = setup_logging(__name__)

Expand All @@ -23,8 +23,7 @@ async def run_submission(
loop = asyncio.get_event_loop()
if config["lang"] == "cu":
config["include_dirs"] = config.get("include_dirs", []) + self.additional_include_dirs
func_type = "pytorch" if config["lang"] == "py" else "cuda"
func_name = f"run_{func_type}_script_{gpu_type.value.lower()}"
func_name = self._function_name(config, gpu_type)

logger.info(f"Starting Modal run using {func_name}")

Expand All @@ -38,3 +37,37 @@ async def run_submission(
await status.update("✅ Waiting for modal run to finish... Done")

return result

def _function_name(self, config: dict, gpu_type: GPU) -> str:
func_type = "pytorch" if config["lang"] == "py" else "cuda"
return f"run_{func_type}_script_{gpu_type.value.lower()}"

async def get_queue_status(
self, gpu_type: GPU, config: dict | None = None
) -> RunnerQueueStatus:
func_name = self._function_name(config or {"lang": "cu"}, gpu_type)
loop = asyncio.get_event_loop()

try:
stats = await loop.run_in_executor(
None,
lambda: modal.Function.from_name(
"discord-bot-runner", func_name
).get_current_stats(),
)
except Exception as e:
logger.warning("Could not get Modal queue stats for %s", func_name, exc_info=e)
return RunnerQueueStatus(
runner=self.name,
gpu=gpu_type.name,
queued_jobs=None,
status="unavailable",
error=str(e),
)

return RunnerQueueStatus(
runner=self.name,
gpu=gpu_type.name,
queued_jobs=getattr(stats, "backlog", None),
available_runners=getattr(stats, "num_total_runners", None),
)
30 changes: 29 additions & 1 deletion tests/test_admin_api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Tests for admin API endpoints."""

from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from fastapi.testclient import TestClient

from libkernelbot.launchers import RunnerQueueStatus


@pytest.fixture
def mock_backend():
Expand Down Expand Up @@ -80,6 +82,32 @@ def test_admin_stop(self, test_client, mock_backend):
assert mock_backend.accepts_jobs is False


class TestRunnerQueue:
def test_get_runner_queue(self, test_client, mock_backend):
"""GET /runner_queue/{gpu_type} returns runner backlog."""
mock_backend.get_runner_queue_status = AsyncMock(
return_value=RunnerQueueStatus(
runner="Modal",
gpu="B200",
queued_jobs=6,
available_runners=1,
)
)

response = test_client.get("/runner_queue/B200")

assert response.status_code == 200
assert response.json() == {
"runner": "Modal",
"gpu": "B200",
"queued_jobs": 6,
"running_jobs": None,
"available_runners": 1,
"status": "available",
"error": None,
}


class TestAdminStats:
"""Test admin stats endpoint."""

Expand Down
Loading
Loading