Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 48 additions & 9 deletions agents/openhands_sdk/condensation_sft.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from openhands.sdk.context.condenser import LLMSummarizingCondenser
from openhands.sdk.context.condenser.utils import get_total_token_count
from openhands.sdk.context.view import View
from openhands.sdk.event import LLMConvertibleEvent as SDKEvent
from openhands.sdk.event import MessageEvent, SystemPromptEvent
from openhands.sdk.event.condenser import Condensation
from openhands.sdk.llm.llm_response import LLMResponse
from openhands.sdk.tool import ToolDefinition
Expand Down Expand Up @@ -98,6 +100,21 @@ def format_messages(llm: LLM, messages: list[Message]) -> list[dict[str, Any]]:
return normalize_message_content(llm.format_messages_for_llm(messages))


class TrackingSDKEventBuilder(SDKEventBuilder):
def __init__(
self,
conversation: Conversation,
metadata: Any,
event_history: list[SDKEvent],
) -> None:
super().__init__(conversation, metadata)
self.event_history = event_history

def append(self, event: SDKEvent) -> None:
self.event_history.append(event)
super().append(event)


def token_count(view: View, llm: LLM) -> int:
return get_total_token_count(view.events, llm)

Expand Down Expand Up @@ -168,7 +185,7 @@ def make_trajectory_record_from_conversation(

def condensation_prompt_record_if_needed(
*,
conversation: Conversation,
events: list[SDKEvent],
condenser: LLMSummarizingCondenser,
agent_llm: LLM,
condenser_llm: PromptCapturingLLM,
Expand All @@ -177,7 +194,7 @@ def condensation_prompt_record_if_needed(
max_tokens: int,
condensation_index: int,
) -> tuple[Condensation, dict[str, Any]] | None:
view = View.from_events(conversation.state.events)
view = View.from_events(events)
prompt_token_count = token_count(view, condenser.llm)
before_prompt_count = len(condenser_llm.captured_messages)
condensation_result = condenser.condense(view, agent_llm=agent_llm)
Expand Down Expand Up @@ -213,7 +230,28 @@ def append_standardized_events_with_condensation(
include_trajectories: bool,
) -> list[dict[str, Any]]:
metadata = load_dataset_metadata(dataset_name, required=True)
builder = SDKEventBuilder(conversation, metadata)
event_history: list[SDKEvent] = [
SystemPromptEvent(
system_prompt=TextContent(text=conversation.agent.static_system_message),
tools=list(conversation.agent.tools_map.values()),
)
]
builder = TrackingSDKEventBuilder(conversation, metadata, event_history)
first_event = trajectory.content[0]
if not isinstance(first_event, TextObservation) or first_event.source != "user":
raise ValueError(
"OpenHands SDK condensation conversion expects the first event to be a "
"user TextObservation"
)
builder.append(
MessageEvent(
source="user",
llm_message=Message(
role="user",
content=[TextContent(text=first_event.content)],
),
)
)
condenser_llm = PromptCapturingLLM(
usage_id="openhands-sdk-condensation-sft-condenser",
model=model,
Expand All @@ -231,18 +269,18 @@ def append_standardized_events_with_condensation(
condensation_index = 1
index = start_index
batch_number = 0
last_safe_events = list(conversation.state.events)
last_safe_events = list(event_history)

def update_last_safe_events() -> None:
nonlocal last_safe_events
view = View.from_events(conversation.state.events)
view = View.from_events(event_history)
if token_count(view, conversation.agent.llm) <= max_tokens:
last_safe_events = list(conversation.state.events)
last_safe_events = list(event_history)

def emit_condensation_boundary_if_needed() -> None:
nonlocal segment_index, condensation_index, last_safe_events
result = condensation_prompt_record_if_needed(
conversation=conversation,
events=event_history,
condenser=condenser,
agent_llm=conversation.agent.llm,
condenser_llm=condenser_llm,
Expand All @@ -266,8 +304,9 @@ def emit_condensation_boundary_if_needed() -> None:
)
segment_index += 1
records.append(prompt_record)
event_history.append(condensation)
conversation.state.events.append(condensation)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Suggestion: conversation.state.events is no longer read anywhere in this function after the refactor — event_history is the canonical source for all condensation logic. Yet TrackingSDKEventBuilder.append still calls super().append() (line 115), which writes every event to conversation.state.events; and here the condensation is also written there explicitly to keep the two lists in sync.

This dual-write pattern obscures what the true source of truth is. If conversation.state.events is being kept in sync intentionally (e.g. as a guard against unknown SDK side effects that read it internally), add a brief comment saying so. If it is not needed, remove the super().append() call in TrackingSDKEventBuilder and this explicit append, which would make the migration complete and the code self-documenting.

last_safe_events = list(conversation.state.events)
last_safe_events = list(event_history)
condensation_index += 1

while index < len(trajectory.content):
Expand Down Expand Up @@ -342,7 +381,7 @@ def process_row(
with tempfile.TemporaryDirectory(prefix="openhands-sdk-condensation-sft-") as tmpdir:
conversation = Conversation(agent=agent, workspace=tmpdir, visualizer=None)
try:
conversation.send_message(first_event.content)
conversation._ensure_agent_ready()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 Important: _ensure_agent_ready() is a private (underscore-prefixed) SDK method. Coupling to internal implementation details is fragile — the SDK can rename or remove it without a major version bump and this will break silently. If there is no public API for "initialize without sending a message", that gap should be raised with the SDK team. At minimum, add a comment explaining why this private method is called here and what it guards against.

return append_standardized_events_with_condensation(
conversation=conversation,
trajectory=trajectory,
Expand Down
Loading