Skip to content

Commit b8bc482

Browse files
greatmengqigreatmengqi
andauthored
refactor: root release config in gateway runtime (#2611)
Co-authored-by: greatmengqi <chenmengqi.0376@bytedance.com>
1 parent 748429e commit b8bc482

9 files changed

Lines changed: 116 additions & 34 deletions

File tree

backend/app/channels/service.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import logging
66
import os
7-
from typing import Any
7+
from typing import TYPE_CHECKING, Any
88

99
from app.channels.base import Channel
1010
from app.channels.manager import DEFAULT_GATEWAY_URL, DEFAULT_LANGGRAPH_URL, ChannelManager
@@ -13,6 +13,9 @@
1313

1414
logger = logging.getLogger(__name__)
1515

16+
if TYPE_CHECKING:
17+
from deerflow.config.app_config import AppConfig
18+
1619
# Channel name → import path for lazy loading
1720
_CHANNEL_REGISTRY: dict[str, str] = {
1821
"discord": "app.channels.discord:DiscordChannel",
@@ -75,14 +78,15 @@ def __init__(self, channels_config: dict[str, Any] | None = None) -> None:
7578
self._running = False
7679

7780
@classmethod
78-
def from_app_config(cls) -> ChannelService:
81+
def from_app_config(cls, app_config: AppConfig | None = None) -> ChannelService:
7982
"""Create a ChannelService from the application config."""
80-
from deerflow.config.app_config import get_app_config
83+
if app_config is None:
84+
from deerflow.config.app_config import get_app_config
8185

82-
config = get_app_config()
86+
app_config = get_app_config()
8387
channels_config = {}
8488
# extra fields are allowed by AppConfig (extra="allow")
85-
extra = config.model_extra or {}
89+
extra = app_config.model_extra or {}
8690
if "channels" in extra:
8791
channels_config = extra["channels"]
8892
return cls(channels_config=channels_config)
@@ -201,12 +205,12 @@ def get_channel_service() -> ChannelService | None:
201205
return _channel_service
202206

203207

204-
async def start_channel_service() -> ChannelService:
208+
async def start_channel_service(app_config: AppConfig | None = None) -> ChannelService:
205209
"""Create and start the global ChannelService from app config."""
206210
global _channel_service
207211
if _channel_service is not None:
208212
return _channel_service
209-
_channel_service = ChannelService.from_app_config()
213+
_channel_service = ChannelService.from_app_config(app_config)
210214
await _channel_service.start()
211215
return _channel_service
212216

backend/app/gateway/app.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@
2828
threads,
2929
uploads,
3030
)
31-
from deerflow.config.app_config import get_app_config
31+
from deerflow.config import app_config as deerflow_app_config
32+
33+
AppConfig = deerflow_app_config.AppConfig
34+
get_app_config = deerflow_app_config.get_app_config
3235

3336
# Configure logging
3437
logging.basicConfig(
@@ -160,7 +163,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
160163

161164
# Load config and check necessary environment variables at startup
162165
try:
163-
get_app_config()
166+
app.state.config = get_app_config()
164167
logger.info("Configuration loaded successfully")
165168
except Exception as e:
166169
error_msg = f"Failed to load configuration during gateway startup: {e}"
@@ -181,7 +184,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
181184
try:
182185
from app.channels.service import start_channel_service
183186

184-
channel_service = await start_channel_service()
187+
channel_service = await start_channel_service(app.state.config)
185188
logger.info("Channel service started: %s", channel_service.get_status())
186189
except Exception:
187190
logger.exception("No IM channels configured or channel service failed to start")

backend/app/gateway/deps.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from fastapi import FastAPI, HTTPException, Request
1616
from langgraph.types import Checkpointer
1717

18+
from deerflow.config.app_config import AppConfig
1819
from deerflow.persistence.feedback import FeedbackRepository
1920
from deerflow.runtime import RunContext, RunManager, StreamBridge
2021
from deerflow.runtime.events.store.base import RunEventStore
@@ -29,6 +30,14 @@
2930
T = TypeVar("T")
3031

3132

33+
def get_config(request: Request) -> AppConfig:
34+
"""Return the app-scoped ``AppConfig`` stored on ``app.state``."""
35+
config = getattr(request.app.state, "config", None)
36+
if config is None:
37+
raise HTTPException(status_code=503, detail="Configuration not available")
38+
return config
39+
40+
3241
@asynccontextmanager
3342
async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
3443
"""Bootstrap and tear down all LangGraph runtime singletons.
@@ -38,22 +47,24 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
3847
async with langgraph_runtime(app):
3948
yield
4049
"""
41-
from deerflow.config import get_app_config
4250
from deerflow.persistence.engine import close_engine, get_session_factory, init_engine_from_config
4351
from deerflow.runtime import make_store, make_stream_bridge
4452
from deerflow.runtime.checkpointer.async_provider import make_checkpointer
4553
from deerflow.runtime.events.store import make_run_event_store
4654

4755
async with AsyncExitStack() as stack:
48-
app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge())
56+
config = getattr(app.state, "config", None)
57+
if config is None:
58+
raise RuntimeError("langgraph_runtime() requires app.state.config to be initialized")
59+
60+
app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge(config))
4961

5062
# Initialize persistence engine BEFORE checkpointer so that
5163
# auto-create-database logic runs first (postgres backend).
52-
config = get_app_config()
5364
await init_engine_from_config(config.database)
5465

55-
app.state.checkpointer = await stack.enter_async_context(make_checkpointer())
56-
app.state.store = await stack.enter_async_context(make_store())
66+
app.state.checkpointer = await stack.enter_async_context(make_checkpointer(config))
67+
app.state.store = await stack.enter_async_context(make_store(config))
5768

5869
# Initialize repositories — one get_session_factory() call for all.
5970
sf = get_session_factory()
@@ -130,13 +141,12 @@ def get_run_context(request: Request) -> RunContext:
130141
131142
Returns a *base* context with infrastructure dependencies.
132143
"""
133-
from deerflow.config import get_app_config
134-
144+
config = get_config(request)
135145
return RunContext(
136146
checkpointer=get_checkpointer(request),
137147
store=get_store(request),
138148
event_store=get_run_event_store(request),
139-
run_events_config=getattr(get_app_config(), "run_events", None),
149+
run_events_config=getattr(config, "run_events", None),
140150
thread_store=get_thread_store(request),
141151
)
142152

backend/packages/harness/deerflow/config/app_config.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class AppConfig(BaseModel):
7373
subagents: SubagentsAppConfig = Field(default_factory=SubagentsAppConfig, description="Subagent runtime configuration")
7474
guardrails: GuardrailsConfig = Field(default_factory=GuardrailsConfig, description="Guardrail middleware configuration")
7575
circuit_breaker: CircuitBreakerConfig = Field(default_factory=CircuitBreakerConfig, description="LLM circuit breaker configuration")
76-
model_config = ConfigDict(extra="allow", frozen=False)
76+
model_config = ConfigDict(extra="allow")
7777
database: DatabaseConfig = Field(default_factory=DatabaseConfig, description="Unified database backend configuration")
7878
run_events: RunEventsConfig = Field(default_factory=RunEventsConfig, description="Run event storage configuration")
7979
checkpointer: CheckpointerConfig | None = Field(default=None, description="Checkpointer configuration")
@@ -292,6 +292,9 @@ def get_tool_group_config(self, name: str) -> ToolGroupConfig | None:
292292
return next((group for group in self.tool_groups if group.name == name), None)
293293

294294

295+
# Compatibility singleton layer for code paths that have not yet been
296+
# migrated to explicit ``AppConfig`` threading. New composition roots should
297+
# prefer constructing ``AppConfig`` once and passing it down directly.
295298
_app_config: AppConfig | None = None
296299
_app_config_path: Path | None = None
297300
_app_config_mtime: float | None = None

backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
from langgraph.types import Checkpointer
2626

27-
from deerflow.config.app_config import get_app_config
27+
from deerflow.config.app_config import AppConfig, get_app_config
2828
from deerflow.runtime.checkpointer.provider import (
2929
POSTGRES_CONN_REQUIRED,
3030
POSTGRES_INSTALL,
@@ -123,11 +123,11 @@ async def _async_checkpointer_from_database(db_config) -> AsyncIterator[Checkpoi
123123

124124

125125
@contextlib.asynccontextmanager
126-
async def make_checkpointer() -> AsyncIterator[Checkpointer]:
126+
async def make_checkpointer(app_config: AppConfig | None = None) -> AsyncIterator[Checkpointer]:
127127
"""Async context manager that yields a checkpointer for the caller's lifetime.
128128
Resources are opened on enter and closed on exit -- no global state::
129129
130-
async with make_checkpointer() as checkpointer:
130+
async with make_checkpointer(app_config) as checkpointer:
131131
app.state.checkpointer = checkpointer
132132
133133
Yields an ``InMemorySaver`` when no checkpointer is configured in *config.yaml*.
@@ -138,16 +138,17 @@ async def make_checkpointer() -> AsyncIterator[Checkpointer]:
138138
3. Default InMemorySaver
139139
"""
140140

141-
config = get_app_config()
141+
if app_config is None:
142+
app_config = get_app_config()
142143

143144
# Legacy: standalone checkpointer config takes precedence
144-
if config.checkpointer is not None:
145-
async with _async_checkpointer(config.checkpointer) as saver:
145+
if app_config.checkpointer is not None:
146+
async with _async_checkpointer(app_config.checkpointer) as saver:
146147
yield saver
147148
return
148149

149150
# Unified database config
150-
db_config = getattr(config, "database", None)
151+
db_config = getattr(app_config, "database", None)
151152
if db_config is not None and db_config.backend != "memory":
152153
async with _async_checkpointer_from_database(db_config) as saver:
153154
yield saver

backend/packages/harness/deerflow/runtime/store/async_provider.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
from langgraph.store.base import BaseStore
2525

26-
from deerflow.config.app_config import get_app_config
26+
from deerflow.config.app_config import AppConfig, get_app_config
2727
from deerflow.runtime.store.provider import POSTGRES_CONN_REQUIRED, POSTGRES_STORE_INSTALL, SQLITE_STORE_INSTALL, ensure_sqlite_parent_dir, resolve_sqlite_conn_str
2828

2929
logger = logging.getLogger(__name__)
@@ -86,28 +86,29 @@ async def _async_store(config) -> AsyncIterator[BaseStore]:
8686

8787

8888
@contextlib.asynccontextmanager
89-
async def make_store() -> AsyncIterator[BaseStore]:
89+
async def make_store(app_config: AppConfig | None = None) -> AsyncIterator[BaseStore]:
9090
"""Async context manager that yields a Store whose backend matches the
9191
configured checkpointer.
9292
9393
Reads from the same ``checkpointer`` section of *config.yaml* used by
9494
:func:`deerflow.runtime.checkpointer.async_provider.make_checkpointer` so
9595
that both singletons always use the same persistence technology::
9696
97-
async with make_store() as store:
97+
async with make_store(app_config) as store:
9898
app.state.store = store
9999
100100
Yields an :class:`~langgraph.store.memory.InMemoryStore` when no
101101
``checkpointer`` section is configured (emits a WARNING in that case).
102102
"""
103-
config = get_app_config()
103+
if app_config is None:
104+
app_config = get_app_config()
104105

105-
if config.checkpointer is None:
106+
if app_config.checkpointer is None:
106107
from langgraph.store.memory import InMemoryStore
107108

108109
logger.warning("No 'checkpointer' section in config.yaml — using InMemoryStore for the store. Thread list will be lost on server restart. Configure a sqlite or postgres backend for persistence.")
109110
yield InMemoryStore()
110111
return
111112

112-
async with _async_store(config.checkpointer) as store:
113+
async with _async_store(app_config.checkpointer) as store:
113114
yield store

backend/packages/harness/deerflow/runtime/stream_bridge/async_provider.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import logging
1818
from collections.abc import AsyncIterator
1919

20+
from deerflow.config.app_config import AppConfig
2021
from deerflow.config.stream_bridge_config import get_stream_bridge_config
2122

2223
from .base import StreamBridge
@@ -25,14 +26,16 @@
2526

2627

2728
@contextlib.asynccontextmanager
28-
async def make_stream_bridge(config=None) -> AsyncIterator[StreamBridge]:
29+
async def make_stream_bridge(app_config: AppConfig | None = None) -> AsyncIterator[StreamBridge]:
2930
"""Async context manager that yields a :class:`StreamBridge`.
3031
3132
Falls back to :class:`MemoryStreamBridge` when no configuration is
3233
provided and nothing is set globally.
3334
"""
34-
if config is None:
35+
if app_config is None:
3536
config = get_stream_bridge_config()
37+
else:
38+
config = app_config.stream_bridge
3639

3740
if config is None or config.type == "memory":
3841
from deerflow.runtime.stream_bridge.memory import MemoryStreamBridge

backend/tests/test_channels.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2032,6 +2032,22 @@ def test_config_service_urls_override_env(self, monkeypatch):
20322032
assert service.manager._langgraph_url == "http://custom-gateway:8001/api"
20332033
assert service.manager._gateway_url == "http://custom-gateway:8001"
20342034

2035+
def test_from_app_config_uses_explicit_config(self):
2036+
from app.channels.service import ChannelService
2037+
2038+
app_config = SimpleNamespace(
2039+
model_extra={
2040+
"channels": {
2041+
"telegram": {"enabled": False},
2042+
}
2043+
}
2044+
)
2045+
2046+
with patch("deerflow.config.app_config.get_app_config", side_effect=AssertionError("should not read global config")):
2047+
service = ChannelService.from_app_config(app_config)
2048+
2049+
assert service._config == {"telegram": {"enabled": False}}
2050+
20352051
def test_disabled_channel_with_string_creds_emits_warning(self, caplog):
20362052
"""Warning is emitted when a channel has string credentials but enabled=false."""
20372053
import logging
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from __future__ import annotations
2+
3+
from fastapi import Depends, FastAPI
4+
from fastapi.testclient import TestClient
5+
6+
from app.gateway.deps import get_config
7+
from deerflow.config.app_config import AppConfig
8+
from deerflow.config.sandbox_config import SandboxConfig
9+
10+
11+
def test_get_config_returns_app_state_config():
12+
"""get_config should return the exact AppConfig stored on app.state."""
13+
app = FastAPI()
14+
config = AppConfig(sandbox=SandboxConfig(use="test"))
15+
app.state.config = config
16+
17+
@app.get("/probe")
18+
def probe(cfg: AppConfig = Depends(get_config)):
19+
return {"same_identity": cfg is config, "log_level": cfg.log_level}
20+
21+
client = TestClient(app)
22+
response = client.get("/probe")
23+
24+
assert response.status_code == 200
25+
assert response.json() == {"same_identity": True, "log_level": "info"}
26+
27+
28+
def test_get_config_reads_updated_app_state():
29+
"""Swapping app.state.config should be visible to the dependency."""
30+
app = FastAPI()
31+
app.state.config = AppConfig(sandbox=SandboxConfig(use="test"), log_level="info")
32+
33+
@app.get("/log-level")
34+
def log_level(cfg: AppConfig = Depends(get_config)):
35+
return {"level": cfg.log_level}
36+
37+
client = TestClient(app)
38+
assert client.get("/log-level").json() == {"level": "info"}
39+
40+
app.state.config = app.state.config.model_copy(update={"log_level": "debug"})
41+
assert client.get("/log-level").json() == {"level": "debug"}

0 commit comments

Comments
 (0)