Skip to content

feat: Add message observing status API#89

Merged
GenerQAQ merged 5 commits intomemodb-io:devfrom
slyt3:feat/message-observing-status
Dec 20, 2025
Merged

feat: Add message observing status API#89
GenerQAQ merged 5 commits intomemodb-io:devfrom
slyt3:feat/message-observing-status

Conversation

@slyt3
Copy link
Copy Markdown
Contributor

@slyt3 slyt3 commented Dec 17, 2025

Summary

Implements API endpoint to return message observing status counts (observed, in_process, pending) for sessions.

Following team feedback, architecture uses a separate message_observing_tracking table decoupled from messages table, with foreign key binding for data integrity.

API Endpoint: GET /api/v1/session/{session_id}/observing-status

Response Example:

{
  "observed": 10,
  "in_process": 5,
  "pending": 3,
  "updated_at": "2025-12-17T10:30:00Z"
}

Why we need this PR?

This should close Issue #75

Users need visibility into message processing status to:

  • Monitor message processing progress
  • Debug stuck or pending messages
  • Track system health and performance
  • Understand message pipeline state

Changes

Backend (Go):

  • Model layer: MessageObservingStatus (response), MessageObservingTracking (DB table)
  • Repository layer: GORM query with LEFT JOIN (untracked messages default to 'pending')
  • Service layer: Business logic wrapper with validation
  • Handler layer: HTTP endpoint with comprehensive unit tests
  • Router: URL registration at /session/{id}/observing-status
  • Dependency injection: Full wiring in bootstrap container

Python SDK:

  • Type: MessageObservingStatus (Pydantic BaseModel)
  • Method: client.sessions.messages_observing_status(session_id)

TypeScript SDK:

  • Schema: MessageObservingStatusSchema (Zod validation)
  • Type: MessageObservingStatus (TypeScript interface)
  • Method: client.sessions.messagesObservingStatus(sessionId)

Database:

  • Migration: 002_create_message_observing_tracking.sql
  • Table: message_observing_tracking with FK to messages(id) ON DELETE CASCADE
  • Indexes: On message_id and observing_status for query performance

Impact Areas

Which part of Acontext would this feature affect?

  • Client SDK (Python)
  • Client SDK (TypeScript)
  • Core Service
  • API Server
  • Dashboard
  • CLI Tool
  • Documentation
  • Other: ...

Implementation Tasks

  • Design and create separate tracking table schema
  • Implement Go API backend (Model → Repo → Service → Handler)
  • Add dependency injection wiring
  • Implement Python SDK method
  • Implement TypeScript SDK method
  • Write handler unit tests with mocks
  • Run database migration (pending guidance)
  • End-to-end testing after migration

Testing

Completed:

  • Go handler unit tests (all passing)
    • Success case with mock data
    • Empty session_id validation
    • Service error handling
    • Nil service panic test

Pending:

  • Database migration execution
  • Integration tests with real database
  • End-to-end API testing
  • Python SDK integration test
  • TypeScript SDK integration test

Note: Awaiting guidance on running migrations in development environment for full integration testing.


Key Files:

  • src/server/api/go/internal/modules/handler/message_observing_handler.go
  • src/server/api/go/internal/modules/repo/message_observing_repo_impl.go
  • src/client/acontext-py/src/acontext/resources/sessions.py
  • src/client/acontext-ts/src/resources/sessions.ts
  • src/server/core/migrations/002_create_message_observing_tracking.sql

Questions for Reviewers

  1. How should I run the database migration locally for integration testing?
  2. Should repository tests be added before or after migration?
  3. Any architectural feedback on the separate table approach?

@GenerQAQ
Copy link
Copy Markdown
Contributor

The Message model has a session_task_process_status field, which is used to track the processing status of messages:

  • pending: waiting for processing
  • running: in progress
  • success: Processing successful
  • failed: Processing failed
    async def process_session_pending_message(
    project_config: ProjectConfig, project_id: asUUID, session_id: asUUID
    ) -> Result[None]:
    pending_message_ids = None
    try:
    async with DB_CLIENT.get_session_context() as session:
    r = await MD.get_message_ids(
    session,
    session_id,
    limit=(
    project_config.project_session_message_buffer_max_overflow
    + project_config.project_session_message_buffer_max_turns
    ),
    asc=True,
    )
    pending_message_ids, eil = r.unpack()
    if eil:
    return r
    if not pending_message_ids:
    return Result.resolve(None)
    await MD.update_message_status_to(
    session, pending_message_ids, TaskStatus.RUNNING
    )
    LOG.info(f"Unpending {len(pending_message_ids)} session messages to process")
    async with DB_CLIENT.get_session_context() as session:
    r = await MD.fetch_messages_data_by_ids(session, pending_message_ids)
    messages, eil = r.unpack()
    if eil:
    return r
    r = await MD.fetch_previous_messages_by_datetime(
    session,
    session_id,
    messages[0].created_at,
    limit=project_config.project_session_message_use_previous_messages_turns,
    )
    messages_data = [
    MessageBlob(
    message_id=m.id, role=m.role, parts=m.parts, task_id=m.task_id
    )
    for m in messages
    ]
    r = await AT.task_agent_curd(
    project_id,
    session_id,
    messages_data,
    max_iterations=project_config.default_task_agent_max_iterations,
    previous_progress_num=project_config.default_task_agent_previous_progress_num,
    )
    after_status = TaskStatus.SUCCESS
    if not r.ok():
    after_status = TaskStatus.FAILED
    async with DB_CLIENT.get_session_context() as session:
    await MD.update_message_status_to(
    session, pending_message_ids, after_status
    )
    return r
    except Exception as e:
    if pending_message_ids is None:
    raise e
    LOG.error(
    f"Exception while processing session pending message: {e}, rollback {len(pending_message_ids)} message status to failed"
    )
    async with DB_CLIENT.get_session_context() as session:
    await MD.update_message_status_to(
    session, pending_message_ids, TaskStatus.FAILED
    )
    raise e

- Query existing session_task_process_status field from messages table
- Map values: success→observed, running→in_process, pending→pending
- Implement Go API endpoint: GET /session/{id}/observing-status
- Add Python SDK method: messages_observing_status()
- Add TypeScript SDK method: messagesObservingStatus()
- Returns counts of observed, in_process, pending messages

Backend (Go):
- Model layer: MessageObservingStatus response type
- Repository layer: Direct query on messages table
- Service layer: Business logic wrapper
- Handler layer: HTTP endpoint with unit tests
- Router: URL registration
- Dependency injection: Full wiring

Python SDK:
- Type: MessageObservingStatus (Pydantic)
- Method: sessions.messages_observing_status()

TypeScript SDK:
- Schema: MessageObservingStatusSchema (Zod)
- Type: MessageObservingStatus
- Method: sessions.messagesObservingStatus()

Tests:
- Handler tests with mocks (all passing)

Note: Uses existing session_task_process_status field. No database migration needed.
@slyt3 slyt3 force-pushed the feat/message-observing-status branch from 989aca8 to 55dd818 Compare December 18, 2025 09:41
@slyt3
Copy link
Copy Markdown
Contributor Author

slyt3 commented Dec 18, 2025

@gusye1234 @GenerQAQ

Updated Implementation

Changed the approach how @gus asked:

What Changed:

  • Now uses existing session_task_process_status field from messages table
  • Maps values: successobserved, runningin_process, pendingpending
  • Removed separate message_observing_tracking table
  • Removed database migration

The session_task_process_status field already tracks message processing status with different names. API/SDK layer now maps these to the requested format.

@slyt3
Copy link
Copy Markdown
Contributor Author

slyt3 commented Dec 18, 2025

I tested in Docker env and tested the endpoints, everythings works as expected

Copy link
Copy Markdown
Contributor

@gusye1234 gusye1234 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not introduce a new handler for this api, just use the session handler please.

Comment thread src/client/acontext-py/src/acontext/resources/sessions.py
Comment thread src/server/api/go/cmd/server/main.go Outdated
@slyt3
Copy link
Copy Markdown
Contributor Author

slyt3 commented Dec 18, 2025

Let's not introduce a new handler for this api, just use the session handler please.

okey, gonna keep session-related endpoints togerther, will move to method to SessionHandler later this day.

but should i keep the service/repo layers separate or consolidate to session service/repo too?

yeah forgot to add async version in async_sessions.py ngl missed that.

will update and push later this day. thanks

@gusye1234
Copy link
Copy Markdown
Contributor

gusye1234 commented Dec 18, 2025

but should i keep the service/repo layers separate or consolidate to session service/repo too?

Oops, miss this.

I don't think you need to add new files in modules/repo and modules/model, those are for ORM.

You can just simply add this function and the struct in modules/handle/session.go, just like every other session apis.

@GenerQAQ , can you look up this PR and back me up? I think we have no need to add new file in /repo and /model in this PR, right?

@GenerQAQ
Copy link
Copy Markdown
Contributor

Agree, and also merge it into the session service/repository to maintain session consistency. @slyt3

@GenerQAQ
Copy link
Copy Markdown
Contributor

Can you modify the PR's into to dev?

@slyt3
Copy link
Copy Markdown
Contributor Author

slyt3 commented Dec 19, 2025

Can you modify the PR's into to dev?

Yes ofcourse

Based on review feedback:
- Moved method to SessionHandler instead of separate handler
- Consolidated into existing session service/repo/model files
- Added async Python client method
- Deleted all separate message_observing_* files

Changes:
- Go: Added GetSessionObservingStatus to SessionHandler/Service/Repo
- Model: Added MessageObservingStatus struct to session.go
- Tests: Added handler tests to session_test.go
- Python: Added async version in async_sessions.py
@slyt3 slyt3 changed the base branch from main to dev December 19, 2025 10:19
@slyt3 slyt3 requested a review from gusye1234 December 19, 2025 11:07
@slyt3
Copy link
Copy Markdown
Contributor Author

slyt3 commented Dec 19, 2025

Fixed everything, what you asked hope so I didnt miss anything @GenerQAQ @gusye1234

Copy link
Copy Markdown
Contributor

@GenerQAQ GenerQAQ left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thank you very much for your submission. There are a few minor issues. Could you please handle them again :)

Comment thread src/server/api/go/internal/modules/handler/session.go Outdated
Comment thread src/server/api/go/internal/modules/repo/session.go
Comment thread src/server/api/go/internal/modules/service/session.go
- fixed aligment in main.go and router.go
- updated swagger doc
@slyt3
Copy link
Copy Markdown
Contributor Author

slyt3 commented Dec 19, 2025

gofmt -w . and swag fmt

Resolved all formatting and linting issues :)

@slyt3 slyt3 requested a review from GenerQAQ December 19, 2025 15:06
@GenerQAQ
Copy link
Copy Markdown
Contributor

gofmt -w . and swag fmt

Resolved all formatting and linting issues :)

Previously overlooked a point, now the API path needs to follow snake_case. /observing-status -> /observing_status

@slyt3
Copy link
Copy Markdown
Contributor Author

slyt3 commented Dec 20, 2025

gofmt -w . and swag fmt

Resolved all formatting and linting issues :)

Previously overlooked a point, now the API path needs to follow snake_case. /observing-status -> /observing_status

Oh yea nice catch didnt even noticed

@slyt3
Copy link
Copy Markdown
Contributor Author

slyt3 commented Dec 20, 2025

gofmt -w . and swag fmt
Resolved all formatting and linting issues :)

Previously overlooked a point, now the API path needs to follow snake_case. /observing-status -> /observing_status

Fixed API path to use snake_case: /observing_status and updated all SDKs and regenerated swagger docs.

Copy link
Copy Markdown
Contributor

@GenerQAQ GenerQAQ left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

changed /observing-status -> observing_status

- Updated router registration
- Updated swagger doc
- Updated Python SDK (sync + async)
- Updated TS SDK
@slyt3 slyt3 force-pushed the feat/message-observing-status branch from de73f7f to 1dede2e Compare December 20, 2025 10:01
@slyt3
Copy link
Copy Markdown
Contributor Author

slyt3 commented Dec 20, 2025

fixed in tests too, missed that, sorry

@GenerQAQ GenerQAQ merged commit 2e88977 into memodb-io:dev Dec 20, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants