Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 88 additions & 35 deletions app/src/ai/agent_conversations_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,9 @@ pub struct AgentConversationsModel {
/// and are absent from this map.
task_fetch_state: HashMap<AmbientAgentTaskId, TaskFetchState>,
rtc_task_refresh_throttle_state: RtcTaskRefreshThrottleState,
/// Earliest RTC timestamp received while no consumer view was open.
/// On next `register_view_open`, triggers a single `fetch_tasks_updated_after`.
dirty_since: Option<DateTime<Utc>>,
}

pub enum AgentConversationsModelEvent {
Expand Down Expand Up @@ -572,6 +575,7 @@ impl AgentConversationsModel {
has_finished_initial_load: true,
task_fetch_state: HashMap::new(),
rtc_task_refresh_throttle_state: RtcTaskRefreshThrottleState::default(),
dirty_since: None,
};
}

Expand Down Expand Up @@ -610,6 +614,7 @@ impl AgentConversationsModel {
has_finished_initial_load: false,
task_fetch_state: HashMap::new(),
rtc_task_refresh_throttle_state: RtcTaskRefreshThrottleState::default(),
dirty_since: None,
};

// Only sync local conversations if we're not in CLI mode. Server-side data
Expand Down Expand Up @@ -675,23 +680,50 @@ impl AgentConversationsModel {
event: &UpdateManagerEvent,
ctx: &mut ModelContext<Self>,
) {
if let UpdateManagerEvent::AmbientTaskUpdated { timestamp } = event {
match std::mem::take(&mut self.rtc_task_refresh_throttle_state) {
RtcTaskRefreshThrottleState::Idle => {
self.fetch_tasks_updated_after(*timestamp, ctx);
self.start_rtc_task_refresh_throttle_timer(ctx);
}
RtcTaskRefreshThrottleState::CoolingDown {
mut pending_timestamp,
let UpdateManagerEvent::AmbientTaskUpdated { task_id, timestamp } = event else {
return;
};

// (a) If this task has an open tab (any window), force a re-fetch.
let has_open_tab = ActiveAgentViewsModel::as_ref(ctx)
.get_terminal_view_id_for_ambient_task(*task_id)
.is_some();
if has_open_tab {
self.get_or_async_fetch_task_data_internal(task_id, true, ctx);
}

let has_list_consumers = self
.active_data_consumers_per_window
.values()
.any(|views| !views.is_empty());
if has_list_consumers {
// (b) If management view or conversation list is open, throttled list-fetch.
self.handle_rtc_for_list_views(*timestamp, ctx);
} else {
// (c) Nothing open: record earliest timestamp for flush on next view open.
record_earliest_rtc_task_refresh_timestamp(&mut self.dirty_since, *timestamp);
}
}

fn handle_rtc_for_list_views(
&mut self,
timestamp: DateTime<Utc>,
ctx: &mut ModelContext<Self>,
) {
match std::mem::take(&mut self.rtc_task_refresh_throttle_state) {
RtcTaskRefreshThrottleState::Idle => {
self.fetch_tasks_updated_after(timestamp, ctx);
self.start_rtc_task_refresh_throttle_timer(ctx);
}
RtcTaskRefreshThrottleState::CoolingDown {
mut pending_timestamp,
timer_abort_handle,
} => {
record_earliest_rtc_task_refresh_timestamp(&mut pending_timestamp, timestamp);
self.rtc_task_refresh_throttle_state = RtcTaskRefreshThrottleState::CoolingDown {
pending_timestamp,
timer_abort_handle,
} => {
record_earliest_rtc_task_refresh_timestamp(&mut pending_timestamp, *timestamp);
self.rtc_task_refresh_throttle_state =
RtcTaskRefreshThrottleState::CoolingDown {
pending_timestamp,
timer_abort_handle,
};
}
};
}
}
}
Expand Down Expand Up @@ -942,6 +974,11 @@ impl AgentConversationsModel {
.or_default()
.insert(view_id);
self.update_polling_state(ctx);

// Flush dirty tasks accumulated while no view was open.
if let Some(dirty_since) = self.dirty_since.take() {
self.fetch_tasks_updated_after(dirty_since, ctx);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ [IMPORTANT] This catch-up can miss updates: fetch_tasks_updated_after requests only INITIAL_TASK_AMOUNT tasks, so if more than 100 tasks changed while no view was open, reopening a list view drops the rest while RTC polling remains disabled. Track dirty task IDs or page until all updates are loaded before clearing dirty_since.

}
}

/// Called when a view that consumes this model's data becomes hidden.
Expand Down Expand Up @@ -1521,29 +1558,44 @@ impl AgentConversationsModel {
task_id: &AmbientAgentTaskId,
ctx: &mut ModelContext<Self>,
) -> Option<AmbientAgentTask> {
// If we already have it, return it
if let Some(task) = self.tasks.get(task_id) {
return Some(task.clone());
self.get_or_async_fetch_task_data_internal(task_id, false, ctx)
}

// If `force_refresh` is true, this will invalidate the cache entry for the stored
// task's data and refetch the data from the server. We use this for handling RTC
// invalidations.
fn get_or_async_fetch_task_data_internal(
&mut self,
task_id: &AmbientAgentTaskId,
force_refresh: bool,
ctx: &mut ModelContext<Self>,
) -> Option<AmbientAgentTask> {
if !force_refresh {
if let Some(task) = self.tasks.get(task_id) {
return Some(task.clone());
}
}

// Consult the per-task fetch state. The three variants are mutually exclusive: at most
// one applies to a given id.
match self.task_fetch_state.get(task_id) {
Some(TaskFetchState::InFlight) => return None,
Some(TaskFetchState::PermanentlyFailed { at, .. }) => {
if at.elapsed() < PERMANENT_FETCH_FAILURE_COOLDOWN {
return None;
// Consult the per-task fetch state unless force-refreshing.
if !force_refresh {
match self.task_fetch_state.get(task_id) {
Some(TaskFetchState::InFlight) => return None,
Some(TaskFetchState::PermanentlyFailed { at, .. }) => {
if at.elapsed() < PERMANENT_FETCH_FAILURE_COOLDOWN {
return None;
}
self.task_fetch_state.remove(task_id);
}
// Cooldown has elapsed; clear the entry and fall through to fetch again.
self.task_fetch_state.remove(task_id);
}
Some(TaskFetchState::TransientlyFailed { at, .. }) => {
if at.elapsed() < TRANSIENT_FETCH_FAILURE_COOLDOWN {
return None;
Some(TaskFetchState::TransientlyFailed { at, .. }) => {
if at.elapsed() < TRANSIENT_FETCH_FAILURE_COOLDOWN {
return None;
}
self.task_fetch_state.remove(task_id);
}
self.task_fetch_state.remove(task_id);
None => {}
}
None => {}
} else {
self.task_fetch_state.remove(task_id);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ [IMPORTANT] Keep the InFlight guard when force-refreshing. RTC events for an open tab can arrive while the previous GET /agent/runs/{id} is still running; removing the fetch state here spawns overlapping fetches and an older response can overwrite newer task data. Only bypass cached task/failure cooldowns, not TaskFetchState::InFlight.

}

// Opportunistically purge other expired entries so the map doesn't grow unbounded.
Expand Down Expand Up @@ -1600,7 +1652,8 @@ impl AgentConversationsModel {
},
);

None
// Return the stale cached copy if available (force_refresh keeps it in the map).
self.tasks.get(task_id).cloned()
}

/// Returns all (name, uid) pairs for creators of tasks in the model.
Expand Down
1 change: 1 addition & 0 deletions app/src/ai/agent_conversations_model_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ fn create_test_model() -> AgentConversationsModel {
has_finished_initial_load: false,
task_fetch_state: Default::default(),
rtc_task_refresh_throttle_state: RtcTaskRefreshThrottleState::default(),
dirty_since: None,
}
}

Expand Down
26 changes: 20 additions & 6 deletions app/src/server/cloud_objects/update_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
ai::{
agent::conversation::AIConversationId,
ambient_agents::scheduled::{CloudScheduledAmbientAgentModel, ScheduledAmbientAgent},
ambient_agents::AmbientAgentTaskId,
blocklist::BlocklistAIHistoryModel,
cloud_environments::{AmbientAgentEnvironment, CloudAmbientAgentEnvironmentModel},
execution_profiles::{
Expand Down Expand Up @@ -135,10 +136,19 @@ pub struct ObjectOperationResult {

#[derive(Debug)]
pub enum UpdateManagerEvent {
ObjectOperationComplete { result: ObjectOperationResult },
CloudPreferencesUpdated { updated: Vec<Preference> },
MCPGalleryUpdated { templates: Vec<MCPGalleryTemplate> },
AmbientTaskUpdated { timestamp: DateTime<Utc> },
ObjectOperationComplete {
result: ObjectOperationResult,
},
CloudPreferencesUpdated {
updated: Vec<Preference>,
},
MCPGalleryUpdated {
templates: Vec<MCPGalleryTemplate>,
},
AmbientTaskUpdated {
task_id: AmbientAgentTaskId,
timestamp: DateTime<Utc>,
},
}

/// An enum for choosing the behavior of the fetch_single_cloud_object function.
Expand Down Expand Up @@ -1118,11 +1128,15 @@ impl UpdateManager {

fn handle_ambient_task_changed(
&mut self,
_task_id: String,
task_id: String,
timestamp: DateTime<Utc>,
ctx: &mut ModelContext<UpdateManager>,
) {
ctx.emit(UpdateManagerEvent::AmbientTaskUpdated { timestamp });
let Ok(task_id) = task_id.parse::<AmbientAgentTaskId>() else {
log::warn!("Ignoring AmbientTaskUpdated with unparseable task_id: {task_id}");
return;
};
ctx.emit(UpdateManagerEvent::AmbientTaskUpdated { task_id, timestamp });
}

/// Fetches environment "last used" timestamps from the server and merges them
Expand Down
Loading