Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Discriminated ISO probe cycle outcomes (`CycleResult`: success / empty / failed) with distinct logging and `/health` fields (`last_cycle_status`, `last_cycle_error`).
- Atomic `SchedulerSnapshot` for `/health` scheduler extras (`last_updated`, `poll_count`, probe stats) published under a lock from `Scheduler.health_snapshot()`.
- Post the same Slack **status** summary as the interactive command to `NOTIFICATION_CHANNEL` once when the process starts (when that channel is configured).
- Open-source hygiene: contributing guide, security policy, code of conduct, onboarding and handoff docs, pre-commit (Ruff), GitHub issue templates, Dependabot, CodeQL, CODEOWNERS template, and `.gitattributes`.

Expand Down
4 changes: 2 additions & 2 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ These components share one thread and the main event loop. They may await I/O bu

- **`Scheduler.run_forever` / `poll_once`** — orchestrates index refresh, probing, and notifications.
- **`WG21Index.refresh`** — fetches and parses wg21.link index (httpx async).
- **`ISOProber.run_cycle` / `_probe_one`** — concurrent HEAD probes via `asyncio.gather` and an httpx async client.
- **`ISOProber.run_cycle` / `_probe_one`** — concurrent HEAD probes via `asyncio.gather` and an httpx async client. `run_cycle` returns a discriminated `CycleResult` (success / empty / failed).
- **Slack Bolt handlers** — run on Bolt’s thread; they should not read mutable source state directly (use snapshots or health callbacks).

`ISOProber._stats` is updated from many coroutines in one `run_cycle()`. This is safe on the event loop because asyncio never preempts between awaits. A `threading.Lock` guards `_stats` as defense-in-depth if code is ever called from a worker thread by mistake.
Expand All @@ -17,7 +17,7 @@ These components share one thread and the main event loop. They may await I/O bu

| Thread | Role |
|--------|------|
| **Health server** (`health.py`) | Serves `GET /health`; reads `len(index.papers)` via a callback and scheduler snapshot fields. |
| **Health server** (`health.py`) | Serves `GET /health`; reads `len(index.papers)` via a callback and scheduler fields from `Scheduler.health_snapshot()` (immutable snapshot, lock-protected publish). |
| **MessageQueue sender** (`scout.py`) | Drains Slack post queue with rate limiting. |
| **`run_blocking_io` / `asyncio.to_thread`** | Runs blocking psycopg2 calls (e.g. `UserWatchlist.matches_for_users`) off the loop. |

Expand Down
76 changes: 62 additions & 14 deletions src/paperscout/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,63 @@

log = logging.getLogger("paperscout")

# MessageQueue keys allowed in /health extras (must not overlap scheduler.health_snapshot()).
_MQ_HEALTH_FIELD_NAMES = frozenset(
{
"mq_depth",
"mq_max_size",
"mq_utilization",
"mq_circuit_state",
}
)


def _mq_health_fields(mq: MessageQueue) -> dict:
    """Return the MessageQueue metrics to expose on /health.

    The result of ``mq.health_fields()`` is returned as-is when the queue has
    that method and the call yields a dict. In every other case (method absent,
    call raised, or a non-dict result) the function falls back to a dict that
    holds only ``mq_depth``, taken from ``mq.depth()``.
    """
    # Queues without health_fields() only report their depth.
    if not hasattr(mq, "health_fields"):
        return {"mq_depth": mq.depth()}

    try:
        fields = mq.health_fields()
    except Exception as exc:
        # Best effort: a broken metrics call is logged and degraded to depth only.
        log.warning(
            "health: mq.health_fields() failed for %s id=%s: %s",
            type(mq).__name__,
            id(mq),
            exc,
            exc_info=True,
        )
        return {"mq_depth": mq.depth()}

    if not isinstance(fields, dict):
        log.warning("health: mq.health_fields() returned non-dict, using mq_depth only")
        return {"mq_depth": mq.depth()}
    return fields


def _merge_extra_health_fields(
    scheduler_snap: dict,
    mq_extra: dict,
    db_pool: dict,
) -> dict:
    """Build the /health extras dict from scheduler, MessageQueue and pool data.

    Only MessageQueue keys listed in ``_MQ_HEALTH_FIELD_NAMES`` are considered,
    and of those only the ones that do not collide with a scheduler snapshot
    key are kept, so the scheduler value wins on any conflict. Every rejected
    key is reported at debug level. ``db_pool`` is stored last under the
    ``"db_pool"`` key.
    """
    taken = set(scheduler_snap)
    accepted: dict = {}
    for name, metric in mq_extra.items():
        allow_listed = name in _MQ_HEALTH_FIELD_NAMES
        collides = name in taken
        if allow_listed and not collides:
            accepted[name] = metric
            continue
        # Everything below is a rejected key; only the log message differs.
        if allow_listed:
            log.debug(
                "health: mq_extra key %r conflicts with scheduler snapshot; scheduler wins",
                name,
            )
        elif collides:
            log.debug(
                "health: mq_extra key %r not allow-listed; scheduler snapshot kept",
                name,
            )
        else:
            log.debug("health: mq_extra key %r not allow-listed, dropping", name)

    merged = dict(scheduler_snap)
    merged.update(accepted)
    merged["db_pool"] = db_pool
    return merged


def _setup_logging(data_dir: Path, console_level: str = "INFO", retention_days: int = 7) -> None:
"""Console + daily rotating file logging; third-party loggers capped at WARNING."""
Expand Down Expand Up @@ -141,20 +198,11 @@ def _pool_status(p) -> dict:
)

def _extra_health_fields() -> dict:
lsp = scheduler._last_successful_poll
s = scheduler._last_probe_stats
# HTTP 200 outcomes / non-skipped probe attempts (excludes skipped_discovered, skipped_in_index).
hits = s.get("hit_recent", 0) + s.get("hit_old", 0) + s.get("hit_no_lm", 0)
attempted = hits + s.get("miss", 0) + s.get("error", 0)
probe_success_rate = hits / attempted if attempted > 0 else None
return {
"last_successful_poll": (
datetime.fromtimestamp(lsp, tz=timezone.utc).isoformat() if lsp else None
),
"probe_success_rate": probe_success_rate,
"mq_depth": mq.depth(),
"db_pool": _pool_status(pool),
}
return _merge_extra_health_fields(
scheduler.health_snapshot(),
_mq_health_fields(mq),
_pool_status(pool),
)

register_handlers(app, user_watchlist, state, paper_count_fn, launch_time)

Expand Down
6 changes: 5 additions & 1 deletion src/paperscout/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,12 @@ class Settings(BaseSettings):
notification_channel: str = ""
# Slack channel ID for ops alerts (stale poll). Empty = disabled.
ops_alert_channel: str = ""
# Log a warning when MessageQueue depth reaches or exceeds this (unbounded queue).
# Log a warning when MessageQueue depth reaches or exceeds this (legacy threshold).
mq_backpressure_threshold: int = Field(default=100, ge=1)
mq_max_size: int = Field(default=1000, ge=1)
mq_max_retries: int = Field(default=10, ge=0)
mq_circuit_breaker_threshold: int = Field(default=5, ge=1)
mq_circuit_breaker_cooldown_seconds: int = Field(default=60, ge=1)
notify_on_frontier_hit: bool = True
notify_on_any_draft: bool = True
# Alert when a D-paper we previously probed appears in the wg21.link index
Expand Down
4 changes: 3 additions & 1 deletion src/paperscout/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ def do_GET(self) -> None:
except Exception:
log.exception("health: extra_fields_fn failed")
extra = {}
body = json.dumps({**base, **extra}).encode()
# Base handler fields win if extra_fields_fn returns overlapping keys.
safe_extra = {k: v for k, v in extra.items() if k not in base}
body = json.dumps({**base, **safe_extra}).encode()

self.send_response(200)
self.send_header("Content-Type", "application/json")
Expand Down
32 changes: 32 additions & 0 deletions src/paperscout/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,38 @@ class ProbeHit:
is_recent: bool = False


class CycleStatus(str, Enum):
    """Possible outcomes of a single ``ISOProber.run_cycle()`` call.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase value.
    """

    SUCCESS = "success"
    EMPTY = "empty"
    FAILED = "failed"


@dataclass(frozen=True, slots=True)
class CycleResult:
    """Immutable, status-tagged result of one probe cycle.

    Invariants enforced at construction (``ValueError`` otherwise):

    * ``FAILED`` requires a non-empty ``error``.
    * ``SUCCESS`` requires at least one entry in ``results``.
    * ``EMPTY`` must have no ``results``.
    * ``error`` may only be set when the status is ``FAILED``.
    """

    status: CycleStatus
    results: tuple[ProbeHit, ...] = ()
    error: str | None = None

    def __post_init__(self) -> None:
        """Validate the status/results/error combination."""
        failed = self.status == CycleStatus.FAILED
        has_results = bool(self.results)
        # The checks run in a fixed order, so the first violated rule is reported.
        if failed and not self.error:
            raise ValueError("CycleResult FAILED must carry a non-empty error string")
        if self.status == CycleStatus.SUCCESS and not has_results:
            raise ValueError("CycleResult SUCCESS must carry at least one ProbeHit")
        if self.status == CycleStatus.EMPTY and has_results:
            raise ValueError("CycleResult EMPTY must not carry results")
        if not failed and self.error is not None:
            raise ValueError("CycleResult error is only valid for FAILED status")

    @property
    def hits(self) -> list[ProbeHit]:
        """Return ``results`` as a new list for ``SUCCESS``; otherwise an empty list."""
        if self.status != CycleStatus.SUCCESS:
            return []
        return list(self.results)


@dataclass
class PerUserMatches:
"""One user's watchlist hits: ``(paper|hit, 'author'|'paper')`` tuples."""
Expand Down
Loading