@@ -61,10 +61,9 @@ async def waiting_for_message_notify(wait_for_seconds: int, body: InsertNewMessa
6161async def insert_new_message (body : InsertNewMessage , message : Message ):
6262 LOG .debug (f"Insert new message { body .message_id } " )
6363 async with DB_CLIENT .get_session_context () as read_session :
64- r = await MD .get_latest_message_ids (read_session , body .session_id )
64+ r = await MD .get_message_ids (read_session , body .session_id )
6565 message_ids , eil = r .unpack ()
6666 if eil :
67- LOG .error (f"Exception while fetching session messages: { eil } " )
6867 return
6968 if not len (message_ids ):
7069 LOG .debug (f"No pending message found for session { body .session_id } , ignore" )
@@ -79,13 +78,11 @@ async def insert_new_message(body: InsertNewMessage, message: Message):
7978 r = await PD .get_project_config (read_session , body .project_id )
8079 project_config , eil = r .unpack ()
8180 if eil :
82- LOG .error (f"Exception while fetching project config: { eil } " )
8381 return
8482
8583 r = await MD .session_message_length (read_session , body .session_id )
8684 pending_message_length , eil = r .unpack ()
8785 if eil :
88- LOG .error (f"Exception while fetching session messages: { eil } " )
8986 return
9087 if (
9188 pending_message_length
@@ -110,14 +107,13 @@ async def insert_new_message(body: InsertNewMessage, message: Message):
110107 routing_key = RK .session_message_insert_retry ,
111108 body = body .model_dump_json (),
112109 )
113-
114110 return
115111
116112 try :
117113 LOG .info (
118114 f"Session message buffer is full (size: { pending_message_length } ), start process"
119115 )
120- await MC .process_session_pending_message (body .session_id )
116+ await MC .process_session_pending_message (project_config , body .session_id )
121117 finally :
122118 await release_session_message_lock (str (body .session_id ))
123119
@@ -145,10 +141,9 @@ async def insert_new_message(body: InsertNewMessage, message: Message):
145141)
146142async def buffer_new_message (body : InsertNewMessage , message : Message ):
147143 async with DB_CLIENT .get_session_context () as session :
148- r = await MD .get_latest_message_ids (session , body .session_id )
144+ r = await MD .get_message_ids (session , body .session_id )
149145 message_ids , eil = r .unpack ()
150146 if eil :
151- LOG .error (f"Exception while fetching latest message id { eil } " )
152147 return
153148 if not len (message_ids ):
154149 LOG .debug (f"No pending message found for session { body .session_id } , ignore" )
@@ -159,6 +154,10 @@ async def buffer_new_message(body: InsertNewMessage, message: Message):
159154 f"Message { body .message_id } is not the latest pending message, ignore"
160155 )
161156 return
157+ r = await PD .get_project_config (session , body .project_id )
158+ project_config , eil = r .unpack ()
159+ if eil :
160+ return
162161 LOG .info (f"Message { body .message_id } IDLE, process it now" )
163162 _l = await check_session_message_lock_or_set (str (body .session_id ))
164163 if not _l :
@@ -172,6 +171,6 @@ async def buffer_new_message(body: InsertNewMessage, message: Message):
172171 )
173172 return
174173 try :
175- await MC .process_session_pending_message (body .session_id )
174+ await MC .process_session_pending_message (project_config , body .session_id )
176175 finally :
177176 await release_session_message_lock (str (body .session_id ))
0 commit comments