Skip to content

Commit 2343dd1

Browse files
committed
feat(core): add max overflow for buffer
1 parent 2aef830 commit 2343dd1

4 files changed

Lines changed: 28 additions & 5 deletions

File tree

src/server/core/acontext_core/service/controller/message.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,19 @@ async def process_session_pending_message(
1515
pending_message_ids = None
1616
try:
1717
async with DB_CLIENT.get_session_context() as session:
18-
r = await MD.unpending_session_messages_to_running(session, session_id)
18+
r = await MD.get_message_ids(
19+
session,
20+
session_id,
21+
limit=project_config.project_session_message_buffer_max_overflow_turns,
22+
asc=True,
23+
)
1924
pending_message_ids, eil = r.unpack()
2025
if eil:
2126
return
27+
await MD.update_message_status_to(
28+
session, pending_message_ids, TaskStatus.RUNNING
29+
)
30+
LOG.info(f"Unpending {len(pending_message_ids)} session messages to process")
2231

2332
async with DB_CLIENT.get_session_context() as session:
2433
r = await MD.fetch_messages_data_by_ids(session, pending_message_ids)

src/server/core/acontext_core/service/data/message.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ async def get_message_ids(
181181

182182

183183
async def unpending_session_messages_to_running(
184-
db_session: AsyncSession, session_id: asUUID
184+
db_session: AsyncSession, session_id: asUUID, limit: int
185185
) -> Result[List[asUUID]]:
186186
query = (
187187
update(Message)

src/server/core/acontext_core/service/session_message.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ async def insert_new_message(body: InsertNewMessage, message: Message):
9898
_l = await check_session_message_lock_or_set(str(body.session_id))
9999
if not _l:
100100
LOG.info(
101-
f"Current Session is processing. "
101+
f"Current Session is locked. "
102102
f"wait {DEFAULT_CORE_CONFIG.session_message_session_lock_wait_seconds} seconds for next resend. "
103103
f"Message {body.message_id}"
104104
)
@@ -113,6 +113,20 @@ async def insert_new_message(body: InsertNewMessage, message: Message):
113113
LOG.info(
114114
f"Session message buffer is full (size: {pending_message_length}), start process"
115115
)
116+
if (
117+
pending_message_length
118+
> project_config.project_session_message_buffer_max_overflow_turns
119+
):
120+
LOG.info(
121+
f"Session message buffer is overflow "
122+
f"(size: {pending_message_length} > {project_config.project_session_message_buffer_max_overflow_turns}), "
123+
f"Truncate the buffer, the rest will be processed later in {DEFAULT_CORE_CONFIG.session_message_session_lock_wait_seconds} seconds"
124+
)
125+
await MQ_CLIENT.publish(
126+
exchange_name=EX.session_message,
127+
routing_key=RK.session_message_insert_retry,
128+
body=body.model_dump_json(),
129+
)
116130
await MC.process_session_pending_message(project_config, body.session_id)
117131
finally:
118132
await release_session_message_lock(str(body.session_id))
@@ -162,7 +176,7 @@ async def buffer_new_message(body: InsertNewMessage, message: Message):
162176
_l = await check_session_message_lock_or_set(str(body.session_id))
163177
if not _l:
164178
LOG.info(
165-
f"Current Session is processing, resend Message {body.message_id} to insert queue."
179+
f"Current Session is locked, resend Message {body.message_id} to insert queue."
166180
)
167181
await MQ_CLIENT.publish(
168182
exchange_name=EX.session_message,

src/server/core/acontext_core/telemetry/log.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def format(self, record):
6363
base_msg = super().format(record)
6464

6565
# Check if there are extra fields (beyond the standard ones)
66-
printout_attrs = {"traceback"}
66+
printout_attrs = {"traceback", "session_id", "message_id"}
6767

6868
all_fields = record.__dict__
6969
extra_fields = []

0 commit comments

Comments
 (0)