diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 00000000..7269b968 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,8 @@ +{ + "permissions": { + "allow": [ + "mcp__acp__Edit", + "mcp__acp__Bash" + ] + } +} diff --git a/Cargo.lock b/Cargo.lock index 35345367..3d8e62f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1756,7 +1756,7 @@ dependencies = [ [[package]] name = "iroh-services" -version = "0.12.0" +version = "0.11.0" dependencies = [ "anyhow", "built", @@ -2821,8 +2821,6 @@ dependencies = [ [[package]] name = "rcan" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "725eb86d019799495be1164a962cbaf8f8cd1553e0f1e602d8fd6671fc619498" dependencies = [ "anyhow", "blake3", diff --git a/Cargo.toml b/Cargo.toml index e928c4e6..81d2ebc3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iroh-services" -version = "0.12.0" +version = "0.11.0" edition = "2024" readme = "README.md" description = "p2p quic connections dialed by public key" @@ -21,7 +21,8 @@ iroh-metrics = { version = "0.38", default-features = false } iroh-tickets = "0.4" n0-error = "0.1" n0-future = "0.3" -rcan = "0.3.0" +# rcan = "0.3.0" +rcan = { path = "../rcan" } serde = { version = "1.0.217", features = ["derive"] } strum = { version = "0.27.1", features = ["derive"] } thiserror = "2.0.12" diff --git a/examples/net_diagnostics.rs b/examples/net_diagnostics.rs index 4b6b52ed..f7a6d1b1 100644 --- a/examples/net_diagnostics.rs +++ b/examples/net_diagnostics.rs @@ -18,6 +18,8 @@ use iroh_services::{ #[tokio::main] async fn main() -> Result<()> { + tracing_subscriber::fmt::init(); + // 1. Create an endpoint that will both dial iroh-services and accept incoming // requests from the iroh-services service via a ClientHost. let endpoint = Endpoint::bind(presets::N0).await?; @@ -26,9 +28,15 @@ async fn main() -> Result<()> { // EndpointID. Normally we'd pass it straight to the client builder. let secret = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?; + // optional: label the endpoint. Here we generate a label from the endpoint, + // in your app this would be used to connect with something like a userId + let id = endpoint.id().to_string(); + let label = format!("net-diagnostics-example-{}", &id[..8]); + // 3. Build a Client that dials iroh-services (as in all other examples). let client = Client::builder(&endpoint) .api_secret(secret.clone())? + .label(label)? .build() .await?; diff --git a/src/client.rs b/src/client.rs index b1fcc41a..e22ded3d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -9,7 +9,8 @@ use iroh_metrics::{MetricsGroup, Registry, encoding::Encoder}; use irpc_iroh::IrohLazyRemoteConnection; use n0_error::StackResultExt; use n0_future::{task::AbortOnDropHandle, time::Duration}; -use rcan::Rcan; +use rcan::{Expires, Rcan}; +use std::time::SystemTime; use tokio::sync::oneshot; use tracing::{debug, trace, warn}; use uuid::Uuid; @@ -21,7 +22,10 @@ use crate::protocol::PutNetworkDiagnostics; use crate::{ api_secret::ApiSecret, caps::Caps, - protocol::{ALPN, Auth, IrohServicesClient, Ping, Pong, PutMetrics, RemoteError}, + protocol::{ + ALPN, Auth, IrohServicesClient, LabelEndpoint, Ping, Pong, PutMetrics, RefreshAuthToken, + RemoteError, + }, }; /// Client is the main handle for interacting with iroh-services. It communicates with @@ -63,6 +67,7 @@ pub struct ClientBuilder { cap_expiry: Duration, cap: Option>, endpoint: Endpoint, + label: Option, metrics_interval: Option, remote: Option, registry: Registry, @@ -80,6 +85,7 @@ impl ClientBuilder { cap: None, cap_expiry: DEFAULT_CAP_EXPIRY, endpoint: endpoint.clone(), + label: None, metrics_interval: Some(Duration::from_secs(60)), remote: None, registry, @@ -108,6 +114,29 @@ impl ClientBuilder { self } + /// Set an optional human-readable name for this endpoint, making metrics + /// from this endpoint easier to identify. This is often used for associating + /// with other services in your app, like a database user id. + /// + /// When this builder method is called, the provided label label is sent + /// after the client initially authenticates the endpoint server-side. + /// Errors will not interrupt client construction, instead producing a + /// warn-level log. For explicit error handling, use [`Client::set_label`]. + /// + /// labels can be any UTF-8 string, with a min length of 2 bytes, and + /// maximum length of 128 bytes. **label uniqueness is not enforced.** + pub fn label(mut self, label: impl Into) -> Result { + let label = label.into(); + if label.len() < LABEL_MIN_LENGTH { + return Err(BuildError::InvalidLabel(ValidateLabelError::TooShort).into()); + } else if label.len() > LABEL_MAX_LENGTH { + return Err(BuildError::InvalidLabel(ValidateLabelError::TooLong).into()); + } + + self.label = Some(label); + Ok(self) + } + /// Check IROH_SERVICES_API_SECRET environment variable for a valid API secret pub fn api_secret_from_env(self) -> Result { let ticket = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?; @@ -188,23 +217,34 @@ impl ClientBuilder { let capabilities = self.cap.ok_or(BuildError::MissingCapability)?; let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec()); - let client = IrohServicesClient::boxed(conn); + let irpc_client = IrohServicesClient::boxed(conn); let (tx, rx) = tokio::sync::mpsc::channel(1); - let metrics_task = AbortOnDropHandle::new(n0_future::task::spawn( + let actor_task = AbortOnDropHandle::new(n0_future::task::spawn( ClientActor { capabilities, - client, + client: irpc_client, + label: self.label.clone(), session_id: Uuid::new_v4(), authorized: false, } .run(self.registry, self.metrics_interval, rx), )); + if let Some(label) = &self.label { + let label = label.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + if let Err(err) = set_label_inner(tx, label).await { + warn!(err = %err, "setting endpoint label on startup"); + } + }); + } + Ok(Client { endpoint: self.endpoint, message_channel: tx, - _actor_task: Arc::new(metrics_task), + _actor_task: Arc::new(actor_task), }) } } @@ -223,6 +263,8 @@ pub enum BuildError { Rpc(irpc::Error), #[error("Connection error: {0}")] Connect(ConnectError), + #[error("Invalid endpoint label: {0}")] + InvalidLabel(#[from] ValidateLabelError), } impl From for BuildError { @@ -241,6 +283,17 @@ impl From for BuildError { } } +pub const LABEL_MIN_LENGTH: usize = 2; +pub const LABEL_MAX_LENGTH: usize = 128; + +#[derive(Debug, thiserror::Error)] +pub enum ValidateLabelError { + #[error("Label is too long (must be no more than {LABEL_MAX_LENGTH} characters).")] + TooLong, + #[error("Label is too short (must be at least {LABEL_MIN_LENGTH} characters).")] + TooShort, +} + #[derive(thiserror::Error, Debug)] pub enum Error { #[error("Remote error: {0}")] @@ -256,6 +309,26 @@ impl Client { ClientBuilder::new(endpoint) } + /// Read the current endpoint label from the local client. + pub async fn label(&self) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + self.message_channel + .send(ClientActorMessage::ReadLabel { done: tx }) + .await + .map_err(|_| Error::Other(anyhow!("sending ping request")))?; + + rx.await + .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e))) + } + + /// Label the active endpoint cloud-side. + /// + /// labels can be any UTF-8 string, with a min length of 2 bytes, and + /// maximum length of 128 bytes. **label uniqueness is not enforced.** + pub async fn set_label(&self, label: String) -> Result<(), Error> { + set_label_inner(self.message_channel.clone(), label).await + } + /// Pings the remote node. pub async fn ping(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -337,6 +410,41 @@ impl Client { } } +/// Compute the next time the client should attempt a token refresh. +/// +/// Strategy: +/// - If the token never expires, no refresh is needed. +/// - If already expired, refresh immediately. +/// - In the last 72 hours before expiry, refresh every hour. +/// - Otherwise, refresh at the halfway point of remaining lifetime. +fn next_refresh_at(token: &Rcan) -> Option { + let now_secs = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("now is after UNIX_EPOCH") + .as_secs(); + let expires_at = match token.expires() { + Expires::At(t) => *t, + Expires::Never => return None, + }; + + let remaining = expires_at.saturating_sub(now_secs); + let seventy_two_hours: u64 = 72 * 3600; + + let delay_secs = if remaining == 0 { + // expired, refresh now + 0 + } else if remaining > seventy_two_hours { + // plenty of time left, refresh at halfway point + remaining / 2 + } else { + // in the last 72 hours: refresh hourly, or at remaining/2 if + // the token lifetime is very short + 3600.min(remaining / 2).max(1) + }; + + Some(tokio::time::Instant::now() + Duration::from_secs(delay_secs)) +} + enum ClientActorMessage { SendMetrics { done: oneshot::Sender>, @@ -356,11 +464,19 @@ enum ClientActorMessage { report: Box, done: oneshot::Sender>, }, + ReadLabel { + done: oneshot::Sender>, + }, + LabelEndpoint { + label: String, + done: oneshot::Sender>, + }, } struct ClientActor { capabilities: Rcan, client: IrohServicesClient, + label: Option, session_id: Uuid, authorized: bool, } @@ -403,6 +519,17 @@ impl ClientActor { warn!("failed to grant capability: {:#?}", err); } } + ClientActorMessage::ReadLabel{ done } => { + if let Err(err) = done.send(self.label.clone()) { + warn!("sending label value: {:#?}", err); + } + } + ClientActorMessage::LabelEndpoint{ label, done } => { + let res = self.send_label_endpoint(label).await; + if let Err(err) = done.send(res) { + warn!("failed to label endpoint: {:#?}", err); + } + } #[cfg(feature = "net_diagnostics")] ClientActorMessage::PutNetworkDiagnostics{ report, done } => { let res = self.put_network_diagnostics(*report).await; @@ -425,6 +552,17 @@ impl ClientActor { self.authorized = false; } }, + _ = async { + match next_refresh_at(&self.capabilities) { + Some(deadline) => tokio::time::sleep_until(deadline).await, + None => std::future::pending::<()>().await, + } + } => { + trace!("token refresh tick"); + if let Err(err) = self.refresh_token().await { + warn!("token refresh failed: {err:?}"); + } + }, } } } @@ -458,6 +596,38 @@ impl ClientActor { .map_err(|_| RemoteError::InternalServerError) } + async fn send_label_endpoint(&mut self, label: String) -> Result<(), RemoteError> { + trace!("client sending label endpoint request"); + self.auth().await?; + + self.client + .rpc(LabelEndpoint { + label: label.clone(), + }) + .await + .inspect_err(|e| debug!("label endpoint error: {e}")) + .map_err(|_| RemoteError::InternalServerError)??; + Ok(()) + } + + async fn refresh_token(&mut self) -> Result<(), RemoteError> { + trace!("client refreshing auth token"); + self.auth().await?; + + let resp = self + .client + .rpc(RefreshAuthToken { + original: self.capabilities.clone(), + }) + .await + .inspect_err(|e| debug!("token refresh error: {e}")) + .map_err(|_| RemoteError::InternalServerError)??; + self.capabilities = resp.updated; + self.authorized = false; + self.auth().await?; // proactively re-auth with the new token + Ok(()) + } + async fn send_metrics(&mut self, encoder: &mut Encoder) -> Result<(), RemoteError> { trace!("client actor send metrics"); self.auth().await?; @@ -508,6 +678,21 @@ impl ClientActor { } } +async fn set_label_inner( + message_channel: tokio::sync::mpsc::Sender, + label: String, +) -> Result<(), Error> { + debug!(label=%label, "calling set label"); + let (tx, rx) = oneshot::channel(); + message_channel + .send(ClientActorMessage::LabelEndpoint { label, done: tx }) + .await + .map_err(|_| Error::Other(anyhow!("sending label endpoint request")))?; + rx.await + .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))? + .map_err(Error::Remote) +} + #[cfg(test)] mod tests { use iroh::{Endpoint, EndpointAddr, SecretKey}; @@ -517,7 +702,7 @@ mod tests { Client, api_secret::ApiSecret, caps::{Cap, Caps}, - client::API_SECRET_ENV_VAR_NAME, + client::{API_SECRET_ENV_VAR_NAME, BuildError, ValidateLabelError}, }; #[tokio::test] @@ -569,4 +754,196 @@ mod tests { let err = client.push_metrics().await; assert!(err.is_err()); } + + #[tokio::test] + async fn test_token_refresh() { + use std::sync::{ + Arc, + atomic::{AtomicU32, Ordering}, + }; + + use iroh::{ + address_lookup::MemoryLookup, + endpoint::Connection, + protocol::{AcceptError, ProtocolHandler, Router}, + }; + use irpc::WithChannels; + use irpc_iroh::read_request; + use n0_error::AnyError; + use n0_future::time::Duration; + + use crate::{ + caps::create_api_token_from_secret_key, + protocol::{ + ALPN, IrohServicesProtocol, Pong, RefreshAuthTokenResponse, ServicesMessage, + }, + }; + + /// Mock server that handles Auth, Ping, and RefreshAuthToken. + #[derive(Debug)] + struct MockServer { + endpoint: Endpoint, + refresh_count: Arc, + } + + impl ProtocolHandler for MockServer { + async fn accept(&self, connection: Connection) -> Result<(), AcceptError> { + self.handle_connection(connection).await.map_err(|e| { + let boxed: Box = e.into(); + AcceptError::from(AnyError::from(boxed)) + }) + } + } + + impl MockServer { + async fn handle_connection(&self, connection: Connection) -> anyhow::Result<()> { + let remote_node_id = connection.remote_id(); + + // First message must be Auth + let Some(first_request) = read_request::(&connection).await? + else { + return Ok(()); + }; + + let ServicesMessage::Auth(WithChannels { inner: _, tx, .. }) = first_request else { + connection.close(400u32.into(), b"Expected initial auth message"); + return Ok(()); + }; + tx.send(()).await?; + + // Handle subsequent requests in a loop + loop { + let Some(request) = read_request::(&connection).await? + else { + return Ok(()); + }; + + match request { + ServicesMessage::Auth(WithChannels { tx, .. }) => { + tx.send(()).await?; + } + ServicesMessage::Ping(WithChannels { inner, tx, .. }) => { + tx.send(Pong { + req_id: inner.req_id, + }) + .await?; + } + ServicesMessage::RefreshAuthToken(WithChannels { + inner: _, tx, .. + }) => { + self.refresh_count.fetch_add(1, Ordering::Relaxed); + // Issue a fresh token with a short expiry + let new_cap = create_api_token_from_secret_key( + self.endpoint.secret_key().clone(), + remote_node_id, + Duration::from_secs(4), + Caps::for_shared_secret(), + ) + .unwrap(); + tx.send(Ok(RefreshAuthTokenResponse { updated: new_cap })) + .await?; + } + _ => { + // Ignore other messages + } + } + } + } + } + + let lookup = MemoryLookup::new(); + let server_ep = Endpoint::empty_builder() + .address_lookup(lookup.clone()) + .bind() + .await + .unwrap(); + + let client_ep = Endpoint::empty_builder() + .address_lookup(lookup.clone()) + .bind() + .await + .unwrap(); + + let refresh_count = Arc::new(AtomicU32::new(0)); + let mock = MockServer { + endpoint: server_ep.clone(), + refresh_count: refresh_count.clone(), + }; + + let router = Router::builder(server_ep.clone()) + .accept(ALPN, mock) + .spawn(); + + // Create a short-lived token (4 seconds) so refresh fires at ~2 seconds + let cap = create_api_token_from_secret_key( + server_ep.secret_key().clone(), + client_ep.id(), + Duration::from_secs(4), + Caps::for_shared_secret(), + ) + .unwrap(); + + let client = Client::builder(&client_ep) + .disable_metrics_interval() + .remote(server_ep.addr()) + .rcan(cap) + .unwrap() + .build() + .await + .unwrap(); + + // Initial ping should work + let pong = client.ping().await; + assert!(pong.is_ok(), "initial ping should succeed"); + + // Wait for refresh to happen (token is 4s, refresh at ~2s) + tokio::time::sleep(Duration::from_secs(3)).await; + + assert!( + refresh_count.load(Ordering::Relaxed) >= 1, + "expected at least one token refresh" + ); + + // Ping should still work after refresh + let pong = client.ping().await; + assert!(pong.is_ok(), "ping after refresh should succeed"); + + router.shutdown().await.unwrap(); + client_ep.close().await; + } + + #[tokio::test] + async fn test_label() { + let mut rng = rand::rng(); + let shared_secret = SecretKey::generate(&mut rng); + let fake_endpoint_id = SecretKey::generate(&mut rng).public(); + let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id); + + let endpoint = Endpoint::empty_builder().bind().await.unwrap(); + + let builder = Client::builder(&endpoint) + .label("my-node 👋") + .unwrap() + .api_secret(api_secret) + .unwrap(); + + assert_eq!(builder.label, Some("my-node 👋".to_string())); + + let Err(err) = Client::builder(&endpoint).label("a") else { + panic!("label should fail for strings under 2 bytes"); + }; + assert!(matches!( + err.downcast_ref::(), + Some(BuildError::InvalidLabel(ValidateLabelError::TooShort)) + )); + + let too_long_name = "👋".repeat(129); + let Err(err) = Client::builder(&endpoint).label(&too_long_name) else { + panic!("label should fail for strings over 1024 bytes"); + }; + assert!(matches!( + err.downcast_ref::(), + Some(BuildError::InvalidLabel(ValidateLabelError::TooLong)) + )); + } } diff --git a/src/protocol.rs b/src/protocol.rs index fc62d119..98389b85 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -27,6 +27,12 @@ pub enum IrohServicesProtocol { #[rpc(tx=oneshot::Sender>)] GrantCap(GrantCap), + + #[rpc(tx=oneshot::Sender>)] + LabelEndpoint(LabelEndpoint), + + #[rpc(tx=oneshot::Sender>)] + RefreshAuthToken(RefreshAuthToken), } /// Dedicated protocol for cloud-to-endpoint net diagnostics connections. @@ -95,3 +101,21 @@ pub struct RunNetworkDiagnostics; pub struct GrantCap { pub cap: Rcan, } + +/// Label the client endpoint cloud-side with a string identifier. +#[derive(Debug, Serialize, Deserialize)] +pub struct LabelEndpoint { + pub label: String, +} + +/// Request an updated authorization token +#[derive(Debug, Serialize, Deserialize)] +pub struct RefreshAuthToken { + pub original: Rcan, +} + +/// Successful response from server with updated token +#[derive(Debug, Serialize, Deserialize)] +pub struct RefreshAuthTokenResponse { + pub updated: Rcan, +}