Skip to content

Commit 7c97e2e

Browse files
committed
feat(core): add di and log extra print
1 parent 67572a5 commit 7c97e2e

10 files changed

Lines changed: 62 additions & 22 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .entry import setup, cleanup
1+
from .di import setup, cleanup
22

33
__version__ = "0.0.1.dev"
44
__url__ = "https://github.com/memodb-io/Acontext"

src/server/core/acontext_core/infra/async_mq.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ async def _process_message(self, config: ConsumerConfig, message: Message) -> No
205205
f"Message processing unknown error - queue: {config.queue_name}, "
206206
f"attempt: {retry_count}/{config.max_retries}, "
207207
f"retry after {_wait_for}s, "
208-
f"error: {str(e)}. {traceback.format_exc()}",
208+
f"error: {str(e)}.",
209209
extra={"traceback": traceback.format_exc()},
210210
)
211211
await asyncio.sleep(_wait_for) # Exponential backoff

src/server/core/acontext_core/infra/db.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,10 @@ async def get_session_context(self) -> AsyncGenerator[AsyncSession, None]:
139139
yield session
140140
await session.commit()
141141
except Exception as e:
142-
logger.error(f"DB Session failed: {str(e)}. Rollback...")
143-
logger.error(traceback.format_exc(), extra={"__internal__": True})
142+
logger.error(
143+
f"DB Session failed: {str(e)}. Rollback...",
144+
extra={"traceback": traceback.format_exc()},
145+
)
144146
await session.rollback()
145147
raise e
146148
finally:

src/server/core/acontext_core/llm/agent/task.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import re
12
from typing import List
23
from urllib import response
34
from ...env import LOG, CONFIG, bound_logging_vars
@@ -34,7 +35,7 @@ async def task_agent_curd(
3435
session_id: asUUID,
3536
previous_messages: List[MessageBlob],
3637
messages: List[MessageBlob],
37-
max_iterations=3,
38+
max_iterations=1, # task curd agent only receive one turn of actions
3839
) -> Result[None]:
3940
async with DB_CLIENT.get_session_context() as db_session:
4041
r = await TD.fetch_current_tasks(db_session, session_id)
@@ -87,7 +88,7 @@ async def task_agent_curd(
8788
use_ctx = TaskCtx(
8889
db_session=db_session,
8990
session_id=session_id,
90-
task_ids_index=[t.id for t in tasks],
91+
task_ids_index=[t.id for t in current_tasks],
9192
message_ids_index=[m.message_id for m in messages],
9293
)
9394
tool_response = []

src/server/core/acontext_core/llm/prompt/task.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,20 @@ def system_prompt(cls) -> str:
2828
2929
## Analysis Guidelines
3030
### Planning Detection
31-
- Look for explicit task planning language ("I need to...", "My goal is...", "I will follow ... steps")
32-
- Read out the planning, and separate the tasks from it.
33-
- Link those planning messages to the planning section, since they aren't related to any specific task execution.
34-
- Collect all current tasks without missing future ones
35-
-
31+
- Look for explicit task planning language ("My plan is to...")
32+
- Look for user requirements and preferences.
33+
- General plannings from user/agent.
34+
- The messages that cause you to create/update tasks.
3635
3736
### New Task Detection
3837
- Avoid creating tasks for simple questions answerable directly
3938
- Only collect tasks stated by agents/users, don't invent them
4039
- User's requirement should be confimed by the agent's response, then it becomes a valid task, and append those requirements to planning section.
41-
- [think] The degree of task splitting should follow the agent's plan in the conversation; do not arbitrarily split into finer or coarser granularity.
42-
- [think] Notice any task modification from agent.
43-
- [think] Infer execution order and insert tasks sequentially, make sure you arrange the tasks in logical execution order, no the mentioned order.
44-
- [think] Ensure no task overlap, make sure the tasks are MECE(mutually exclusive, collectively exhaustive).
40+
- The degree of task splitting should follow the agent's plan in the conversation; do not arbitrarily split into finer or coarser granularity.
41+
- Notice any task modification from agent.
42+
- Infer execution order and insert tasks sequentially, make sure you arrange the tasks in logical execution order, no the mentioned order.
43+
- Ensure no task overlap, make sure the tasks are MECE(mutually exclusive, collectively exhaustive).
44+
- When valid new tasks mentioned, always try to capture them all, not only the first one.
4545
4646
### Task Assignment
4747
- Match agent responses/actions to existing task descriptions and contexts
@@ -66,15 +66,16 @@ def system_prompt(cls) -> str:
6666
- `## Current Message with IDs`: the current messages that you need to analyze [with message ids]
6767
- Message with ID format: <message id=N> ... </message>, inside the tag is the message content, the id field indicates the message id.
6868
69-
## Think before calling tools
69+
## Report your thinking before calling tools
7070
- Use extremely brief sentences to state the plans & tasks conversation mentioned, if any.
7171
- Use one-two sentences to briefly describe your plan.
72+
- At the end, confirm you can call finish tool and call it at the end of your actions.
7273
7374
## Action Guidelines
7475
- Be precise, context-aware, and conservative.
7576
- Focus on meaningful task management that organizes conversation objectives effectively.
76-
- Use parallel tool calls when possible, and make sure you call the tools in the correct order.
77-
- After completing all task management actions, call the `finish` tool.
77+
- Use parallel tool calls, and make sure you call the tools in the correct order.
78+
- Make sure you called every tool that you need to call based on your report.
7879
"""
7980

8081
@classmethod

src/server/core/acontext_core/schema/result.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
T = TypeVar("T")
77

88

9+
class ResultError(Exception):
10+
pass
11+
12+
913
class Error(BaseModel):
1014
status: Code = Code.SUCCESS
1115
errmsg: str = ""
@@ -41,3 +45,7 @@ def ok(self) -> bool:
4145
if self.error.status != Code.SUCCESS:
4246
return False
4347
return True
48+
49+
def raise_error(self):
50+
if self.error.status != Code.SUCCESS:
51+
raise ResultError(str(self.error))

src/server/core/acontext_core/service/controller/message.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from ...schema.session.task import TaskStatus
44
from ...schema.session.message import MessageBlob
55
from ...schema.utils import asUUID
6+
from ...schema.result import ResultError
67
from ...env import LOG, CONFIG
78
from ...llm.agent import task as AT
89

@@ -41,15 +42,19 @@ async def process_session_pending_message(session_id: asUUID):
4142
]
4243

4344
r = await AT.task_agent_curd(session_id, previous_messages_data, messages_data)
45+
46+
after_status = TaskStatus.SUCCESS
47+
if not r.ok():
48+
after_status = TaskStatus.FAILED
4449
async with DB_CLIENT.get_session_context() as session:
4550
await MD.update_message_status_to(
46-
session, pending_message_ids, TaskStatus.SUCCESS
51+
session, pending_message_ids, after_status
4752
)
4853
except Exception as e:
4954
if pending_message_ids is None:
5055
raise e
5156
LOG.error(
52-
f"Exception while processing session pending message: {e}, rollback {len(pending_message_ids)} message status to pending"
57+
f"Exception while processing session pending message: {e}, rollback {len(pending_message_ids)} message status to failed"
5358
)
5459
async with DB_CLIENT.get_session_context() as session:
5560
await MD.update_message_status_to(

src/server/core/acontext_core/telemetry/log.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,34 @@ def __get_json_logger():
5555
return logger
5656

5757

58+
class CustomTextFormatter(logging.Formatter):
59+
"""Custom formatter that supports extra fields for text logging"""
60+
61+
def format(self, record):
62+
# First, format the standard message
63+
base_msg = super().format(record)
64+
65+
# Check if there are extra fields (beyond the standard ones)
66+
printout_attrs = {"traceback"}
67+
68+
all_fields = record.__dict__
69+
extra_fields = []
70+
for attr in printout_attrs:
71+
if attr in all_fields:
72+
extra_fields.append(f"{attr}={all_fields[attr]}")
73+
74+
# If there are extra fields, append them to the message
75+
if extra_fields:
76+
extra_text = "\n- " + "\n- ".join(extra_fields)
77+
return base_msg + extra_text
78+
return base_msg
79+
80+
5881
def __get_text_logger():
5982
logger = logging.getLogger("acontext-core")
6083
logger.setLevel(logging.INFO)
6184

62-
formatter = logging.Formatter(
85+
formatter = CustomTextFormatter(
6386
f"{TerminalColorMarks.BOLD}{TerminalColorMarks.BLUE}%(name)s |{TerminalColorMarks.END} %(asctime)s - %(levelname)s - %(message)s"
6487
)
6588
handler = logging.StreamHandler()

src/server/core/core_asgi.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22
import traceback
3-
from acontext_core.entry import MQ_CLIENT, LOG, setup, cleanup
3+
from acontext_core.di import MQ_CLIENT, LOG, setup, cleanup
44

55

66
async def app(scope, receive, send):

0 commit comments

Comments
 (0)