Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions .github/workflows/e2e-handshake.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
name: E2E Handshake Test

on:
push:
branches: [ main, dev ]
paths:
- 'src/server/**'
- '.github/workflows/e2e-handshake.yaml'
pull_request:
branches: [ main, dev ]
paths:
- 'src/server/**'
- '.github/workflows/e2e-handshake.yaml'
workflow_dispatch:

jobs:
e2e-test:
name: Run E2E Handshake
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3


- name: Build Images
run: |
cd src/server
docker compose -f docker-compose.test.yml build

- name: Run Handshake Test
run: |
cd src/server
docker compose -f docker-compose.test.yml up --abort-on-container-exit --exit-code-from tests

- name: Collect Container Logs
if: failure()
run: |
cd src/server
docker compose -f docker-compose.test.yml logs
154 changes: 154 additions & 0 deletions src/server/docker-compose.test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
name: acontext-e2e-test
services:
# --- PostgreSQL ---
pg:
image: pgvector/pgvector:pg16
environment:
POSTGRES_USER: acontext
POSTGRES_PASSWORD: helloworld
POSTGRES_DB: acontext_test
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U acontext -d acontext_test" ]
interval: 5s
timeout: 3s
retries: 5

# --- Redis ---
redis:
image: redis:7.4
command: [ "redis-server", "--requirepass", "helloworld" ]
healthcheck:
test: [ "CMD", "redis-cli", "-a", "helloworld", "ping" ]
interval: 5s
timeout: 3s
retries: 5

# --- RabbitMQ ---
rabbitmq:
image: rabbitmq:4-management
environment:
RABBITMQ_DEFAULT_USER: acontext
RABBITMQ_DEFAULT_PASS: helloworld
healthcheck:
test: [ "CMD", "rabbitmq-diagnostics", "ping" ]
interval: 10s
timeout: 5s
retries: 5

# --- SeaweedFS ---
seaweedfs:
image: chrislusf/seaweedfs:4.02
environment:
WEED_S3_ACCESS_KEY: acontext
WEED_S3_SECRET_KEY: helloworld
command: server -s3 -s3.port=9000 -dir=/data -volume.max=100 -volume.publicUrl=http://seaweedfs:9000 -ip=seaweedfs -ip.bind=0.0.0.0
healthcheck:
test: [ "CMD-SHELL", "wget -q -O- http://localhost:9000 >/dev/null 2>&1 || exit 1" ]
interval: 5s
timeout: 3s
retries: 10

# --- SeaweedFS Setup ---
seaweedfs-setup:
image: amazon/aws-cli:2.32.6
depends_on:
seaweedfs:
condition: service_healthy
environment:
AWS_ACCESS_KEY_ID: acontext
AWS_SECRET_ACCESS_KEY: helloworld
AWS_DEFAULT_REGION: us-east-1
# Note: 'sleep infinity' prevents race condition with --exit-code-from tests.
# If this container exits immediately, Docker Compose shuts down the entire stack
# prematurely, causing Core to crash during startup. The bucket is still created
# before Core needs it, so this approach works reliably in the test environment.
entrypoint:
- /bin/sh
- -c
- 'set -e; BUCKET_NAME=acontext-assets; echo "Checking bucket: $$BUCKET_NAME"; for i in 1 2 3 4 5; do if aws --endpoint-url=http://seaweedfs:9000 s3 ls s3://$$BUCKET_NAME >/dev/null 2>&1; then echo "Bucket exists"; break; fi; echo "Creating bucket (attempt $$i)..."; aws --endpoint-url=http://seaweedfs:9000 s3 mb s3://$$BUCKET_NAME 2>/dev/null && break || sleep 2; done; echo "S3 ready"; sleep infinity'
Comment thread
GenerQAQ marked this conversation as resolved.

# --- Python Core ---
core:
build:
context: ./core
environment:
DATABASE_URL: postgresql://acontext:helloworld@pg:5432/acontext_test
MQ_URL: amqp://acontext:helloworld@rabbitmq:5672/
REDIS_URL: redis://:helloworld@redis:6379
S3_ENDPOINT: http://seaweedfs:9000
LLM_SIMPLE_MODEL: gpt-4o # Mock/Stub if possible, or dummy
LLM_API_KEY: fake-key
OTEL_ENABLED: "false"
LOGGING_LEVEL: DEBUG
depends_on:
pg: { condition: service_healthy }
redis: { condition: service_healthy }
rabbitmq: { condition: service_healthy }
seaweedfs: { condition: service_healthy }
seaweedfs-setup: { condition: service_started }
Comment thread
GenerQAQ marked this conversation as resolved.
healthcheck:
test: [ "CMD-SHELL", "wget -q -O- http://localhost:8000/health || exit 1" ]
interval: 10s
timeout: 5s
retries: 5

# --- Go API ---
api:
build:
context: ./api/go
environment:
DATABASE_HOST: pg
DATABASE_USER: acontext
DATABASE_PASSWORD: helloworld
DATABASE_NAME: acontext_test
DATABASE_EXPORT_PORT: 5432
DATABASE_ENABLE_TLS: "false"
REDIS_HOST: redis
REDIS_PASSWORD: helloworld
REDIS_EXPORT_PORT: 6379
REDIS_ENABLE_TLS: "false"
RABBITMQ_HOST: rabbitmq
RABBITMQ_USER: acontext
RABBITMQ_PASSWORD: helloworld
RABBITMQ_VHOST: /
RABBITMQ_VHOST_ENCODED: "%2F"
RABBITMQ_EXPORT_PORT: 5672
RABBITMQ_ENABLE_TLS: "false"
S3_ENDPOINT: http://seaweedfs:9000
S3_INTERNAL_ENDPOINT: http://seaweedfs:9000
S3_REGION: us-east-1
S3_ACCESS_KEY: acontext
S3_SECRET_KEY: helloworld
S3_BUCKET: acontext-assets
CORE_BASE_URL: http://core:8000
APP_TELEMETRY_ENABLED: "false"
ENABLE_ARGON2_VERIFICATION: "false"
ARTIFACT_MAX_UPLOAD_SIZE_BYTES: 16777216
ROOT_API_BEARER_TOKEN: test-token
APP_ROOT_SECRETPEPPER: test-pepper
depends_on:
core: { condition: service_healthy }
healthcheck:
test: [ "CMD-SHELL", "wget -q -O- http://localhost:8029/health || exit 1" ]
interval: 10s
timeout: 5s
retries: 5

# --- Test Runner ---
tests:
image: python:3.12-slim
environment:
API_URL: http://api:8029
CORE_URL: http://core:8000
DB_URL: postgresql://acontext:helloworld@pg:5432/acontext_test
TEST_TOKEN: test-token
volumes:
- ./tests:/app/tests
working_dir: /app
depends_on:
api: { condition: service_healthy }
command: [ "sh", "-c", "pip install httpx asyncpg pydantic && python tests/e2e/handshake_test.py" ]

networks:
default:
name: acontext-e2e-test
122 changes: 122 additions & 0 deletions src/server/tests/e2e/handshake_test.py
Comment thread
slyt3 marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import asyncio
import httpx
import asyncpg
import os
import uuid
import hmac
import hashlib
import json

API_URL = os.getenv("API_URL", "http://api:8029")
CORE_URL = os.getenv("CORE_URL", "http://core:8000")
DB_URL = os.getenv("DB_URL", "postgresql://acontext:helloworld@pg:5432/acontext_test")
TEST_TOKEN_PREFIX = "sk-ac-"
PEPPER = "test-pepper"

def generate_hmac(secret, pepper):
h = hmac.new(pepper.encode(), secret.encode(), hashlib.sha256)
return h.hexdigest()

async def wait_for_services():
print("Waiting for API and Core health checks...")
async with httpx.AsyncClient() as client:
for i in range(30):
try:
api_resp = await client.get(f"{API_URL}/health", timeout=2.0)
core_resp = await client.get(f"{CORE_URL}/health", timeout=2.0)
if api_resp.status_code == 200 and core_resp.status_code == 200:
print("Both services are healthy!")
return True
print(f"Waiting... API: {api_resp.status_code}, Core: {core_resp.status_code}")
except Exception as e:
print(f"Waiting... Error: {e}")
await asyncio.sleep(2)
print("Timeout waiting for services")
return False

async def seed_project(conn, project_id, secret):
print(f"Seeding project {project_id}...")
token_hmac = generate_hmac(secret, PEPPER)
# Configure project for immediate message processing (no buffer wait)
configs = {
"project_session_message_buffer_max_turns": 1,
"project_session_message_buffer_ttl_seconds": 2
}
await conn.execute(
"INSERT INTO projects (id, secret_key_hmac, secret_key_hash_phc, configs) VALUES ($1, $2, $3, $4)",
project_id, token_hmac, "dummy-phc", json.dumps(configs)
)

async def run_test():
if not await wait_for_services():
return False

project_id = uuid.uuid4()
secret = str(uuid.uuid4())
bearer_token = f"{TEST_TOKEN_PREFIX}{secret}" # matching cfg.Root.ProjectBearerTokenPrefix "sk-ac-"

print(f"Connecting to DB at {DB_URL}...")
conn = await asyncpg.connect(DB_URL)
try:
await seed_project(conn, project_id, secret)

async with httpx.AsyncClient() as client:
headers = {"Authorization": f"Bearer {bearer_token}"}

# 1. Create session
print("Creating session...")
resp = await client.post(
f"{API_URL}/api/v1/session",
json={},
headers=headers
)
print(f"Session Response: {resp.status_code}, {resp.text}")
assert resp.status_code in (200, 201)
session_id = resp.json()["data"]["id"]

# 2. Store message
print("Storing message...")
resp = await client.post(
f"{API_URL}/api/v1/session/{session_id}/messages",
json={
"format": "acontext",
"blob": {
"role": "user",
"parts": [{"type": "text", "text": "Hello, bot!"}]
}
},
headers=headers
)
print(f"Message Response: {resp.status_code}, {resp.text}")
assert resp.status_code in (200, 201)
message_id = resp.json()["data"]["id"]

# 3. Poll for processing
print("Polling for message processing (Python Core handshake)...")
try:
msg_uuid = uuid.UUID(message_id)
except (ValueError, AttributeError) as e:
print(f"Invalid message ID format: {message_id}")
return False

for i in range(30):
status = await conn.fetchval(
"SELECT session_task_process_status FROM messages WHERE id = $1",
msg_uuid
)
print(f"Current status: {status}")
if status in ("success", "failed"):
print(f"Handshake successful! Final status: {status}")
return True
await asyncio.sleep(2)

print("Timed out waiting for message processing")
return False

finally:
await conn.close()

if __name__ == "__main__":
success = asyncio.run(run_test())
if not success:
exit(1)