Skip to content

Commit 49b9f20

Browse files
committed
feat(core): add task id for previous messages
1 parent 5b5d0e3 commit 49b9f20

3 files changed

Lines changed: 62 additions & 28 deletions

File tree

src/server/core/acontext_core/llm/agent/task.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,28 @@ def pack_task_section(tasks: List[TaskSchema]) -> str:
2424
return section
2525

2626

27-
def pack_previous_messages_section(messages: list[MessageBlob]) -> str:
28-
return "\n".join([m.to_string() for m in messages])
27+
def pack_previous_messages_section(
28+
planning_task: TaskSchema | None,
29+
tasks: list[TaskSchema],
30+
messages: list[MessageBlob],
31+
) -> str:
32+
task_ids = [m.task_id for m in messages]
33+
mappings = {t.id: t for t in tasks}
34+
task_descs = []
35+
for ti in task_ids:
36+
if ti is None:
37+
task_descs.append("(no task linked)")
38+
continue
39+
elif ti in mappings:
40+
task_descs.append(f"(append to task_{mappings[ti].task_order})")
41+
elif planning_task is not None and ti == planning_task.id:
42+
task_descs.append("(append to planning_section)")
43+
else:
44+
LOG.warning(f"Unknown task id: {ti}")
45+
task_descs.append("(no task linked)")
46+
return "\n---\n".join(
47+
[f"{td}\n{m.to_string()}" for td, m in zip(task_descs, messages)]
48+
)
2949

3050

3151
def pack_current_message_with_ids(messages: list[MessageBlob]) -> str:
@@ -64,8 +84,15 @@ async def task_agent_curd(
6484
if eil:
6585
return r
6686

87+
r = await TD.fetch_planning_task(db_session, session_id)
88+
planning_section, eil = r.unpack()
89+
if eil:
90+
return r
91+
6792
task_section = pack_task_section(tasks)
68-
previous_messages_section = pack_previous_messages_section(previous_messages)
93+
previous_messages_section = pack_previous_messages_section(
94+
planning_section, tasks, previous_messages
95+
)
6996
current_messages_section = pack_current_message_with_ids(messages)
7097

7198
LOG.info(f"Task Section: {task_section}")

src/server/core/acontext_core/service/controller/message.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,15 @@ async def process_session_pending_message(
4848
if eil:
4949
return
5050
messages_data = [
51-
MessageBlob(message_id=m.id, role=m.role, parts=m.parts)
51+
MessageBlob(
52+
message_id=m.id, role=m.role, parts=m.parts, task_id=m.task_id
53+
)
5254
for m in messages
5355
]
5456
previous_messages_data = [
55-
MessageBlob(message_id=m.id, role=m.role, parts=m.parts)
57+
MessageBlob(
58+
message_id=m.id, role=m.role, parts=m.parts, task_id=m.task_id
59+
)
5660
for m in previous_messages
5761
]
5862

src/server/core/acontext_core/service/data/task.py

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,32 @@
1111
from ...schema.session.task import TaskSchema
1212

1313

14+
async def fetch_planning_task(
15+
db_session: AsyncSession, session_id: asUUID
16+
) -> Result[TaskSchema | None]:
17+
query = (
18+
select(Task)
19+
.where(Task.session_id == session_id)
20+
.options(selectinload(Task.messages))
21+
.where(Task.is_planning_task == True)
22+
)
23+
result = await db_session.execute(query)
24+
planning = result.scalars().first()
25+
if planning is None:
26+
return Result.resolve(None)
27+
return Result.resolve(
28+
TaskSchema(
29+
id=planning.id,
30+
session_id=planning.session_id,
31+
task_order=planning.task_order,
32+
task_status=planning.task_status,
33+
task_description="",
34+
task_data=planning.task_data,
35+
raw_message_ids=[msg.id for msg in planning.messages],
36+
)
37+
)
38+
39+
1440
async def fetch_current_tasks(
1541
db_session: AsyncSession, session_id: asUUID, status: str = None
1642
) -> Result[List[TaskSchema]]:
@@ -40,29 +66,6 @@ async def fetch_current_tasks(
4066
return Result.resolve(tasks_d) # Fixed: return tasks_d instead of tasks
4167

4268

43-
async def fetch_planning_section(
44-
db_session: AsyncSession, session_id: asUUID
45-
) -> Result[TaskSchema]:
46-
query = (
47-
select(Task)
48-
.where(Task.session_id == session_id)
49-
.where(Task.is_planning_task == True)
50-
.options(selectinload(Task.messages))
51-
)
52-
result = await db_session.execute(query)
53-
task = result.scalars().first()
54-
return Resolver.resolve(
55-
TaskSchema(
56-
id=task.id,
57-
session_id=task.session_id,
58-
task_order=task.task_order,
59-
task_status=task.task_status,
60-
task_data=task.task_data,
61-
raw_message_ids=[msg.id for msg in task.messages],
62-
)
63-
)
64-
65-
6669
async def update_task(
6770
db_session: AsyncSession,
6871
task_id: asUUID,

0 commit comments

Comments
 (0)