Skip to content

Commit e076b48

Browse files
committed
feat(core): add task sends
1 parent 69edfee commit e076b48

10 files changed

Lines changed: 83 additions & 27 deletions

File tree

src/server/core/acontext_core/llm/agent/task.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ def pack_current_message_with_ids(messages: list[MessageBlob]) -> str:
5858

5959

6060
async def build_task_ctx(
61-
db_session: AsyncSession, session_id: asUUID, messages: list[MessageBlob]
61+
db_session: AsyncSession,
62+
project_id: asUUID,
63+
session_id: asUUID,
64+
messages: list[MessageBlob],
6265
) -> TaskCtx:
6366
LOG.debug(f"Building task context for session {session_id}")
6467
r = await TD.fetch_current_tasks(db_session, session_id)
@@ -67,6 +70,7 @@ async def build_task_ctx(
6770
return r
6871
use_ctx = TaskCtx(
6972
db_session=db_session,
73+
project_id=project_id,
7074
session_id=session_id,
7175
task_ids_index=[t.id for t in current_tasks],
7276
message_ids_index=[m.message_id for m in messages],
@@ -76,6 +80,7 @@ async def build_task_ctx(
7680

7781
@track_process
7882
async def task_agent_curd(
83+
project_id: asUUID,
7984
session_id: asUUID,
8085
previous_messages: List[MessageBlob],
8186
messages: List[MessageBlob],
@@ -130,7 +135,7 @@ async def task_agent_curd(
130135
use_tools = llm_return.tool_calls
131136
just_finish = False
132137
async with DB_CLIENT.get_session_context() as db_session:
133-
USE_CTX = await build_task_ctx(db_session, session_id, messages)
138+
USE_CTX = await build_task_ctx(db_session, project_id, session_id, messages)
134139
tool_response = []
135140
for tool_call in use_tools:
136141
try:
@@ -155,7 +160,9 @@ async def task_agent_curd(
155160
)
156161
if tool_name not in NEED_UPDATE_CTX:
157162
continue
158-
USE_CTX = await build_task_ctx(db_session, session_id, messages)
163+
USE_CTX = await build_task_ctx(
164+
db_session, project_id, session_id, messages
165+
)
159166
except KeyError as e:
160167
return Result.reject(f"Tool {tool_name} not found: {str(e)}")
161168
except Exception as e:

src/server/core/acontext_core/llm/tool/task_lib/ctx.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
@dataclass
77
class TaskCtx:
88
db_session: AsyncSession
9+
project_id: asUUID
910
session_id: asUUID
1011
task_ids_index: list[asUUID]
1112
message_ids_index: list[asUUID]

src/server/core/acontext_core/llm/tool/task_lib/update.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,27 @@
1+
import asyncio
12
from typing import Any
2-
from ....infra.db import AsyncSession
3+
from ....infra.async_mq import MQ_CLIENT
34
from ..base import Tool, ToolPool
45
from ....schema.llm import ToolSchema
56
from ....schema.utils import asUUID
67
from ....schema.result import Result
78
from ....schema.orm import Task
9+
from ....schema.mq.space import NewTaskComplete
10+
from ....schema.session.task import TaskStatus
811
from ....service.data import task as TD
912
from ....env import LOG
13+
from ....service.constants import EX, RK
1014
from .ctx import TaskCtx
1115

1216

17+
async def send_complete_new_task(body: NewTaskComplete):
18+
await MQ_CLIENT.publish(
19+
exchange_name=EX.space_task,
20+
routing_key=RK.space_task_new_complete,
21+
body=body.model_dump_json(),
22+
)
23+
24+
1325
async def update_task_handler(
1426
ctx: TaskCtx,
1527
llm_arguments: dict,
@@ -24,23 +36,33 @@ async def update_task_handler(
2436
f"Task order {task_order} is out of range, updating failed."
2537
)
2638
actually_task_id = ctx.task_ids_index[task_order - 1]
27-
status = llm_arguments.get("task_status", None)
28-
description = llm_arguments.get("task_description", None)
39+
task_status = llm_arguments.get("task_status", None)
40+
task_description = llm_arguments.get("task_description", None)
2941
r = await TD.update_task(
3042
ctx.db_session,
3143
actually_task_id,
32-
status=status,
44+
status=task_status,
3345
patch_data=(
3446
{
35-
"task_description": description,
47+
"task_description": task_description,
3648
}
37-
if description
49+
if task_description
3850
else None
3951
),
4052
)
4153
t, eil = r.unpack()
4254
if eil:
4355
return r
56+
if task_status is not None and task_status == TaskStatus.SUCCESS.value:
57+
asyncio.create_task(
58+
send_complete_new_task(
59+
NewTaskComplete(
60+
project_id=ctx.project_id,
61+
session_id=ctx.session_id,
62+
task_id=actually_task_id,
63+
)
64+
)
65+
)
4466
return Result.resolve(f"Task {t.task_order} updated")
4567

4668

src/server/core/acontext_core/schema/session/task.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class TaskSchema(BaseModel):
1818
task_description: str
1919
task_status: TaskStatus
2020
task_data: dict
21+
space_digested: bool
2122
raw_message_ids: list[asUUID]
2223

2324
def to_string(self) -> str:

src/server/core/acontext_core/service/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ class EX:
44

55

66
class RK:
7-
space_task_complete = "space.task.complete"
7+
space_task_new_complete = "space.task.new.complete"
88

99
session_message_insert = "session.message.insert"
1010
session_message_insert_retry = "session.message.insert.retry"

src/server/core/acontext_core/service/controller/message.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111

1212
async def process_session_pending_message(
13-
project_config: ProjectConfig, session_id: asUUID
13+
project_config: ProjectConfig, project_id: asUUID, session_id: asUUID
1414
):
1515
pending_message_ids = None
1616
try:
@@ -60,7 +60,9 @@ async def process_session_pending_message(
6060
for m in previous_messages
6161
]
6262

63-
r = await AT.task_agent_curd(session_id, previous_messages_data, messages_data)
63+
r = await AT.task_agent_curd(
64+
project_id, session_id, previous_messages_data, messages_data
65+
)
6466

6567
after_status = TaskStatus.SUCCESS
6668
if not r.ok():

src/server/core/acontext_core/service/controller/space_task.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,30 @@
77
from ...schema.result import ResultError
88
from ...env import LOG, DEFAULT_CORE_CONFIG
99
from ...schema.config import ProjectConfig
10+
from ...schema.session.task import TaskSchema
1011

1112

12-
async def process_space_pending_task(
13-
project_config: ProjectConfig, space_id: asUUID, task_id: asUUID
13+
async def process_space_task(
14+
project_config: ProjectConfig, space_id: asUUID, task: TaskSchema
1415
):
15-
pass
16+
if task.task_status != TaskStatus.SUCCESS:
17+
LOG.info(f"Task {task.id} is not success, skipping")
18+
return
19+
20+
async with DB_CLIENT.get_session_context() as db_session:
21+
# 1. fetch messages from task
22+
msg_ids = task.raw_message_ids
23+
r = await MD.fetch_messages_data_by_ids(db_session, msg_ids)
24+
if not r.ok():
25+
return
26+
messages, _ = r.unpack()
27+
messages_data = [
28+
MessageBlob(message_id=m.id, role=m.role, parts=m.parts, task_id=m.task_id)
29+
for m in messages
30+
]
31+
print(messages_data)
32+
# 2. call agent to digest raw messages to SOP
33+
...
34+
35+
# 3. Create block and trigger space_agent to save it
36+
...

src/server/core/acontext_core/service/data/task.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ async def fetch_planning_task(
3232
task_status=planning.task_status,
3333
task_description="",
3434
task_data=planning.task_data,
35+
space_digested=planning.space_digested,
3536
raw_message_ids=[msg.id for msg in planning.messages],
3637
)
3738
)
@@ -51,6 +52,7 @@ async def fetch_task(db_session: AsyncSession, task_id: asUUID) -> Result[TaskSc
5152
task_status=task.task_status,
5253
task_description=task.task_data.get("task_description", ""),
5354
task_data=task.task_data,
55+
space_digested=task.space_digested,
5456
raw_message_ids=[msg.id for msg in task.messages],
5557
)
5658
)
@@ -78,6 +80,7 @@ async def fetch_current_tasks(
7880
task_status=t.task_status,
7981
task_description=t.task_data.get("task_description", ""),
8082
task_data=t.task_data,
83+
space_digested=t.space_digested,
8184
raw_message_ids=[msg.id for msg in t.messages],
8285
)
8386
for t in tasks

src/server/core/acontext_core/service/session_message.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,9 @@ async def insert_new_message(body: InsertNewMessage, message: Message):
127127
routing_key=RK.session_message_insert_retry,
128128
body=body.model_dump_json(),
129129
)
130-
await MC.process_session_pending_message(project_config, body.session_id)
130+
await MC.process_session_pending_message(
131+
project_config, body.project_id, body.session_id
132+
)
131133
finally:
132134
await release_session_message_lock(str(body.session_id))
133135

@@ -185,6 +187,8 @@ async def buffer_new_message(body: InsertNewMessage, message: Message):
185187
)
186188
return
187189
try:
188-
await MC.process_session_pending_message(project_config, body.session_id)
190+
await MC.process_session_pending_message(
191+
project_config, body.project_id, body.session_id
192+
)
189193
finally:
190194
await release_session_message_lock(str(body.session_id))

src/server/core/acontext_core/service/space_task.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,14 @@
2222
mq_client=MQ_CLIENT,
2323
config=ConsumerConfigData(
2424
exchange_name=EX.space_task,
25-
routing_key=RK.space_task_complete,
26-
queue_name=RK.space_task_complete,
25+
routing_key=RK.space_task_new_complete,
26+
queue_name=RK.space_task_new_complete,
2727
),
2828
)
29-
async def complete_new_task(body: NewTaskComplete, message: Message):
30-
async with DB_CLIENT.session() as db_session:
29+
async def space_complete_new_task(body: NewTaskComplete, message: Message):
30+
async with DB_CLIENT.get_session_context() as db_session:
3131
r = await SD.fetch_session(db_session, body.session_id)
3232
if not r.ok():
33-
LOG.error(f"Session {body.session_id} not found, error: {r.error}")
3433
return
3534
session_data, _ = r.unpack()
3635
if session_data.space_id is None:
@@ -39,14 +38,10 @@ async def complete_new_task(body: NewTaskComplete, message: Message):
3938
SPACE_ID = session_data.space_id
4039
r = await TD.fetch_task(db_session, body.task_id)
4140
if not r.ok():
42-
LOG.error(f"Task {body.task_id} not found, error: {r.error}")
4341
return
4442
TASK_DATA, _ = r.unpack()
4543
r = await TD.set_task_space_digested(db_session, body.task_id)
4644
if not r.ok():
47-
LOG.error(
48-
f"Failed to update task {body.task_id} space digested, error: {r.error}"
49-
)
5045
return
5146
already_digested, _ = r.unpack()
5247
if already_digested:
@@ -58,4 +53,4 @@ async def complete_new_task(body: NewTaskComplete, message: Message):
5853
if eil:
5954
return
6055

61-
await STC.process_space_pending_task(project_config, SPACE_ID, TASK_DATA.id)
56+
await STC.process_space_task(project_config, SPACE_ID, TASK_DATA)

0 commit comments

Comments
 (0)