Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .claude/settings.local.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"permissions": {
"allow": [
"mcp__acp__Write",
"mcp__acp__Edit",
"mcp__acp__Bash"
]
}
}
114 changes: 114 additions & 0 deletions src/alerts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::collections::VecDeque;
use std::sync::Mutex;

use tokio::sync::mpsc;
use tracing::field::{Field, Visit};
use tracing_subscriber::Layer;

use crate::protocol::{AlertInfo, LogEntry};

const CONTEXT_BUFFER_SIZE: usize = 200;

/// A [`tracing_subscriber::Layer`] that turns ERROR-level log events into
/// alerts and hands them to the client actor over an in-process channel.
///
/// An event triggers an alert when its level is ERROR and its tracing target
/// starts with `iroh` (a prefix match on the target string, so it is not
/// limited to the `iroh` crate itself).
///
/// All log events (any level, any target) are recorded into a 200-entry ring
/// buffer. When an alert fires, the buffered context is drained and sent
/// alongside the alert.
///
/// Returned by [`Client::enable_alerts`]. The caller must install this layer
/// into their tracing subscriber stack for alerts to fire.
///
/// [`Client::enable_alerts`]: crate::Client::enable_alerts
#[derive(Debug)]
pub struct LogMonitor {
// Sending half of the alert channel; the receiving half is handed to the
// client actor by `Client::enable_alerts`.
tx: mpsc::Sender<AlertInfo>,
// Ring buffer of recent log entries, bounded by `CONTEXT_BUFFER_SIZE`.
// Behind a mutex because `on_event` only receives `&self`.
context_buffer: Mutex<VecDeque<LogEntry>>,
}

impl LogMonitor {
    /// Builds a monitor that pushes alerts into `tx`.
    ///
    /// The context ring buffer starts empty, with room for
    /// `CONTEXT_BUFFER_SIZE` entries reserved up front.
    pub(crate) fn new(tx: mpsc::Sender<AlertInfo>) -> Self {
        let ring = VecDeque::with_capacity(CONTEXT_BUFFER_SIZE);
        let context_buffer = Mutex::new(ring);
        Self { tx, context_buffer }
    }
}

impl<S: tracing::Subscriber> Layer<S> for LogMonitor {
fn on_event(
&self,
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let meta = event.metadata();
let level = *meta.level();

let mut visitor = MessageVisitor::default();
event.record(&mut visitor);
let message = visitor.message.unwrap_or_default();

let timestamp_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;

// Record every event into the ring buffer for context.
let entry = LogEntry {
level: level.to_string(),
target: meta.target().to_string(),
message: message.clone(),
timestamp_ms,
};

let mut buf = self
.context_buffer
.lock()
.unwrap_or_else(|e| e.into_inner());
if buf.len() >= CONTEXT_BUFFER_SIZE {
buf.pop_front();
}
buf.push_back(entry);

// Only fire an alert for ERROR-level events from iroh targets.
if level != tracing::Level::ERROR || !meta.target().starts_with("iroh") {
return;
}

let context: Vec<LogEntry> = buf.drain(..).collect();

let alert = AlertInfo {
target: meta.target().to_string(),
message,
file: meta.file().map(String::from),
line: meta.line(),
timestamp_ms,
iroh_version: crate::IROH_VERSION.to_string(),
iroh_n0des_version: crate::IROH_N0DES_VERSION.to_string(),
context,
};

// Non-blocking send. If the channel is full the alert is dropped —
// alerting is best-effort and must never block the caller's thread.
let _ = self.tx.try_send(alert);
}
}

// Field visitor that extracts an event's `message` field as a `String`.
// `message` stays `None` until a field named "message" is recorded.
#[derive(Default)]
struct MessageVisitor {
// Last recorded value of the "message" field, if any.
message: Option<String>,
}

impl Visit for MessageVisitor {
    /// Stores the `Debug` rendering of the field named `message`; every other
    /// field is ignored.
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        match field.name() {
            "message" => self.message = Some(format!("{:?}", value)),
            _ => {}
        }
    }

    /// Stores a string-valued `message` field verbatim; every other field is
    /// ignored.
    fn record_str(&mut self, field: &Field, value: &str) {
        match field.name() {
            "message" => self.message = Some(String::from(value)),
            _ => {}
        }
    }
}
19 changes: 19 additions & 0 deletions src/caps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub enum Cap {
Metrics(MetricsCap),
#[strum(to_string = "net-diagnostics:{0}")]
NetDiagnostics(NetDiagnosticsCap),
#[strum(to_string = "alerts:{0}")]
Alerts(AlertsCap),
}

impl FromStr for Cap {
Expand All @@ -94,6 +96,7 @@ impl FromStr for Cap {
"metrics" => Self::Metrics(MetricsCap::from_str(inner)?),
"relay" => Self::Relay(RelayCap::from_str(inner)?),
"net-diagnostics" => Self::NetDiagnostics(NetDiagnosticsCap::from_str(inner)?),
"alerts" => Self::Alerts(AlertsCap::from_str(inner)?),
_ => bail!("invalid cap domain"),
})
} else {
Expand Down Expand Up @@ -121,6 +124,12 @@ cap_enum!(
}
);

// Capabilities in the `alerts` domain (rendered as `alerts:{0}` by `Cap`).
// `PutAny` is currently the only variant.
// NOTE(review): `cap_enum!` is defined outside this view — presumably it
// generates the string conversions that `Cap`'s `FromStr` relies on; confirm.
cap_enum!(
pub enum AlertsCap {
PutAny,
}
);

impl Caps {
pub fn new(caps: impl IntoIterator<Item = impl Into<Cap>>) -> Self {
Self::V0(CapSet::new(caps))
Expand Down Expand Up @@ -179,6 +188,7 @@ impl Capability for Cap {
(Cap::Relay(slf), Cap::Relay(other)) => slf.permits(other),
(Cap::Metrics(slf), Cap::Metrics(other)) => slf.permits(other),
(Cap::NetDiagnostics(slf), Cap::NetDiagnostics(other)) => slf.permits(other),
(Cap::Alerts(slf), Cap::Alerts(other)) => slf.permits(other),
(_, _) => false,
}
}
Expand All @@ -192,6 +202,7 @@ fn client_capabilities(other: &Cap) -> bool {
Cap::Metrics(MetricsCap::PutAny) => true,
Cap::NetDiagnostics(NetDiagnosticsCap::PutAny) => true,
Cap::NetDiagnostics(NetDiagnosticsCap::GetAny) => true,
Cap::Alerts(AlertsCap::PutAny) => true,
}
}

Expand Down Expand Up @@ -221,6 +232,14 @@ impl Capability for NetDiagnosticsCap {
}
}

impl Capability for AlertsCap {
    /// Reports whether holding `self` permits `other`.
    ///
    /// With a single variant the answer is always `true`. Both matches are
    /// kept exhaustive (no wildcard arm) so that adding a variant to
    /// `AlertsCap` forces this function to be revisited.
    fn permits(&self, other: &Self) -> bool {
        match self {
            AlertsCap::PutAny => match other {
                AlertsCap::PutAny => true,
            },
        }
    }
}

/// A set of capabilities
///
/// Newtype over a `BTreeSet<C>`, so the contained capabilities are unique and
/// kept in `Ord` order.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
pub struct CapSet<C: Capability + Ord>(BTreeSet<C>);
Expand Down
73 changes: 72 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ use crate::net_diagnostics::{DiagnosticsReport, checks::run_diagnostics};
#[cfg(feature = "net_diagnostics")]
use crate::protocol::PutNetworkDiagnostics;
use crate::{
alerts::LogMonitor,
api_secret::ApiSecret,
caps::Caps,
protocol::{ALPN, Auth, IrohServicesClient, Ping, Pong, PutMetrics, RemoteError},
protocol::{
ALPN, AlertInfo, Auth, IrohServicesClient, Ping, Pong, PutMetrics, RemoteError, SendAlert,
},
};

/// Client is the main handle for interacting with iroh-services. It communicates with
Expand Down Expand Up @@ -284,6 +287,36 @@ impl Client {
.map_err(Error::Remote)
}

/// Enable alert forwarding. Returns a [`LogMonitor`] tracing layer that
/// captures ERROR-level log events from the `iroh` crate and forwards
/// them to n0des. The caller must install the returned layer into their
/// tracing subscriber stack.
///
/// ```no_run
/// use tracing_subscriber::prelude::*;
///
/// # async fn example(client: &iroh_n0des::Client) -> anyhow::Result<()> {
/// let alert_layer = client.enable_alerts().await?;
/// tracing_subscriber::registry()
/// .with(alert_layer)
/// .with(tracing_subscriber::fmt::layer())
/// .init();
/// # Ok(())
/// # }
/// ```
pub async fn enable_alerts(&self) -> Result<LogMonitor, Error> {
let (tx, rx) = tokio::sync::mpsc::channel(64);
let (done_tx, done_rx) = oneshot::channel();
self.message_channel
.send(ClientActorMessage::EnableAlerts { rx, done: done_tx })
.await
.map_err(|_| Error::Other(anyhow!("enabling alerts")))?;
done_rx
.await
.map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
Ok(LogMonitor::new(tx))
}

/// Grant capabilities to a remote endpoint. Creates a signed RCAN token
/// and sends it to iroh-services for storage. The remote can then use this token
/// when dialing back to authorize its requests.
Expand Down Expand Up @@ -344,6 +377,10 @@ enum ClientActorMessage {
Ping {
done: oneshot::Sender<Result<Pong, RemoteError>>,
},
EnableAlerts {
rx: tokio::sync::mpsc::Receiver<AlertInfo>,
done: oneshot::Sender<()>,
},
// GrantCap is used by the `client_host` feature flag
#[allow(dead_code)]
GrantCap {
Expand Down Expand Up @@ -375,6 +412,7 @@ impl ClientActor {
let registry = Arc::new(RwLock::new(registry));
let mut encoder = Encoder::new(registry);
let mut metrics_timer = interval.map(|interval| n0_future::time::interval(interval));
let mut alert_rx: Option<tokio::sync::mpsc::Receiver<AlertInfo>> = None;
trace!("starting client actor");
loop {
trace!("client actor tick");
Expand All @@ -389,6 +427,10 @@ impl ClientActor {
self.authorized = false;
}
},
ClientActorMessage::EnableAlerts{ rx, done } => {
alert_rx = Some(rx);
let _ = done.send(());
}
ClientActorMessage::SendMetrics{ done } => {
trace!("sending metrics manually triggered");
let res = self.send_metrics(&mut encoder).await;
Expand Down Expand Up @@ -425,6 +467,18 @@ impl ClientActor {
self.authorized = false;
}
},
Some(alert) = async {
if let Some(ref mut rx) = alert_rx {
rx.recv().await
} else {
std::future::pending::<Option<AlertInfo>>().await
}
} => {
if let Err(err) = self.send_alert(alert).await {
debug!("failed to send alert: {:#?}", err);
self.authorized = false;
}
},
}
}
}
Expand Down Expand Up @@ -477,6 +531,23 @@ impl ClientActor {
Ok(())
}

/// Sends one alert to the remote service.
///
/// Authenticates first via `self.auth()`, then issues a `SendAlert` RPC
/// tagged with this actor's session id.
///
/// # Errors
///
/// Propagates any error from `auth()`, maps an RPC-level failure to
/// [`RemoteError::InternalServerError`], and propagates an error returned by
/// the remote in its reply.
async fn send_alert(&mut self, alert: AlertInfo) -> Result<(), RemoteError> {
    trace!("client actor send alert");
    self.auth().await?;

    let request = SendAlert {
        session_id: self.session_id,
        alert,
    };

    // First `?`: the RPC call itself failed. Second `?`: the remote replied
    // with an error.
    let reply = self
        .client
        .rpc(request)
        .await
        .map_err(|_| RemoteError::InternalServerError)?;
    reply?;

    Ok(())
}

async fn grant_cap(&mut self, cap: Rcan<Caps>) -> Result<(), Error> {
trace!("client actor grant capability");
self.auth().await?;
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
//! [iroh-services]: https://services.iroh.computer
//! [iroh]: https://iroh.computer

pub mod alerts;
mod client;
#[cfg(feature = "client_host")]
mod client_host;
Expand Down Expand Up @@ -61,6 +62,7 @@ pub use iroh_metrics::Registry;
#[cfg(feature = "net_diagnostics")]
pub use self::net_diagnostics::{DiagnosticsReport, checks::run_diagnostics};
pub use self::{
alerts::LogMonitor,
api_secret::ApiSecret,
client::{API_SECRET_ENV_VAR_NAME, Client, ClientBuilder},
protocol::ALPN,
Expand Down
37 changes: 37 additions & 0 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub enum IrohServicesProtocol {

#[rpc(tx=oneshot::Sender<RemoteResult<()>>)]
GrantCap(GrantCap),

#[rpc(tx=oneshot::Sender<RemoteResult<()>>)]
SendAlert(SendAlert),
}

/// Dedicated protocol for cloud-to-endpoint net diagnostics connections.
Expand Down Expand Up @@ -95,3 +98,37 @@ pub struct RunNetworkDiagnostics;
pub struct GrantCap {
pub cap: Rcan<Caps>,
}

/// Information about a captured error-level log event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertInfo {
/// Tracing target of the event that triggered the alert.
pub target: String,
/// Contents of the event's `message` field; empty if the event had none.
pub message: String,
/// Source file reported by the event's metadata, if available.
pub file: Option<String>,
/// Source line reported by the event's metadata, if available.
pub line: Option<u32>,
/// Capture time in milliseconds since the Unix epoch.
pub timestamp_ms: u64,
/// Value of `crate::IROH_VERSION` on the sending side. Defaults to an empty
/// string when absent from the serialized form.
#[serde(default)]
pub iroh_version: String,
/// Value of `crate::IROH_N0DES_VERSION` on the sending side. Defaults to an
/// empty string when absent from the serialized form.
#[serde(default)]
pub iroh_n0des_version: String,
/// Recent log entries taken from the sender's 200-entry context ring
/// buffer, providing context for what led to the alert. Defaults to an
/// empty list when absent from the serialized form.
#[serde(default)]
pub context: Vec<LogEntry>,
}

/// A single log entry captured in the context ring buffer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
/// Textual form of the event's tracing level (its `to_string()` output).
pub level: String,
/// Tracing target of the event.
pub target: String,
/// Contents of the event's `message` field; empty if the event had none.
pub message: String,
/// Capture time in milliseconds since the Unix epoch.
pub timestamp_ms: u64,
}

/// Send an alert to n0des when an error-level log event is captured.
///
/// Request payload of the `SendAlert` RPC; the reply is a `RemoteResult<()>`.
#[derive(Debug, Serialize, Deserialize)]
pub struct SendAlert {
/// Session id of the client actor that sends the alert.
pub session_id: Uuid,
/// The captured alert, including its log context.
pub alert: AlertInfo,
}
Loading