Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/design/brain/interaction-patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ Replace with **👀 reaction on the source message** the moment the bot starts w

**Closure semantics for 👀:** Leave the eyes in place after success. They double as a visual marker in scrollback that the bot processed this message. The final detailed filing reply takes over the closure role. On failure the error reply does the same; the 👀 just means "we tried."

#### Thread the answer, not just the eyes

The 👀 removes the intermediate noise; the final filing reply is still a top-level message in the channel. In a busy family room that reply is itself the bulk of the noise. So the bot posts its answer as an `m.thread` reply under the source message (the same event it reacted to), not inline. The user's upload, the 👀, and the filing summary collapse into one threaded unit, and the main timeline shows only what the family actually said.

This is on by default. Some rooms may want the answer inline: a quiet archive room where the filing summary *is* the content, or a room where threads hurt on a given client. So the placement is a per-room knob. The framework keeps both paths. `MicroBot._answer` threads or replies inline based on `_reply_in_thread(room_id)`, which today returns a single default and is the seam where per-room config plugs in. Errors thread too: in Element a thread with an unread error still surfaces, and keeping every signal about one item in its own thread beats scattering them.

If the filed message already lives in a thread (the user dropped a doc mid-conversation), the answer joins *that* thread rather than starting a new one, since Matrix forbids nested threads. `MicroBot.get_thread_root` / `check_in_thread` read the source event's own relation to decide, so any bot gets the same handling for free.

Framework-resident on purpose: `_react`, `_answer`, and the 👀 acknowledgement live on `MicroBot`, not the archivist, so scribe-bot and mail-bot inherit the same quiet-timeline behavior for free.

#### Cost

~2-3h for the user-to-bot bindings (the four-emoji table) plus unit tests around reaction-event parsing. Add ~1-2h to flip the existing "Received X, processing..." messages to 👀 reactions on the source. Both halves use the same `EventType.REACTION` plumbing — the bot half is just emitting reactions instead of subscribing to them.
Expand Down
100 changes: 100 additions & 0 deletions stacklets/core/bot-runner/microbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@

from room_context import RoomContext, context_for

# The framework's "I picked this up and I'm working on it" signal. A
# bot reacts with 👀 on the source message the moment it starts a
# capture — the same liveness role the typing indicator plays, but
# attached to the specific message instead of a separate timeline event.
EYES = "\U0001F440"


class MicroBot:
"""Base class for lightweight Matrix bots.
Expand Down Expand Up @@ -508,6 +514,31 @@ async def _room_send(
room_id=room_id, message_type=message_type, content=content,
)

async def _react(self, room_id: str, event_id: str, emoji: str) -> None:
"""Annotate an event with an emoji reaction (MSC2677).

The bot's way to signal state on a specific message without a
separate timeline reply — e.g. 👀 the moment it picks up a
capture. Routes through `_room_send` so the transport seam stays
in one place. Best-effort, like the typing indicator and read
receipt: a reaction that fails (homeserver hiccup, room not
joined) must not crash a handler mid-capture.
"""
if not event_id:
return
content = {
"m.relates_to": {
"rel_type": "m.annotation",
"event_id": event_id,
"key": emoji,
},
}
try:
await self._room_send(room_id, content, message_type="m.reaction")
except Exception as e:
logger.warning("[{}] reaction {} failed in {}: {}",
self.name, emoji, room_id, e)

async def _send(
self, room_id: str, text: str, reply_to: str | None = None,
*, metadata: dict | None = None, thread_root_event_id: str | None = None,
Expand Down Expand Up @@ -559,6 +590,75 @@ async def _send(
content.update(metadata)
await self._room_send(room_id, content)

# Whether a bot's answer to a processed item threads under the source
# message (keeping the main timeline quiet) or posts inline. On by
# default; a per-room override is a planned knob — see
# docs/design/brain/interaction-patterns.md.
REPLY_IN_THREAD = True

def _reply_in_thread(self, room_id: str) -> bool:
"""Per-room reply-placement policy. Today a single default for
every room; the seam where per-room configuration will plug in."""
return self.REPLY_IN_THREAD

@staticmethod
def get_thread_root(event) -> str | None:
"""The id of the thread ``event`` belongs to, or None if it's a
top-level message.

Reads the event's own ``m.thread`` relation. Matrix is the
ledger, so the relation on the source event is authoritative —
the bot keeps no thread bookkeeping of its own. A None/eventless
argument is simply not in a thread.
"""
rel = (
getattr(event, "source", None) or {}
).get("content", {}).get("m.relates_to", {})
if rel.get("rel_type") == "m.thread":
return rel.get("event_id")
return None

@classmethod
def check_in_thread(cls, event) -> bool:
"""Whether ``event`` was posted inside a thread."""
return cls.get_thread_root(event) is not None

async def _answer(
self, room_id: str, text: str, source_event: str | None,
*, metadata: dict | None = None,
) -> None:
"""Post the bot's answer to a processed item.

Threads under the source message by default so routine filings
don't crowd the main timeline; falls back to an inline reply
when the room opts out (the historic behavior, kept for the
per-room knob). With no source event there's nothing to attach
to, so it posts plain.

Matrix forbids nested threads: when the source message already
lives in a thread, the answer joins *that* thread rather than
spawning a malformed one rooted at an in-thread event. We fetch
the source (Matrix is the ledger) and reuse its root; a top-
level source roots a fresh thread at itself. Either way the
answer quotes the source via the reply fallback.
"""
if not (source_event and self._reply_in_thread(room_id)):
await self._send(room_id, text, source_event, metadata=metadata)
return
root = source_event
try:
resp = await self._client.room_get_event(room_id, source_event)
existing = self.get_thread_root(getattr(resp, "event", None))
if existing:
root = existing
except Exception as e:
logger.debug("[{}] thread-root fetch failed for {}: {}",
self.name, source_event, e)
await self._send(
room_id, text, reply_to=source_event,
thread_root_event_id=root, metadata=metadata,
)

# The famstack event envelope rides as a custom key on the visible
# m.room.message (see `_send`'s `metadata`), so a filing is a single
# replayable timeline event. Distinct from `emit_event`, which posts
Expand Down
72 changes: 40 additions & 32 deletions stacklets/docs/bot/archivist.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from capture_tags import CaptureTagCache
from extractors import TextExtractor, UrlExtractor
from git_mirror import GitMirror
from microbot import MicroBot
from microbot import EYES, MicroBot
from pdf_analysis import (
DEFAULT_REFORMAT_MAX_PDF_PAGES,
DEFAULT_VISION_MAX_PDF_PAGES,
Expand Down Expand Up @@ -1272,16 +1272,16 @@ async def _reply_for_outcome(
translator) so it lives here, not in the pipeline.
"""
if o.status == "upload_failed":
await self._send(room_id, self.t("upload_failed", name=o.display_name), reply_to)
await self._answer(room_id, self.t("upload_failed", name=o.display_name), reply_to)
return
if o.status == "duplicate":
await self._send(room_id, self._duplicate_reply(o.display_name, o.duplicate), reply_to)
await self._answer(room_id, self._duplicate_reply(o.display_name, o.duplicate), reply_to)
return
if o.status == "ocr_failed":
await self._send(room_id, self.t("ocr_failed", name=o.display_name), reply_to)
await self._answer(room_id, self.t("ocr_failed", name=o.display_name), reply_to)
return
if o.status == "filed_no_details":
await self._send(
await self._answer(
room_id, self.t("filed_no_details", name=o.display_name, link=o.link),
reply_to,
)
Expand All @@ -1293,19 +1293,19 @@ async def _reply_for_outcome(
)
if llm_error:
key, kwargs = llm_error
await self._send(room_id, self.t(key, **kwargs), reply_to)
await self._answer(room_id, self.t(key, **kwargs), reply_to)
elif not o.has_text:
await self._send(
await self._answer(
room_id, self.t("filed_no_text", name=o.display_name, link=o.link),
reply_to,
)
elif not o.classify_enabled:
await self._send(
await self._answer(
room_id, f"{self.t('filed', title=o.display_name)}\n\n {o.link}",
reply_to,
)
elif not o.classification:
await self._send(
await self._answer(
room_id, self.t("classify_failed", name=o.display_name, link=o.link),
reply_to,
)
Expand All @@ -1326,7 +1326,7 @@ async def _reply_for_outcome(
)
# The `dev.famstack.event` envelope rides on the visible
# message — one replayable timeline event per filing.
await self._send(
await self._answer(
room_id, reply_text, reply_to,
metadata={"dev.famstack.event": o.envelope},
)
Expand Down Expand Up @@ -1379,7 +1379,6 @@ async def _on_file(self, room, event) -> None:
raw_filename = content.get("filename") or content.get("body") or "document"
caption = _attachment_caption(content)
display_name = _clean_filename(raw_filename, msgtype)
sender_name = event.sender.split(":")[0].replace("@", "").capitalize()
reply_to = event.event_id

# Multi-page scan / multi-message batch mode. PDFs and images
Expand All @@ -1403,19 +1402,18 @@ async def _on_file(self, room, event) -> None:
await self._send(room.room_id, self.t("download_failed_matrix", name=display_name), reply_to)
return

if msgtype == "m.image":
await self._send(room.room_id, self.t("received_photo", sender=sender_name), reply_to)
elif msgtype == "m.audio":
await self._send(room.room_id, self.t("received_voice", sender=sender_name), reply_to)
else:
await self._send(room.room_id, self.t("received_document", sender=sender_name), reply_to)

# Start typing AFTER the confirmation message. Sending a chat
# message clears the typing indicator on Element's side, so a
# typing notice issued before the confirmation gets immediately
# wiped by the message itself. Setting it here keeps the
# indicator alive for the rest of the OCR + classify + mirror
# work that follows.
# Acknowledge the upload with a 👀 reaction on the source message
# the moment work starts, instead of a "Received X, analyzing..."
# reply. The reaction is attached to the message being processed
# and adds no separate timeline event per capture; the final
# filing reply (or an error reply) is the real closure signal.
await self._react(room.room_id, reply_to, EYES)

# Set typing after the ack so the indicator stays alive through
# the OCR + classify + mirror work that follows. (The old code
# had to send the "Received X" reply first because a chat message
# clears the indicator; a reaction is the last send before this,
# so typing set here survives.)
await self._set_typing(room.room_id, on=True)

# Documents room → full archivist pipeline (Paperless + classify
Expand Down Expand Up @@ -1781,8 +1779,10 @@ async def _handle_scan_page(
session["files"].append((raw_filename, file_data))
if caption:
session["caption"] = _join_captions(session.get("caption", ""), caption)
page_num = len(session["files"])
await self._send(room_id, self.t("page_received", num=page_num), reply_to)
# 👀 on the page instead of a "Page N received." line — the
# batch can run several pages deep, so a reaction per page keeps
# the timeline clean; the scan-complete reply is the closure.
await self._react(room_id, reply_to, EYES)

async def _handle_voice_batch_message(
self, room_id: str, event, url: str, raw_filename: str,
Expand Down Expand Up @@ -1836,8 +1836,9 @@ async def _handle_voice_batch_message(
"mxc": url,
"event_id": event.event_id,
})
n = len(session["voice_inputs"])
await self._send(room_id, self.t("scan_voice_received", num=n), reply_to)
# 👀 acknowledges the memo landed in the batch; the scan-complete
# reply is the closure (mirrors _handle_scan_page).
await self._react(room_id, reply_to, EYES)

async def _handle_scan_complete(
self, room_id: str, sender: str, reply_to: str | None = None,
Expand Down Expand Up @@ -1950,9 +1951,16 @@ async def _handle_voice_batch_complete(
# capture this is — the rule is "you pasted it, it's yours."

def _notifier(self, room_id: str, reply_to: str | None) -> MatrixNotifier:
"""A Notifier bound to this room + reply thread for mid-flow status."""
"""A Notifier bound to this room + reply thread for mid-flow status.

Carries a 👀 react thunk so the capture pipeline can acknowledge
the source message instead of posting a "Reading ..." status line."""
async def react(rid: str, eid: str) -> None:
await self._react(rid, eid, EYES)

return MatrixNotifier(
room_id=room_id, reply_to=reply_to, send=self._send, t=self.t,
react=react,
)

async def _handle_capture(
Expand Down Expand Up @@ -2059,10 +2067,10 @@ async def _reply_for_capture(
"binary": "capture_failed_binary",
}
key = failure_keys.get(o.failure_reason or "", "capture_failed")
await self._send(room_id, self.t(key), reply_to)
await self._answer(room_id, self.t(key), reply_to)
return
if o.status == "no_mirror":
await self._send(room_id, self.t("capture_no_mirror"), reply_to)
await self._answer(room_id, self.t("capture_no_mirror"), reply_to)
return
if o.status == "reclassified":
reply = render_reprocessed_reply(
Expand All @@ -2087,7 +2095,7 @@ async def _reply_for_capture(
metadata = (
{"dev.famstack.event": o.envelope} if o.envelope else None
)
await self._send(room_id, reply, reply_to, metadata=metadata)
await self._answer(room_id, reply, reply_to, metadata=metadata)

# ── URL archiving (documents room — feeds Paperless) ─────────────────

Expand Down
2 changes: 1 addition & 1 deletion stacklets/docs/bot/capture_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ async def capture_url(
actually wrote, not just whatever the article extractor pulled
out. Empty/None leaves the prompt unchanged.
"""
await notifier.status("capture_fetching", url=url)
await notifier.acknowledge()
source = await self._url_extractor.extract(url)
if source is None:
return CaptureOutcome(
Expand Down
Loading
Loading