diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..d2ecf71 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,9 @@ +{ + "permissions": { + "allow": [ + "mcp__acp__Write", + "mcp__acp__Edit", + "mcp__acp__Bash" + ] + } +} diff --git a/src/alerts.rs b/src/alerts.rs new file mode 100644 index 0000000..432397a --- /dev/null +++ b/src/alerts.rs @@ -0,0 +1,114 @@ +use std::collections::VecDeque; +use std::sync::Mutex; + +use tokio::sync::mpsc; +use tracing::field::{Field, Visit}; +use tracing_subscriber::Layer; + +use crate::protocol::{AlertInfo, LogEntry}; + +const CONTEXT_BUFFER_SIZE: usize = 200; + +/// A [`tracing_subscriber::Layer`] that captures ERROR-level log events from +/// the `iroh` crate and forwards them to the n0des cloud via the client actor. +/// +/// All log events (any level, any target) are recorded into a 200-entry ring +/// buffer. When an ERROR from the `iroh` crate fires, the buffered context is +/// drained and sent alongside the alert. +/// +/// Returned by [`Client::enable_alerts`]. The caller must install this layer +/// into their tracing subscriber stack for alerts to fire. +/// +/// [`Client::enable_alerts`]: crate::Client::enable_alerts +#[derive(Debug)] +pub struct LogMonitor { + tx: mpsc::Sender, + context_buffer: Mutex>, +} + +impl LogMonitor { + pub(crate) fn new(tx: mpsc::Sender) -> Self { + Self { + tx, + context_buffer: Mutex::new(VecDeque::with_capacity(CONTEXT_BUFFER_SIZE)), + } + } +} + +impl Layer for LogMonitor { + fn on_event( + &self, + event: &tracing::Event<'_>, + _ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + let meta = event.metadata(); + let level = *meta.level(); + + let mut visitor = MessageVisitor::default(); + event.record(&mut visitor); + let message = visitor.message.unwrap_or_default(); + + let timestamp_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + // Record every event into the ring buffer for context. + let entry = LogEntry { + level: level.to_string(), + target: meta.target().to_string(), + message: message.clone(), + timestamp_ms, + }; + + let mut buf = self + .context_buffer + .lock() + .unwrap_or_else(|e| e.into_inner()); + if buf.len() >= CONTEXT_BUFFER_SIZE { + buf.pop_front(); + } + buf.push_back(entry); + + // Only fire an alert for ERROR-level events from iroh targets. + if level != tracing::Level::ERROR || !meta.target().starts_with("iroh") { + return; + } + + let context: Vec = buf.drain(..).collect(); + + let alert = AlertInfo { + target: meta.target().to_string(), + message, + file: meta.file().map(String::from), + line: meta.line(), + timestamp_ms, + iroh_version: crate::IROH_VERSION.to_string(), + iroh_n0des_version: crate::IROH_N0DES_VERSION.to_string(), + context, + }; + + // Non-blocking send. If the channel is full the alert is dropped — + // alerting is best-effort and must never block the caller's thread. + let _ = self.tx.try_send(alert); + } +} + +#[derive(Default)] +struct MessageVisitor { + message: Option, +} + +impl Visit for MessageVisitor { + fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { + if field.name() == "message" { + self.message = Some(format!("{:?}", value)); + } + } + + fn record_str(&mut self, field: &Field, value: &str) { + if field.name() == "message" { + self.message = Some(value.to_string()); + } + } +} diff --git a/src/caps.rs b/src/caps.rs index 198a867..89e6951 100644 --- a/src/caps.rs +++ b/src/caps.rs @@ -81,6 +81,8 @@ pub enum Cap { Metrics(MetricsCap), #[strum(to_string = "net-diagnostics:{0}")] NetDiagnostics(NetDiagnosticsCap), + #[strum(to_string = "alerts:{0}")] + Alerts(AlertsCap), } impl FromStr for Cap { @@ -94,6 +96,7 @@ impl FromStr for Cap { "metrics" => Self::Metrics(MetricsCap::from_str(inner)?), "relay" => Self::Relay(RelayCap::from_str(inner)?), "net-diagnostics" => Self::NetDiagnostics(NetDiagnosticsCap::from_str(inner)?), + "alerts" => Self::Alerts(AlertsCap::from_str(inner)?), _ => bail!("invalid cap domain"), }) } else { @@ -121,6 +124,12 @@ cap_enum!( } ); +cap_enum!( + pub enum AlertsCap { + PutAny, + } +); + impl Caps { pub fn new(caps: impl IntoIterator>) -> Self { Self::V0(CapSet::new(caps)) @@ -179,6 +188,7 @@ impl Capability for Cap { (Cap::Relay(slf), Cap::Relay(other)) => slf.permits(other), (Cap::Metrics(slf), Cap::Metrics(other)) => slf.permits(other), (Cap::NetDiagnostics(slf), Cap::NetDiagnostics(other)) => slf.permits(other), + (Cap::Alerts(slf), Cap::Alerts(other)) => slf.permits(other), (_, _) => false, } } @@ -192,6 +202,7 @@ fn client_capabilities(other: &Cap) -> bool { Cap::Metrics(MetricsCap::PutAny) => true, Cap::NetDiagnostics(NetDiagnosticsCap::PutAny) => true, Cap::NetDiagnostics(NetDiagnosticsCap::GetAny) => true, + Cap::Alerts(AlertsCap::PutAny) => true, } } @@ -221,6 +232,14 @@ impl Capability for NetDiagnosticsCap { } } +impl Capability for AlertsCap { + fn permits(&self, other: &Self) -> bool { + match (self, other) { + (AlertsCap::PutAny, AlertsCap::PutAny) => true, + } + } +} + /// A set of capabilities #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)] pub struct CapSet(BTreeSet); diff --git a/src/client.rs b/src/client.rs index b1fcc41..3e1a7db 100644 --- a/src/client.rs +++ b/src/client.rs @@ -19,9 +19,12 @@ use crate::net_diagnostics::{DiagnosticsReport, checks::run_diagnostics}; #[cfg(feature = "net_diagnostics")] use crate::protocol::PutNetworkDiagnostics; use crate::{ + alerts::LogMonitor, api_secret::ApiSecret, caps::Caps, - protocol::{ALPN, Auth, IrohServicesClient, Ping, Pong, PutMetrics, RemoteError}, + protocol::{ + ALPN, AlertInfo, Auth, IrohServicesClient, Ping, Pong, PutMetrics, RemoteError, SendAlert, + }, }; /// Client is the main handle for interacting with iroh-services. It communicates with @@ -284,6 +287,36 @@ impl Client { .map_err(Error::Remote) } + /// Enable alert forwarding. Returns a [`LogMonitor`] tracing layer that + /// captures ERROR-level log events from the `iroh` crate and forwards + /// them to n0des. The caller must install the returned layer into their + /// tracing subscriber stack. + /// + /// ```no_run + /// use tracing_subscriber::prelude::*; + /// + /// # async fn example(client: &iroh_n0des::Client) -> anyhow::Result<()> { + /// let alert_layer = client.enable_alerts().await?; + /// tracing_subscriber::registry() + /// .with(alert_layer) + /// .with(tracing_subscriber::fmt::layer()) + /// .init(); + /// # Ok(()) + /// # } + /// ``` + pub async fn enable_alerts(&self) -> Result { + let (tx, rx) = tokio::sync::mpsc::channel(64); + let (done_tx, done_rx) = oneshot::channel(); + self.message_channel + .send(ClientActorMessage::EnableAlerts { rx, done: done_tx }) + .await + .map_err(|_| Error::Other(anyhow!("enabling alerts")))?; + done_rx + .await + .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?; + Ok(LogMonitor::new(tx)) + } + /// Grant capabilities to a remote endpoint. Creates a signed RCAN token /// and sends it to iroh-services for storage. The remote can then use this token /// when dialing back to authorize its requests. @@ -344,6 +377,10 @@ enum ClientActorMessage { Ping { done: oneshot::Sender>, }, + EnableAlerts { + rx: tokio::sync::mpsc::Receiver, + done: oneshot::Sender<()>, + }, // GrantCap is used by the `client_host` feature flag #[allow(dead_code)] GrantCap { @@ -375,6 +412,7 @@ impl ClientActor { let registry = Arc::new(RwLock::new(registry)); let mut encoder = Encoder::new(registry); let mut metrics_timer = interval.map(|interval| n0_future::time::interval(interval)); + let mut alert_rx: Option> = None; trace!("starting client actor"); loop { trace!("client actor tick"); @@ -389,6 +427,10 @@ impl ClientActor { self.authorized = false; } }, + ClientActorMessage::EnableAlerts{ rx, done } => { + alert_rx = Some(rx); + let _ = done.send(()); + } ClientActorMessage::SendMetrics{ done } => { trace!("sending metrics manually triggered"); let res = self.send_metrics(&mut encoder).await; @@ -425,6 +467,18 @@ impl ClientActor { self.authorized = false; } }, + Some(alert) = async { + if let Some(ref mut rx) = alert_rx { + rx.recv().await + } else { + std::future::pending::>().await + } + } => { + if let Err(err) = self.send_alert(alert).await { + debug!("failed to send alert: {:#?}", err); + self.authorized = false; + } + }, } } } @@ -477,6 +531,23 @@ impl ClientActor { Ok(()) } + async fn send_alert(&mut self, alert: AlertInfo) -> Result<(), RemoteError> { + trace!("client actor send alert"); + self.auth().await?; + + let req = SendAlert { + session_id: self.session_id, + alert, + }; + + self.client + .rpc(req) + .await + .map_err(|_| RemoteError::InternalServerError)??; + + Ok(()) + } + async fn grant_cap(&mut self, cap: Rcan) -> Result<(), Error> { trace!("client actor grant capability"); self.auth().await?; diff --git a/src/lib.rs b/src/lib.rs index 3c368bd..27867f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,7 @@ //! [iroh-services]: https://services.iroh.computer //! [iroh]: https://iroh.computer +pub mod alerts; mod client; #[cfg(feature = "client_host")] mod client_host; @@ -61,6 +62,7 @@ pub use iroh_metrics::Registry; #[cfg(feature = "net_diagnostics")] pub use self::net_diagnostics::{DiagnosticsReport, checks::run_diagnostics}; pub use self::{ + alerts::LogMonitor, api_secret::ApiSecret, client::{API_SECRET_ENV_VAR_NAME, Client, ClientBuilder}, protocol::ALPN, diff --git a/src/protocol.rs b/src/protocol.rs index fc62d11..032e4db 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -27,6 +27,9 @@ pub enum IrohServicesProtocol { #[rpc(tx=oneshot::Sender>)] GrantCap(GrantCap), + + #[rpc(tx=oneshot::Sender>)] + SendAlert(SendAlert), } /// Dedicated protocol for cloud-to-endpoint net diagnostics connections. @@ -95,3 +98,37 @@ pub struct RunNetworkDiagnostics; pub struct GrantCap { pub cap: Rcan, } + +/// Information about a captured error-level log event. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AlertInfo { + pub target: String, + pub message: String, + pub file: Option, + pub line: Option, + pub timestamp_ms: u64, + #[serde(default)] + pub iroh_version: String, + #[serde(default)] + pub iroh_n0des_version: String, + /// Up to 200 recent log messages captured before this error, providing + /// context for what led to the alert. + #[serde(default)] + pub context: Vec, +} + +/// A single log entry captured in the context ring buffer. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LogEntry { + pub level: String, + pub target: String, + pub message: String, + pub timestamp_ms: u64, +} + +/// Send an alert to n0des when an error-level log event is captured. +#[derive(Debug, Serialize, Deserialize)] +pub struct SendAlert { + pub session_id: Uuid, + pub alert: AlertInfo, +}