Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
afe9246
Add telemetry package to support sinks
namrataghadi-galileo Apr 5, 2026
8b57ca7
add telemetry to server docker
namrataghadi-galileo Apr 5, 2026
92cee53
make defauteventingestor use ControlEventSink
namrataghadi-galileo Apr 5, 2026
d8b10aa
add telemetry README
namrataghadi-galileo Apr 6, 2026
efd93b0
merge from add telemetry sink
namrataghadi-galileo Apr 6, 2026
946fb5a
merge from master
namrataghadi-galileo Apr 7, 2026
2c623e5
merge from main
namrataghadi-galileo Apr 7, 2026
6d6cced
Merge branch 'main' into feature/61574-refactor-server-ingestion-to-sink
namrataghadi-galileo Apr 7, 2026
9025e45
type error
namrataghadi-galileo Apr 7, 2026
d93564b
coverage
namrataghadi-galileo Apr 7, 2026
d51558c
address comment
namrataghadi-galileo Apr 14, 2026
9a36ae5
Merge branch 'main' into feature/61574-refactor-server-ingestion-to-sink
namrataghadi-galileo Apr 16, 2026
c2e9f05
add external sink
namrataghadi-galileo Apr 16, 2026
04729cc
Merge branch 'main' into feature/62793-add-external-sink
namrataghadi-galileo Apr 16, 2026
4d8c853
adding config
namrataghadi-galileo Apr 16, 2026
3fe0694
fix some p1s
namrataghadi-galileo Apr 16, 2026
f7b75ea
fix regressions
namrataghadi-galileo Apr 16, 2026
3531e48
fix regressions
namrataghadi-galileo Apr 16, 2026
c361242
fix issues
namrataghadi-galileo Apr 16, 2026
9759fbc
mypy
namrataghadi-galileo Apr 16, 2026
2a2b976
add otel sink
namrataghadi-galileo Apr 16, 2026
9232988
add otel sink
namrataghadi-galileo Apr 16, 2026
365accb
address comments
namrataghadi-galileo Apr 19, 2026
39deee7
address merge conflict
namrataghadi-galileo May 3, 2026
7b54a8d
make sure to get traget from provider
namrataghadi-galileo May 3, 2026
d1de509
Merge branch 'main' into feature/62792-add-agent-control-brigde-for-l…
namrataghadi-galileo May 5, 2026
5d44184
Merge branch 'main' into feature/61578-add-otel-support
namrataghadi-galileo May 6, 2026
1a11ad7
Merge branch 'feature/62792-add-agent-control-brigde-for-logger' into…
namrataghadi-galileo May 6, 2026
e8be30e
fix ci adn merge from main
namrataghadi-galileo May 6, 2026
cddebb1
address comment
namrataghadi-galileo May 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,24 @@ events the SDK emits through its normal event-construction flow. The default
SDK sink remains the OSS path to the Agent Control server. To use registered
or named custom sinks, set `observability_sink_name` explicitly.

The SDK also includes a built-in OpenTelemetry sink. Install the OTEL extra,
select the `otel` sink, and configure the OTLP exporter through Agent Control
settings or environment variables:

```bash
uv pip install "agent-control-sdk[otel]"
export AGENT_CONTROL_OBSERVABILITY_SINK_NAME=otel
export AGENT_CONTROL_OTEL_ENABLED=true
export AGENT_CONTROL_OTEL_ENDPOINT=http://localhost:4318/v1/traces
export AGENT_CONTROL_OTEL_HEADERS='{"authorization":"Bearer demo-token"}'
export AGENT_CONTROL_OTEL_SERVICE_NAME=awesome-bot
```

If the `otel` sink is selected without an OTLP endpoint/exporter configured,
the OTEL path stays inert and the default OSS SDK-to-server behavior still
remains unchanged unless `observability_sink_name` is explicitly switched away
from `default`.

Next, create a control in Step 4, then run the setup and agent scripts in
order to see blocking in action.

Expand Down
5 changes: 5 additions & 0 deletions sdks/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ Repository = "https://github.com/yourusername/agent-control"
[project.optional-dependencies]
strands-agents = ["strands-agents>=1.26.0"]
google-adk = ["google-adk>=1.0.0"]
otel = [
"opentelemetry-api>=1.24.0",
"opentelemetry-sdk>=1.24.0",
"opentelemetry-exporter-otlp-proto-http>=1.24.0",
]
galileo = ["agent-control-evaluator-galileo>=7.5.0"]

[dependency-groups]
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/src/agent_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ async def handle_input(user_message: str) -> str:
unregister_control_event_sink_factory,
write_events,
)
from .otel_sink import control_event_to_otel_span
from .tracing import (
get_current_span_id,
get_current_trace_id,
Expand Down Expand Up @@ -1421,6 +1422,7 @@ async def main():
"write_events",
"shutdown_observability",
"is_observability_enabled",
"control_event_to_otel_span",
"get_event_batcher",
"get_event_sink",
"get_registered_control_event_sink_factory_names",
Expand Down
29 changes: 21 additions & 8 deletions sdks/python/src/agent_control/control_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ async def chat(message: str) -> str:
from typing import Any, TypeVar

from agent_control_models import Step, normalize_action
from agent_control_telemetry import get_trace_context_from_provider

from agent_control import AgentControlClient
from agent_control.evaluation import check_evaluation_with_local
Expand All @@ -53,6 +54,25 @@ async def chat(message: str) -> str:
F = TypeVar("F", bound=Callable[..., Any])


def _resolve_control_trace_context() -> tuple[str, str]:
"""Resolve trace/span IDs for a decorated control site.

External providers, such as the Galileo bridge, are authoritative because
they may reserve the concrete span ID that the eventual LLM/tool call will
use. Without a provider, keep the existing behavior: share an active trace
but create a fresh function span for this decorated call.
"""
provider_context = get_trace_context_from_provider()
if provider_context is not None:
return provider_context["trace_id"], provider_context["span_id"]

existing_trace_id = get_current_trace_id()
if existing_trace_id:
return existing_trace_id, _generate_span_id()

return get_trace_and_span_ids()


@dataclass
class ControlContext:
"""
Expand Down Expand Up @@ -697,14 +717,7 @@ async def _execute_with_control(
# Get cached controls for local evaluation support
controls = _get_server_controls()

# Get trace context: inherit trace_id if set, always generate new span_id
# This allows multiple @control() calls to share the same trace but have unique spans
existing_trace_id = get_current_trace_id()
if existing_trace_id:
trace_id = existing_trace_id
span_id = _generate_span_id() # New span for this function
else:
trace_id, span_id = get_trace_and_span_ids() # New trace and span
trace_id, span_id = _resolve_control_trace_context()

ctx = ControlContext(
agent_name=agent.agent_name,
Expand Down
97 changes: 84 additions & 13 deletions sdks/python/src/agent_control/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
Configuration (Environment Variables):
# Observability (event batching)
AGENT_CONTROL_OBSERVABILITY_ENABLED: Enable observability (default: true)
AGENT_CONTROL_OBSERVABILITY_SINK_NAME: Selected control-event sink (default: default)
AGENT_CONTROL_OBSERVABILITY_SINK_CONFIG: JSON config for the selected sink
AGENT_CONTROL_OTEL_ENABLED: Enable the built-in OTEL sink (default: false)
AGENT_CONTROL_OTEL_ENDPOINT: OTLP HTTP endpoint for exported control-event spans
AGENT_CONTROL_OTEL_HEADERS: JSON object of OTLP exporter headers
AGENT_CONTROL_OTEL_SERVICE_NAME: OTEL service.name for emitted spans
AGENT_CONTROL_BATCH_SIZE: Max events per batch (default: 100)
AGENT_CONTROL_FLUSH_INTERVAL: Seconds between flushes (default: 5.0)
AGENT_CONTROL_SHUTDOWN_JOIN_TIMEOUT: Seconds to wait for worker shutdown (default: 5.0)
Expand Down Expand Up @@ -75,6 +81,11 @@
if TYPE_CHECKING:
from agent_control_models import ControlExecutionEvent

from .otel_sink import (
OTEL_CONTROL_EVENT_SINK_NAME,
create_otel_control_event_sink,
)

# =============================================================================
# Logger Setup - Standard Library Pattern
# =============================================================================
Expand Down Expand Up @@ -816,6 +827,19 @@ def get_stats(self) -> dict:
_configured_named_event_sink: ControlEventSink | None = None
_configured_named_event_sink_selection: ControlEventSinkSelection | None = None
_configured_named_event_sink_lock = threading.Lock()
_used_custom_event_sinks: list[ControlEventSink] = []
_used_custom_event_sinks_lock = threading.Lock()


def _register_builtin_control_event_sink_factories() -> None:
"""Ensure built-in named sink factories are available."""
_named_event_sink_factories.register(
OTEL_CONTROL_EVENT_SINK_NAME,
create_otel_control_event_sink,
)


_register_builtin_control_event_sink_factories()


class _BatcherControlEventSink(BaseControlEventSink):
Expand Down Expand Up @@ -907,12 +931,37 @@ def get_registered_control_event_sink_factory_names() -> tuple[str, ...]:
return _named_event_sink_factories.registered_names()


def _remember_custom_control_event_sinks(sinks: Sequence[ControlEventSink]) -> None:
"""Track custom sink instances that should be cleaned up on shutdown."""
with _used_custom_event_sinks_lock:
for sink in sinks:
if not any(remembered_sink is sink for remembered_sink in _used_custom_event_sinks):
_used_custom_event_sinks.append(sink)


def _sink_is_active(sink: ControlEventSink) -> bool:
"""Return whether a sink instance is currently able to deliver events."""
is_active = getattr(sink, "is_active", None)
if callable(is_active):
return bool(is_active())
return True


def _get_sink_selection() -> ControlEventSinkSelection:
"""Build the current sink-selection model from SDK settings."""
settings = get_settings()
config: JSONObject = dict(settings.observability_sink_config or {})
if settings.observability_sink_name == OTEL_CONTROL_EVENT_SINK_NAME:
# Materialize OTEL-specific settings into the selection so that
# changes to otel_endpoint / otel_headers / otel_service_name /
# otel_enabled invalidate the cached sink instance.
config.setdefault("enabled", settings.otel_enabled)
config.setdefault("endpoint", settings.otel_endpoint)
config.setdefault("headers", dict(settings.otel_headers))
config.setdefault("service_name", settings.otel_service_name)
return ControlEventSinkSelection(
name=settings.observability_sink_name,
config=settings.observability_sink_config,
config=config,
)


Expand Down Expand Up @@ -967,13 +1016,19 @@ def _get_active_control_event_sinks() -> tuple[ControlEventSink, ...]:

selection = _get_sink_selection()
Comment thread
namrataghadi-galileo marked this conversation as resolved.
if selection.name == DEFAULT_CONTROL_EVENT_SINK_NAME:
return (_event_sink,) if _event_sink is not None else ()
if _event_sink is None or not _sink_is_active(_event_sink):
return ()
return (_event_sink,)
if selection.name == REGISTERED_CONTROL_EVENT_SINK_NAME:
return get_registered_control_event_sinks()
sinks = get_registered_control_event_sinks()
sinks = tuple(sink for sink in sinks if _sink_is_active(sink))
_remember_custom_control_event_sinks(sinks)
return sinks

named_sink = _get_or_create_named_control_event_sink(selection)
if named_sink is None:
if named_sink is None or not _sink_is_active(named_sink):
return ()
_remember_custom_control_event_sinks((named_sink,))
return (named_sink,)


Expand All @@ -987,6 +1042,20 @@ def _shutdown_built_in_event_sink() -> None:
_event_sink = None


def _shutdown_configured_named_event_sink() -> None:
"""Stop and clear the cached configured named sink if it is active."""
global _configured_named_event_sink, _configured_named_event_sink_selection

configured_named_sink: ControlEventSink | None = None
with _configured_named_event_sink_lock:
configured_named_sink = _configured_named_event_sink
_configured_named_event_sink = None
_configured_named_event_sink_selection = None

if configured_named_sink is not None:
_shutdown_custom_control_event_sink(configured_named_sink)


def _shutdown_custom_control_event_sink(sink: ControlEventSink) -> None:
"""Flush and close a custom sink when it exposes lifecycle hooks."""
flush = getattr(sink, "flush", None)
Expand Down Expand Up @@ -1020,6 +1089,12 @@ async def _run_awaitable_during_shutdown(result: Awaitable[Any]) -> None:
await result


def _get_custom_control_event_sinks_to_shutdown() -> tuple[ControlEventSink, ...]:
"""Collect custom sink instances that should be cleaned up on shutdown."""
with _used_custom_event_sinks_lock:
return tuple(_used_custom_event_sinks)


def init_observability(
server_url: str | None = None,
api_key: str | None = None,
Expand All @@ -1046,6 +1121,8 @@ def init_observability(

settings_updates: dict[str, object] = {}
current_settings = get_settings()
if enabled is not None:
settings_updates["observability_enabled"] = enabled
if sink_name is not None:
settings_updates["observability_sink_name"] = sink_name
if (
Expand All @@ -1058,11 +1135,12 @@ def init_observability(
if settings_updates:
configure_settings(**settings_updates)

is_enabled = enabled if enabled is not None else get_settings().observability_enabled
is_enabled = get_settings().observability_enabled

if not is_enabled:
logger.debug("Observability disabled")
_shutdown_built_in_event_sink()
_shutdown_configured_named_event_sink()
return None

selection = _get_sink_selection()
Expand Down Expand Up @@ -1137,15 +1215,8 @@ def write_events(events: Sequence[ControlExecutionEvent]) -> SinkResult:

def sync_shutdown_observability() -> None:
"""Synchronously shut down observability and flush remaining events."""
global _configured_named_event_sink, _configured_named_event_sink_selection
_shutdown_built_in_event_sink()
configured_named_sink: ControlEventSink | None = None
with _configured_named_event_sink_lock:
configured_named_sink = _configured_named_event_sink
_configured_named_event_sink = None
_configured_named_event_sink_selection = None
if configured_named_sink is not None:
_shutdown_custom_control_event_sink(configured_named_sink)
_shutdown_configured_named_event_sink()


async def shutdown_observability() -> None:
Expand Down
Loading
Loading