diff --git a/docs/design/brain/interaction-patterns.md b/docs/design/brain/interaction-patterns.md index 60c7cc8..e2b7605 100644 --- a/docs/design/brain/interaction-patterns.md +++ b/docs/design/brain/interaction-patterns.md @@ -73,6 +73,16 @@ Replace with **π reaction on the source message** the moment the bot starts w **Closure semantics for π:** Leave the eyes in place after success. They double as a visual marker in scrollback that the bot processed this message. The final detailed filing reply takes over the closure role. On failure the error reply does the same; the π just means "we tried." +#### Thread the answer, not just the eyes + +The π removes the intermediate noise; the final filing reply is still a top-level message in the channel. In a busy family room that reply is itself the bulk of the noise. So the bot posts its answer as an `m.thread` reply under the source message (the same event it reacted to), not inline. The user's upload, the π, and the filing summary collapse into one threaded unit, and the main timeline shows only what the family actually said. + +This is on by default. Some rooms may want the answer inline: a quiet archive room where the filing summary *is* the content, or a room where threads hurt on a given client. So the placement is a per-room knob. The framework keeps both paths. `MicroBot._answer` threads or replies inline based on `_reply_in_thread(room_id)`, which today returns a single default and is the seam where per-room config plugs in. Errors thread too: in Element a thread with an unread error still surfaces, and keeping every signal about one item in its own thread beats scattering them. + +If the filed message already lives in a thread (the user dropped a doc mid-conversation), the answer joins *that* thread rather than starting a new one, since Matrix forbids nested threads. `MicroBot.get_thread_root` / `check_in_thread` read the source event's own relation to decide, so any bot gets the same handling for free. + +Framework-resident on purpose: `_react`, `_answer`, and the π acknowledgement live on `MicroBot`, not the archivist, so scribe-bot and mail-bot inherit the same quiet-timeline behavior for free. + #### Cost ~2-3h for the user-to-bot bindings (the four-emoji table) plus unit tests around reaction-event parsing. Add ~1-2h to flip the existing "Received X, processing..." messages to π reactions on the source. Both halves use the same `EventType.REACTION` plumbing β the bot half is just emitting reactions instead of subscribing to them. diff --git a/stacklets/core/bot-runner/microbot.py b/stacklets/core/bot-runner/microbot.py index c006cfc..80076c0 100644 --- a/stacklets/core/bot-runner/microbot.py +++ b/stacklets/core/bot-runner/microbot.py @@ -72,6 +72,12 @@ from room_context import RoomContext, context_for +# The framework's "I picked this up and I'm working on it" signal. A +# bot reacts with π on the source message the moment it starts a +# capture β the same liveness role the typing indicator plays, but +# attached to the specific message instead of a separate timeline event. +EYES = "\U0001F440" + class MicroBot: """Base class for lightweight Matrix bots. @@ -508,6 +514,31 @@ async def _room_send( room_id=room_id, message_type=message_type, content=content, ) + async def _react(self, room_id: str, event_id: str, emoji: str) -> None: + """Annotate an event with an emoji reaction (MSC2677). + + The bot's way to signal state on a specific message without a + separate timeline reply β e.g. π the moment it picks up a + capture. Routes through `_room_send` so the transport seam stays + in one place. Best-effort, like the typing indicator and read + receipt: a reaction that fails (homeserver hiccup, room not + joined) must not crash a handler mid-capture. + """ + if not event_id: + return + content = { + "m.relates_to": { + "rel_type": "m.annotation", + "event_id": event_id, + "key": emoji, + }, + } + try: + await self._room_send(room_id, content, message_type="m.reaction") + except Exception as e: + logger.warning("[{}] reaction {} failed in {}: {}", + self.name, emoji, room_id, e) + async def _send( self, room_id: str, text: str, reply_to: str | None = None, *, metadata: dict | None = None, thread_root_event_id: str | None = None, @@ -559,6 +590,75 @@ async def _send( content.update(metadata) await self._room_send(room_id, content) + # Whether a bot's answer to a processed item threads under the source + # message (keeping the main timeline quiet) or posts inline. On by + # default; a per-room override is a planned knob β see + # docs/design/brain/interaction-patterns.md. + REPLY_IN_THREAD = True + + def _reply_in_thread(self, room_id: str) -> bool: + """Per-room reply-placement policy. Today a single default for + every room; the seam where per-room configuration will plug in.""" + return self.REPLY_IN_THREAD + + @staticmethod + def get_thread_root(event) -> str | None: + """The id of the thread ``event`` belongs to, or None if it's a + top-level message. + + Reads the event's own ``m.thread`` relation. Matrix is the + ledger, so the relation on the source event is authoritative β + the bot keeps no thread bookkeeping of its own. A None/eventless + argument is simply not in a thread. + """ + rel = ( + getattr(event, "source", None) or {} + ).get("content", {}).get("m.relates_to", {}) + if rel.get("rel_type") == "m.thread": + return rel.get("event_id") + return None + + @classmethod + def check_in_thread(cls, event) -> bool: + """Whether ``event`` was posted inside a thread.""" + return cls.get_thread_root(event) is not None + + async def _answer( + self, room_id: str, text: str, source_event: str | None, + *, metadata: dict | None = None, + ) -> None: + """Post the bot's answer to a processed item. + + Threads under the source message by default so routine filings + don't crowd the main timeline; falls back to an inline reply + when the room opts out (the historic behavior, kept for the + per-room knob). With no source event there's nothing to attach + to, so it posts plain. + + Matrix forbids nested threads: when the source message already + lives in a thread, the answer joins *that* thread rather than + spawning a malformed one rooted at an in-thread event. We fetch + the source (Matrix is the ledger) and reuse its root; a top- + level source roots a fresh thread at itself. Either way the + answer quotes the source via the reply fallback. + """ + if not (source_event and self._reply_in_thread(room_id)): + await self._send(room_id, text, source_event, metadata=metadata) + return + root = source_event + try: + resp = await self._client.room_get_event(room_id, source_event) + existing = self.get_thread_root(getattr(resp, "event", None)) + if existing: + root = existing + except Exception as e: + logger.debug("[{}] thread-root fetch failed for {}: {}", + self.name, source_event, e) + await self._send( + room_id, text, reply_to=source_event, + thread_root_event_id=root, metadata=metadata, + ) + # The famstack event envelope rides as a custom key on the visible # m.room.message (see `_send`'s `metadata`), so a filing is a single # replayable timeline event. Distinct from `emit_event`, which posts diff --git a/stacklets/docs/bot/archivist.py b/stacklets/docs/bot/archivist.py index 9906590..5332e30 100644 --- a/stacklets/docs/bot/archivist.py +++ b/stacklets/docs/bot/archivist.py @@ -49,7 +49,7 @@ from capture_tags import CaptureTagCache from extractors import TextExtractor, UrlExtractor from git_mirror import GitMirror -from microbot import MicroBot +from microbot import EYES, MicroBot from pdf_analysis import ( DEFAULT_REFORMAT_MAX_PDF_PAGES, DEFAULT_VISION_MAX_PDF_PAGES, @@ -1272,16 +1272,16 @@ async def _reply_for_outcome( translator) so it lives here, not in the pipeline. """ if o.status == "upload_failed": - await self._send(room_id, self.t("upload_failed", name=o.display_name), reply_to) + await self._answer(room_id, self.t("upload_failed", name=o.display_name), reply_to) return if o.status == "duplicate": - await self._send(room_id, self._duplicate_reply(o.display_name, o.duplicate), reply_to) + await self._answer(room_id, self._duplicate_reply(o.display_name, o.duplicate), reply_to) return if o.status == "ocr_failed": - await self._send(room_id, self.t("ocr_failed", name=o.display_name), reply_to) + await self._answer(room_id, self.t("ocr_failed", name=o.display_name), reply_to) return if o.status == "filed_no_details": - await self._send( + await self._answer( room_id, self.t("filed_no_details", name=o.display_name, link=o.link), reply_to, ) @@ -1293,19 +1293,19 @@ async def _reply_for_outcome( ) if llm_error: key, kwargs = llm_error - await self._send(room_id, self.t(key, **kwargs), reply_to) + await self._answer(room_id, self.t(key, **kwargs), reply_to) elif not o.has_text: - await self._send( + await self._answer( room_id, self.t("filed_no_text", name=o.display_name, link=o.link), reply_to, ) elif not o.classify_enabled: - await self._send( + await self._answer( room_id, f"{self.t('filed', title=o.display_name)}\n\n {o.link}", reply_to, ) elif not o.classification: - await self._send( + await self._answer( room_id, self.t("classify_failed", name=o.display_name, link=o.link), reply_to, ) @@ -1326,7 +1326,7 @@ async def _reply_for_outcome( ) # The `dev.famstack.event` envelope rides on the visible # message β one replayable timeline event per filing. - await self._send( + await self._answer( room_id, reply_text, reply_to, metadata={"dev.famstack.event": o.envelope}, ) @@ -1379,7 +1379,6 @@ async def _on_file(self, room, event) -> None: raw_filename = content.get("filename") or content.get("body") or "document" caption = _attachment_caption(content) display_name = _clean_filename(raw_filename, msgtype) - sender_name = event.sender.split(":")[0].replace("@", "").capitalize() reply_to = event.event_id # Multi-page scan / multi-message batch mode. PDFs and images @@ -1403,19 +1402,18 @@ async def _on_file(self, room, event) -> None: await self._send(room.room_id, self.t("download_failed_matrix", name=display_name), reply_to) return - if msgtype == "m.image": - await self._send(room.room_id, self.t("received_photo", sender=sender_name), reply_to) - elif msgtype == "m.audio": - await self._send(room.room_id, self.t("received_voice", sender=sender_name), reply_to) - else: - await self._send(room.room_id, self.t("received_document", sender=sender_name), reply_to) - - # Start typing AFTER the confirmation message. Sending a chat - # message clears the typing indicator on Element's side, so a - # typing notice issued before the confirmation gets immediately - # wiped by the message itself. Setting it here keeps the - # indicator alive for the rest of the OCR + classify + mirror - # work that follows. + # Acknowledge the upload with a π reaction on the source message + # the moment work starts, instead of a "Received X, analyzing..." + # reply. The reaction is attached to the message being processed + # and adds no separate timeline event per capture; the final + # filing reply (or an error reply) is the real closure signal. + await self._react(room.room_id, reply_to, EYES) + + # Set typing after the ack so the indicator stays alive through + # the OCR + classify + mirror work that follows. (The old code + # had to send the "Received X" reply first because a chat message + # clears the indicator; a reaction is the last send before this, + # so typing set here survives.) await self._set_typing(room.room_id, on=True) # Documents room β full archivist pipeline (Paperless + classify @@ -1781,8 +1779,10 @@ async def _handle_scan_page( session["files"].append((raw_filename, file_data)) if caption: session["caption"] = _join_captions(session.get("caption", ""), caption) - page_num = len(session["files"]) - await self._send(room_id, self.t("page_received", num=page_num), reply_to) + # π on the page instead of a "Page N received." line β the + # batch can run several pages deep, so a reaction per page keeps + # the timeline clean; the scan-complete reply is the closure. + await self._react(room_id, reply_to, EYES) async def _handle_voice_batch_message( self, room_id: str, event, url: str, raw_filename: str, @@ -1836,8 +1836,9 @@ async def _handle_voice_batch_message( "mxc": url, "event_id": event.event_id, }) - n = len(session["voice_inputs"]) - await self._send(room_id, self.t("scan_voice_received", num=n), reply_to) + # π acknowledges the memo landed in the batch; the scan-complete + # reply is the closure (mirrors _handle_scan_page). + await self._react(room_id, reply_to, EYES) async def _handle_scan_complete( self, room_id: str, sender: str, reply_to: str | None = None, @@ -1950,9 +1951,16 @@ async def _handle_voice_batch_complete( # capture this is β the rule is "you pasted it, it's yours." def _notifier(self, room_id: str, reply_to: str | None) -> MatrixNotifier: - """A Notifier bound to this room + reply thread for mid-flow status.""" + """A Notifier bound to this room + reply thread for mid-flow status. + + Carries a π react thunk so the capture pipeline can acknowledge + the source message instead of posting a "Reading ..." status line.""" + async def react(rid: str, eid: str) -> None: + await self._react(rid, eid, EYES) + return MatrixNotifier( room_id=room_id, reply_to=reply_to, send=self._send, t=self.t, + react=react, ) async def _handle_capture( @@ -2059,10 +2067,10 @@ async def _reply_for_capture( "binary": "capture_failed_binary", } key = failure_keys.get(o.failure_reason or "", "capture_failed") - await self._send(room_id, self.t(key), reply_to) + await self._answer(room_id, self.t(key), reply_to) return if o.status == "no_mirror": - await self._send(room_id, self.t("capture_no_mirror"), reply_to) + await self._answer(room_id, self.t("capture_no_mirror"), reply_to) return if o.status == "reclassified": reply = render_reprocessed_reply( @@ -2087,7 +2095,7 @@ async def _reply_for_capture( metadata = ( {"dev.famstack.event": o.envelope} if o.envelope else None ) - await self._send(room_id, reply, reply_to, metadata=metadata) + await self._answer(room_id, reply, reply_to, metadata=metadata) # ββ URL archiving (documents room β feeds Paperless) βββββββββββββββββ diff --git a/stacklets/docs/bot/capture_pipeline.py b/stacklets/docs/bot/capture_pipeline.py index 39439d9..70dc6f1 100644 --- a/stacklets/docs/bot/capture_pipeline.py +++ b/stacklets/docs/bot/capture_pipeline.py @@ -182,7 +182,7 @@ async def capture_url( actually wrote, not just whatever the article extractor pulled out. Empty/None leaves the prompt unchanged. """ - await notifier.status("capture_fetching", url=url) + await notifier.acknowledge() source = await self._url_extractor.extract(url) if source is None: return CaptureOutcome( diff --git a/stacklets/docs/bot/messages/archivist.yml b/stacklets/docs/bot/messages/archivist.yml index 32e2b0f..3ac7258 100644 --- a/stacklets/docs/bot/messages/archivist.yml +++ b/stacklets/docs/bot/messages/archivist.yml @@ -9,9 +9,6 @@ en: startup: "\u270D\uFE0F The Archivist has returned. Hand me your documents." # File upload - received_photo: "\U0001F4F7 Received photo from {sender} β analyzing..." - received_document: "\U0001F4C4 Received document from {sender} β analyzing..." - received_voice: "\U0001F3A4 Received voice memo from {sender}. Transcribing..." download_failed: "\u274C Failed to download {name}: {error}" download_failed_matrix: "\u274C Failed to download {name} from Matrix." @@ -67,14 +64,12 @@ en: # Scan scan_started: "\U0001F4F8 Batch started for {sender}. Send pages, photos, or voice memos, then ) or 'fertig' to combine." - page_received: "\U0001F4C4 Page {num} received." scan_page_failed: "\u274C Failed to download page: {error}" scan_page_failed_matrix: "\u274C Failed to download page from Matrix." scan_cancelled: "No pages received β scan cancelled." scan_complete_single: "\U0001F4CB Scan complete (1 page) β analyzing..." scan_complete_multi: "\U0001F4CB Scan complete β combining {count} pages into PDF..." scan_combine_failed: "\u274C Failed to combine pages into PDF: {error}" - scan_voice_received: "\U0001F3A4 Voice memo {num} received." scan_voice_failed: "\u274C Failed to download voice memo: {error}" scan_voice_failed_matrix: "\u274C Failed to download voice memo from Matrix." scan_voice_no_transcriber: "\U0001F3A4 Voice transcription is not configured. Set up AI with 'stack up ai'." @@ -92,7 +87,6 @@ en: url_not_pdf: "\u274C URL does not point to a PDF (Content-Type: {content_type}).\n Currently only PDF links and Google Docs are supported." # URL capture (knowledge rooms β links become summarized notes, not Paperless docs) - capture_fetching: "\U0001F50D Reading {url}..." capture_failed: "\u274C Couldn't read that link. Either the host is unreachable or there's no article body to extract." capture_failed_transcription: "\U0001F3A4 Couldn't transcribe that voice memo. Whisper may be down or misconfigured." capture_failed_binary: "\u274C Couldn't read that file." @@ -221,9 +215,6 @@ de: startup: "\u270D\uFE0F Der Archivar ist zurΓΌck. Gebt mir eure Dokumente." # File upload - received_photo: "\U0001F4F7 Foto von {sender} empfangen β wird analysiert..." - received_document: "\U0001F4C4 Dokument von {sender} empfangen β wird analysiert..." - received_voice: "\U0001F3A4 Sprachnachricht von {sender} empfangen. Wird transkribiert..." download_failed: "\u274C Download von {name} fehlgeschlagen: {error}" download_failed_matrix: "\u274C Download von {name} aus Matrix fehlgeschlagen." @@ -279,14 +270,12 @@ de: # Scan scan_started: "\U0001F4F8 Batch gestartet fΓΌr {sender}. Sende Seiten, Fotos oder Sprachnachrichten, dann ) oder 'fertig' zum Zusammenfassen." - page_received: "\U0001F4C4 Seite {num} empfangen." scan_page_failed: "\u274C Seite konnte nicht heruntergeladen werden: {error}" scan_page_failed_matrix: "\u274C Seite konnte nicht aus Matrix heruntergeladen werden." scan_cancelled: "Keine Seiten empfangen β Scan abgebrochen." scan_complete_single: "\U0001F4CB Scan abgeschlossen (1 Seite) β wird analysiert..." scan_complete_multi: "\U0001F4CB Scan abgeschlossen β {count} Seiten werden zu PDF zusammengefasst..." scan_combine_failed: "\u274C Seiten konnten nicht zu PDF zusammengefasst werden: {error}" - scan_voice_received: "\U0001F3A4 Sprachnachricht {num} empfangen." scan_voice_failed: "\u274C Sprachnachricht konnte nicht heruntergeladen werden: {error}" scan_voice_failed_matrix: "\u274C Sprachnachricht konnte nicht aus Matrix heruntergeladen werden." scan_voice_no_transcriber: "\U0001F3A4 Sprach-Transkription ist nicht eingerichtet. 'stack up ai' ausfΓΌhren." @@ -304,7 +293,6 @@ de: url_not_pdf: "\u274C URL zeigt nicht auf ein PDF (Content-Type: {content_type}).\n Aktuell werden nur PDF-Links und Google Docs unterstΓΌtzt." # URL-Capture (WissensrΓ€ume β Links werden zu zusammengefassten Notizen, nicht zu Paperless-Dokumenten) - capture_fetching: "\U0001F50D Lese {url}..." capture_failed: "\u274C Konnte den Link nicht lesen. Entweder ist die Seite nicht erreichbar, oder es gibt keinen Artikel-Inhalt zum Extrahieren." capture_failed_transcription: "\U0001F3A4 Konnte die Sprachnachricht nicht transkribieren. Whisper ist mΓΆglicherweise nicht erreichbar oder falsch konfiguriert." capture_failed_binary: "\u274C Konnte diese Datei nicht lesen." diff --git a/stacklets/docs/bot/notifier.py b/stacklets/docs/bot/notifier.py index a18228a..34bd163 100644 --- a/stacklets/docs/bot/notifier.py +++ b/stacklets/docs/bot/notifier.py @@ -16,14 +16,19 @@ class Notifier(Protocol): - """Posts a translated, ephemeral status message into the conversation.""" + """Posts an ephemeral progress signal into the conversation β either + a translated status line or a reaction on the source message.""" async def status(self, key: str, **kwargs) -> None: ... + async def acknowledge(self) -> None: + """Signal 'picked this up, working on it' without a reply line.""" + ... + class MatrixNotifier: """A Notifier bound to one room + reply thread, backed by the bot's - formatted send and translator.""" + formatted send, translator, and (optionally) reaction transport.""" def __init__( self, *, @@ -31,11 +36,21 @@ def __init__( reply_to: Optional[str], send: Callable[..., Awaitable[None]], t: Callable[..., str], + react: Optional[Callable[[str, str], Awaitable[None]]] = None, ): self._room_id = room_id self._reply_to = reply_to self._send = send self._t = t + self._react = react async def status(self, key: str, **kwargs) -> None: await self._send(self._room_id, self._t(key, **kwargs), self._reply_to) + + async def acknowledge(self) -> None: + """React π on the bound source message β the reaction-based + replacement for the old "Reading β¦" status text. A no-op when no + reaction transport or no source event is bound (e.g. a text-only + flow), so callers never need to guard the call.""" + if self._react is not None and self._reply_to is not None: + await self._react(self._room_id, self._reply_to) diff --git a/tests/stacklets/test_capture_pipeline.py b/tests/stacklets/test_capture_pipeline.py index 3c1daff..67492f4 100644 --- a/tests/stacklets/test_capture_pipeline.py +++ b/tests/stacklets/test_capture_pipeline.py @@ -104,10 +104,14 @@ async def get_tags(self): class FakeNotifier: def __init__(self): self.statuses: list[tuple] = [] + self.acknowledged = 0 async def status(self, key, **kwargs): self.statuses.append((key, kwargs)) + async def acknowledge(self): + self.acknowledged += 1 + def _pipeline(*, mirror, classifier=None, capture_keep_body=False, transcriber=None, llm=None): @@ -156,7 +160,7 @@ async def transcribe(self, audio: bytes, *, filename: str = "voice.ogg", class TestCaptureUrl: @pytest.mark.asyncio - async def test_announces_fetching_then_captures(self): + async def test_acknowledges_then_captures(self): mirror = FakeMirror() pipe = _pipeline(mirror=mirror) notifier = FakeNotifier() @@ -164,7 +168,9 @@ async def test_announces_fetching_then_captures(self): url="http://example.com", sender_mxid="@homer:s", notifier=notifier, ) assert out.status == "captured" - assert ("capture_fetching", {"url": "http://example.com"}) in notifier.statuses + # The bot reacts π on the source message instead of posting a + # "Reading example.com..." status reply. + assert notifier.acknowledged == 1 assert len(mirror.captures) == 1 assert mirror.captures[0]["kind"] == "bookmark" assert out.display_link == "http://example.com" @@ -187,8 +193,8 @@ async def test_extract_failure(self): # URL-shaped failure -> the reply layer renders the link error # message (`Couldn't read that link...`). assert out.failure_reason == "url" - # Fetching was still announced before the failed extract. - assert notifier.statuses[0][0] == "capture_fetching" + # The π acknowledgement still fired before the failed extract. + assert notifier.acknowledged == 1 @pytest.mark.asyncio async def test_no_mirror(self): diff --git a/tests/stacklets/test_microbot.py b/tests/stacklets/test_microbot.py index 3fc9657..c307acd 100644 --- a/tests/stacklets/test_microbot.py +++ b/tests/stacklets/test_microbot.py @@ -456,6 +456,159 @@ async def test_tables_extension_enabled(self, tmp_path): assert "