Skip to content

Commit 698a8aa

Browse files
committed
feat: update wide event log
1 parent 70958ea commit 698a8aa

8 files changed

Lines changed: 1885 additions & 1791 deletions

File tree

src/server/core/acontext_core/infra/async_mq.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
# The publish() method includes retry logic to handle reconnection automatically.
33
import asyncio
44
import json
5-
import traceback
65
from enum import StrEnum
76
from functools import partial
87
from pydantic import ValidationError, BaseModel
@@ -17,6 +16,7 @@
1716
from ..env import LOG, DEFAULT_CORE_CONFIG
1817
from ..telemetry.log import (
1918
bound_logging_vars,
19+
get_logging_contextvars,
2020
set_wide_event,
2121
clear_wide_event,
2222
)
@@ -31,6 +31,22 @@
3131
OTEL_AVAILABLE = False
3232

3333

34+
def _inject_otel_trace(target: dict) -> None:
35+
"""Snapshot current OTel span's trace_id/span_id into *target* dict."""
36+
if not OTEL_AVAILABLE:
37+
return
38+
try:
39+
span = trace.get_current_span()
40+
if span is None:
41+
return
42+
ctx = span.get_span_context()
43+
if ctx is not None and ctx.trace_id != 0:
44+
target["trace_id"] = format(ctx.trace_id, "032x")
45+
target["span_id"] = format(ctx.span_id, "016x")
46+
except Exception:
47+
pass
48+
49+
3450
def _extract_trace_context_from_headers(message: Message) -> Optional[Any]:
3551
"""Extract trace context from message headers for distributed tracing."""
3652
if not OTEL_AVAILABLE or not message.headers:
@@ -77,7 +93,10 @@ class SpecialHandler(StrEnum):
7793
"timeout_seconds",
7894
"duration_ms",
7995
"_log_level",
96+
"trace_id",
97+
"span_id",
8098
}
99+
| LOGGING_FIELDS
81100
)
82101

83102

@@ -257,13 +276,16 @@ async def _process_message(
257276
if extracted_context and OTEL_AVAILABLE:
258277
token = otel_context.attach(extracted_context)
259278
try:
279+
wide_event.update(get_logging_contextvars())
280+
_inject_otel_trace(wide_event)
260281
await asyncio.wait_for(
261282
config.handler(validated_body, message),
262283
timeout=config.timeout,
263284
)
264285
finally:
265286
otel_context.detach(token)
266287
else:
288+
wide_event.update(get_logging_contextvars())
267289
await asyncio.wait_for(
268290
config.handler(validated_body, message),
269291
timeout=config.timeout,
@@ -650,7 +672,7 @@ async def start(self) -> None:
650672

651673
for task in done:
652674
try:
653-
r = task.result()
675+
task.result()
654676
if task in self._consumer_loop_tasks:
655677
self._consumer_loop_tasks.remove(task)
656678
except Exception as e:

src/server/core/acontext_core/llm/agent/task.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ async def task_agent_curd(
189189
for tool_call in use_tools:
190190
try:
191191
tool_name = tool_call.function.name
192+
tools_called.append(tool_name)
192193
if tool_name == "finish":
193194
just_finish = True
194195
continue
@@ -208,8 +209,6 @@ async def task_agent_curd(
208209
raise RuntimeError(
209210
f"Tool {tool_name} rejected: {r.error}"
210211
)
211-
if tool_name != "report_thinking":
212-
tools_called.append(tool_name)
213212
tool_response.append(
214213
{
215214
"role": "tool",

src/server/core/acontext_core/llm/complete/anthropic_sdk.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from time import perf_counter
66
from ...env import LOG
77
from ...schema.llm import LLMResponse
8+
from ...telemetry.log import get_wide_event
89

910

1011
def convert_openai_tool_to_anthropic_tool(tools: list[dict]) -> list[dict]:
@@ -108,14 +109,23 @@ async def anthropic_complete(
108109
)
109110
_end_s = perf_counter()
110111

112+
_input = response.usage.input_tokens
113+
_output = response.usage.output_tokens
114+
_cached = response.usage.cache_read_input_tokens or 0
115+
116+
wide = get_wide_event()
117+
wide["llm_input_tokens"] = wide.get("llm_input_tokens", 0) + _input
118+
wide["llm_output_tokens"] = wide.get("llm_output_tokens", 0) + _output
119+
wide["llm_cached_tokens"] = wide.get("llm_cached_tokens", 0) + _cached
120+
111121
LOG.info(
112122
"llm.complete",
113123
prompt_id=prompt_id,
114124
model=model,
115-
cached_tokens=response.usage.cache_read_input_tokens,
116-
input_tokens=response.usage.input_tokens,
117-
output_tokens=response.usage.output_tokens,
118-
total_tokens=response.usage.input_tokens + response.usage.output_tokens,
125+
cached_tokens=_cached,
126+
input_tokens=_input,
127+
output_tokens=_output,
128+
total_tokens=_input + _output,
119129
duration_s=round(_end_s - _start_s, 4),
120130
)
121131

src/server/core/acontext_core/llm/complete/openai_sdk.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from time import perf_counter
77
from ...env import LOG, DEFAULT_CORE_CONFIG
88
from ...schema.llm import LLMResponse
9+
from ...telemetry.log import get_wide_event
910

1011

1112
def convert_openai_tool_to_llm_tool(tool_body: ChatCompletionMessageToolCall) -> dict:
@@ -59,15 +60,23 @@ async def openai_complete(
5960
**kwargs,
6061
)
6162
_end_s = perf_counter()
62-
cached_tokens = getattr(response.usage.prompt_tokens_details, "cached_tokens", None)
63+
_input = response.usage.prompt_tokens
64+
_output = response.usage.completion_tokens
65+
_cached = getattr(response.usage.prompt_tokens_details, "cached_tokens", None) or 0
66+
67+
wide = get_wide_event()
68+
wide["llm_input_tokens"] = wide.get("llm_input_tokens", 0) + _input
69+
wide["llm_output_tokens"] = wide.get("llm_output_tokens", 0) + _output
70+
wide["llm_cached_tokens"] = wide.get("llm_cached_tokens", 0) + _cached
71+
6372
LOG.info(
6473
"llm.complete",
6574
prompt_id=prompt_id,
6675
model=model,
67-
cached_tokens=cached_tokens,
68-
input_tokens=response.usage.prompt_tokens,
69-
output_tokens=response.usage.completion_tokens,
70-
total_tokens=response.usage.total_tokens,
76+
cached_tokens=_cached,
77+
input_tokens=_input,
78+
output_tokens=_output,
79+
total_tokens=_input + _output,
7180
duration_s=round(_end_s - _start_s, 4),
7281
)
7382

src/server/core/acontext_core/schema/result.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import sys
12
from pydantic import BaseModel, ConfigDict
23
from typing import Generic, TypeVar, Optional, Union
34
from .error_code import Code
@@ -6,6 +7,17 @@
67
T = TypeVar("T")
78

89

10+
def _caller_name(depth: int = 2) -> str:
11+
"""Return the qualified name of the caller's caller.
12+
13+
``depth=2`` skips this function and the immediate caller (resolve/reject).
14+
"""
15+
try:
16+
return sys._getframe(depth).f_code.co_qualname
17+
except (ValueError, AttributeError):
18+
return "unknown"
19+
20+
921
class ResultError(Exception):
1022
pass
1123

@@ -29,14 +41,19 @@ class Result(BaseModel, Generic[T]):
2941

3042
@classmethod
3143
def resolve(cls, data: T) -> "Result[T]":
44+
wide = get_wide_event()
45+
caller = _caller_name()
46+
stack = wide.setdefault("success_stack", [])
47+
if caller not in stack:
48+
stack.append(caller)
3249
return cls(data=data, error=Error())
3350

3451
@classmethod
3552
def reject(cls, errmsg: str, status: Code = Code.INTERNAL_ERROR) -> "Result[T]":
3653
assert status != Code.SUCCESS, "status must not be SUCCESS"
3754
wide = get_wide_event()
38-
wide.setdefault("errors", []).append(
39-
{"status": str(status), "errmsg": errmsg}
55+
wide.setdefault("error_stack", []).append(
56+
{"caller": _caller_name(), "status": str(status), "errmsg": errmsg}
4057
)
4158
return cls(data=None, error=Error.init(status, errmsg))
4259

src/server/core/acontext_core/service/session_message.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ async def insert_new_message(body: InsertNewMessage, message: Message):
3737
msg_status, eil = r.unpack()
3838
if eil or msg_status != "pending":
3939
wide["action"] = "skip_not_pending"
40+
wide["_log_level"] = "debug"
4041
return
4142

4243
r = await PD.get_project_config(session, body.project_id)

src/server/core/tests/schema/test_result.py

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,44 @@ def test_get_wide_event_throwaway_dict_does_not_pollute_contextvar():
5454

5555

5656
# ---------------------------------------------------------------------------
57-
# Result.reject appends errors to wide event
57+
# Result.resolve records deduplicated caller in success_stack
58+
# ---------------------------------------------------------------------------
59+
60+
61+
def test_resolve_records_caller_in_success_stack():
62+
event: dict = {}
63+
set_wide_event(event)
64+
try:
65+
Result.resolve("a")
66+
Result.resolve("b")
67+
Result.resolve("c")
68+
assert event["success_stack"] == ["test_resolve_records_caller_in_success_stack"]
69+
finally:
70+
clear_wide_event()
71+
72+
73+
def _helper_resolve():
74+
return Result.resolve("from helper")
75+
76+
77+
def test_resolve_deduplicates_same_caller():
78+
event: dict = {}
79+
set_wide_event(event)
80+
try:
81+
Result.resolve("first")
82+
_helper_resolve()
83+
Result.resolve("second")
84+
_helper_resolve()
85+
assert event["success_stack"] == [
86+
"test_resolve_deduplicates_same_caller",
87+
"_helper_resolve",
88+
]
89+
finally:
90+
clear_wide_event()
91+
92+
93+
# ---------------------------------------------------------------------------
94+
# Result.reject appends to error_stack
5895
# ---------------------------------------------------------------------------
5996

6097

@@ -63,10 +100,10 @@ def test_reject_appends_error_to_wide_event():
63100
set_wide_event(event)
64101
try:
65102
Result.reject("something broke", Code.INTERNAL_ERROR)
66-
assert "errors" in event
67-
assert len(event["errors"]) == 1
68-
assert event["errors"][0]["errmsg"] == "something broke"
69-
assert event["errors"][0]["status"] == str(Code.INTERNAL_ERROR)
103+
assert len(event["error_stack"]) == 1
104+
assert event["error_stack"][0]["errmsg"] == "something broke"
105+
assert event["error_stack"][0]["status"] == str(Code.INTERNAL_ERROR)
106+
assert event["error_stack"][0]["caller"] == "test_reject_appends_error_to_wide_event"
70107
finally:
71108
clear_wide_event()
72109

@@ -77,9 +114,9 @@ def test_reject_accumulates_multiple_errors():
77114
try:
78115
Result.reject("first", Code.BAD_REQUEST)
79116
Result.reject("second", Code.INTERNAL_ERROR)
80-
assert len(event["errors"]) == 2
81-
assert event["errors"][0]["errmsg"] == "first"
82-
assert event["errors"][1]["errmsg"] == "second"
117+
assert len(event["error_stack"]) == 2
118+
assert event["error_stack"][0]["errmsg"] == "first"
119+
assert event["error_stack"][1]["errmsg"] == "second"
83120
finally:
84121
clear_wide_event()
85122

0 commit comments

Comments
 (0)