Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions langfuse/_client/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class LangfuseOtelSpanAttributes:

# Internal
AS_ROOT = "langfuse.internal.as_root"
IS_APP_ROOT = "langfuse.internal.is_app_root"

# Experiments
EXPERIMENT_ID = "langfuse.experiment.id"
Expand Down
74 changes: 46 additions & 28 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import backoff
import httpx
from opentelemetry import context as otel_context_api
from opentelemetry import trace as otel_trace_api
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
from opentelemetry.sdk.trace.export import SpanExporter
Expand Down Expand Up @@ -66,7 +67,9 @@
)
from langfuse._client.propagation import (
PropagatedExperimentAttributes,
_detach_context_token_safely,
_propagate_attributes,
_set_langfuse_trace_id_in_baggage,
)
from langfuse._client.resource_manager import LangfuseResourceManager
from langfuse._client.span import (
Expand Down Expand Up @@ -1178,39 +1181,54 @@ def _start_as_current_otel_span_with_processed_media(
name=name,
end_on_exit=end_on_exit if end_on_exit is not None else True,
) as otel_span:
baggage_token = None

if otel_span.is_recording():
context_with_app_root_claim = _set_langfuse_trace_id_in_baggage(
trace_id=self._get_otel_trace_id(otel_span),
context=otel_context_api.get_current(),
)
baggage_token = otel_context_api.attach(context_with_app_root_claim)
Comment on lines +1186 to +1191
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Gate trace-id baggage claim on export eligibility

_start_as_current_otel_span_with_processed_media now adds langfuse_trace_id baggage for every recording Langfuse span before any export filtering runs, so downstream services can suppress IS_APP_ROOT even when the upstream parent is later dropped by should_export_span/scope filters. In that setup (custom filtering + distributed propagation), no exported span may end up marked as app root in downstream traces, which defeats the new root-selection behavior. The claim should only be propagated when the originating span is expected to export (or include a way to distinguish non-exporting parents).

Useful? React with 👍 / 👎.

Comment on lines +1184 to +1191
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 The baggage claim (langfuse_trace_id) is attached in _start_as_current_otel_span_with_processed_media based solely on otel_span.is_recording(), without consulting should_export_span. In a distributed trace where a custom filter drops the parent observation in Service A, Service B still receives the baggage and computes suppressed_by_parent_claim = True in _mark_app_root_candidate — so the downstream root is denied IS_APP_ROOT while the parent never reaches the backend, leaving no span in the trace marked as app root. Gate the baggage attachment on _is_expected_exported_at_start (or equivalent) so non-exporting parents do not propagate a suppression claim.

Extended reasoning...

What the bug is

client.py:1184-1191 attaches the langfuse_trace_id baggage to the current context whenever otel_span.is_recording() returns True. But is_recording() only reflects the sampling decision — it says nothing about whether the user's should_export_span filter will keep the span. The local app-root logic in the span processor uses _is_expected_exported_at_start (which calls should_export_span) to make the export decision, but the cross-process baggage path is set without that gate. This asymmetry breaks the new V4 default-table behavior in a documented, supported scenario: custom should_export_span plus distributed (cross-service) trace propagation.

How it manifests in a distributed trace

Configure Service A with a custom should_export_span that drops a particular span (e.g. parent_obs). Then:

  1. Service Alangfuse.start_as_current_observation(name='parent_obs') creates an SDK Span. is_recording() returns True (sampling decision), even though should_export_span will later return False.
  2. Service A — At client.py:1186, the SDK calls _set_langfuse_trace_id_in_baggage and attaches baggage langfuse_trace_id=T1 to the active context, purely on the strength of is_recording().
  3. Service A — User code makes an outbound HTTP request inside the observation. OTel propagators inject the W3C traceparent (trace_id=T1) AND the baggage header.
  4. Service B — Receives the request. langfuse_tracer.start_span('downstream-root', context=...) starts a span as a remote child of the propagated context. Its trace_id is T1; parent.span_id is the remote parent's id but parent_state is None in Service B's processor (the parent lives in Service A).
  5. Service B — In span_processor._mark_app_root_candidate:
    • format_trace_id(span.context.trace_id) == 'T1'
    • propagated_trace_id == 'T1' (from baggage)
    • parent_state is None (no local record of the remote parent)
    • suppressed_by_parent_claim = (T1 == T1) and (None is None) = True
    • mark_app_root = expected_exported AND not parent_expected_exported AND not suppressed_by_parent_claim = True AND True AND False = False
  6. Service A — At span end, should_export_span(parent_obs) returns False. parent_obs is dropped and never exported.

Net effect: parent_obs is filtered out on Service A, downstream-root is suppressed on Service B by a claim from a parent that will never reach the backend, and no span in the entire distributed trace carries langfuse.internal.is_app_root. The V4 default observation table — which the PR explicitly motivates as the reason for this feature — has no row to surface for this trace.

Why existing code doesn't prevent it

The local case is handled correctly: when Service A starts parent_obs it is recorded in trace_state.spans with expected_exported_at_start=False (because _is_expected_exported_at_start evaluates the filter). So if Service A starts a local child, the processor sees a non-None parent_state whose expected_exported_at_start is False, and correctly marks the child as the app root. The test test_local_baggage_claim_does_not_suppress_child_of_filtered_parent exercises exactly this.

The cross-process baggage path has no equivalent. _set_langfuse_trace_id_in_baggage is called unconditionally for any recording span, so the baggage on the wire effectively says 'a Langfuse parent for trace T1 exists upstream — suppress yourself' even when the upstream parent is filtered out and will never appear in the backend.

Impact

  • Triggered by a documented, supported configuration: any user with a custom should_export_span that drops some observations AND any cross-service HTTP call inside such an observation will hit this.
  • Defeats the feature this PR introduces for exactly the scenario described in the PR motivation ('Default and custom span filtering can remove infrastructure or instrumentation spans before export').
  • Independently flagged by chatgpt-codex-connector at P2 in the PR timeline with the same conclusion and the same suggested fix, which corroborates the analysis.

Fix

Gate the baggage attachment at client.py:1186 on the same predicate the span processor already uses locally: _is_expected_exported_at_start(otel_span). If the span is not expected to export, do not attach the langfuse_trace_id claim — there is no app-root candidate upstream to suppress with. Alternatively, attach the baggage lazily inside the processor once export eligibility has been established (e.g. only when IS_APP_ROOT is actually set on the span). The processor already exposes _is_expected_exported_at_start and uses it for the symmetric local decision, so the fix is mechanical and self-consistent.


span_class = self._get_span_class(
as_type or "generation"
) # default was "generation"
common_args = {
"otel_span": otel_span,
"langfuse_client": self,
"environment": self._environment,
"release": self._release,
"input": input,
"output": output,
"metadata": metadata,
"version": version,
"level": level,
"status_message": status_message,
}

if span_class in [
LangfuseGeneration,
LangfuseEmbedding,
]:
common_args.update(
{
"completion_start_time": completion_start_time,
"model": model,
"model_parameters": model_parameters,
"usage_details": usage_details,
"cost_details": cost_details,
"prompt": prompt,
}
)
# For span-like types (span, agent, tool, chain, retriever, evaluator, guardrail), no generation properties needed
try:
common_args = {
"otel_span": otel_span,
"langfuse_client": self,
"environment": self._environment,
"release": self._release,
"input": input,
"output": output,
"metadata": metadata,
"version": version,
"level": level,
"status_message": status_message,
}

if span_class in [
LangfuseGeneration,
LangfuseEmbedding,
]:
common_args.update(
{
"completion_start_time": completion_start_time,
"model": model,
"model_parameters": model_parameters,
"usage_details": usage_details,
"cost_details": cost_details,
"prompt": prompt,
}
)
# For span-like types (span, agent, tool, chain, retriever, evaluator, guardrail), no generation properties needed

yield span_class(**common_args) # type: ignore[arg-type]

yield span_class(**common_args) # type: ignore[arg-type]
finally:
if baggage_token is not None:
_detach_context_token_safely(baggage_token)

def _get_current_otel_span(self) -> Optional[otel_trace_api.Span]:
current_span = otel_trace_api.get_current_span()
Expand Down
35 changes: 35 additions & 0 deletions langfuse/_client/propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ def _get_propagated_attributes_from_context(
# Handle baggage
baggage_entries = baggage.get_all(context=context)
for baggage_key, baggage_value in baggage_entries.items():
if baggage_key == LANGFUSE_TRACE_ID_BAGGAGE_KEY:
continue

if baggage_key.startswith(LANGFUSE_BAGGAGE_PREFIX):
span_key = _get_span_key_from_baggage_key(baggage_key)

Expand Down Expand Up @@ -471,12 +474,44 @@ def _get_propagated_context_key(key: str) -> str:


LANGFUSE_BAGGAGE_PREFIX = "langfuse_"
LANGFUSE_TRACE_ID_BAGGAGE_KEY = "langfuse_trace_id"


def _get_propagated_baggage_key(key: str) -> str:
return f"{LANGFUSE_BAGGAGE_PREFIX}{key}"


def _get_langfuse_trace_id_from_baggage(
context: otel_context_api.Context,
) -> Optional[str]:
value = otel_baggage_api.get_baggage(
name=LANGFUSE_TRACE_ID_BAGGAGE_KEY,
context=context,
)

if value is None:
return None

return str(value).lower()


def _set_langfuse_trace_id_in_baggage(
*,
trace_id: str,
context: otel_context_api.Context,
) -> otel_context_api.Context:
normalized_trace_id = trace_id.lower()

if _get_langfuse_trace_id_from_baggage(context) == normalized_trace_id:
return context

return otel_baggage_api.set_baggage(
name=LANGFUSE_TRACE_ID_BAGGAGE_KEY,
value=normalized_trace_id,
context=context,
)


def _get_span_key_from_baggage_key(key: str) -> Optional[str]:
if not key.startswith(LANGFUSE_BAGGAGE_PREFIX):
return None
Expand Down
Loading