diff --git a/Cargo.toml b/Cargo.toml index dac829d..a7a6069 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ base64 = "0.22.1" bitcoin = { version = "0.32.8", features = ["serde", "base64"] } corepc-types = "0.10.1" # keep in sync with corepc-node hex = { package = "hex-conservative", version = "0.2.1" } # for optimization keep in sync with bitcoin -bitreq = { version = "0.3", default-features = false, features = ["async-https"] } +bitreq = { version = "0.3.4", default-features = false, features = ["async-https"] } secp256k1 = { version = "0.29.1", features = [ # for optimization keep in sync with bitcoin "global-context", ] } diff --git a/src/client/mod.rs b/src/client/mod.rs index faa9460..9e69c8f 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -298,3 +298,122 @@ impl Client { self.call::(method, params).await } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, + sync::oneshot, + time::{sleep, timeout}, + }; + + use super::*; + + async fn read_http_request(stream: &mut TcpStream) { + let mut buf = vec![0u8; 4096]; + let mut total = Vec::new(); + loop { + let n = stream.read(&mut buf).await.expect("read request"); + if n == 0 { + break; + } + total.extend_from_slice(&buf[..n]); + let Some(hdr_end) = total.windows(4).position(|w| w == b"\r\n\r\n") else { + continue; + }; + let headers = std::str::from_utf8(&total[..hdr_end]).unwrap_or(""); + let cl: usize = headers + .lines() + .find_map(|l| { + let mut parts = l.splitn(2, ':'); + let k = parts.next()?.trim(); + if k.eq_ignore_ascii_case("Content-Length") { + parts.next()?.trim().parse().ok() + } else { + None + } + }) + .unwrap_or(0); + if total.len() >= hdr_end + 4 + cl { + break; + } + } + } + + async fn write_json_response(stream: &mut TcpStream, body: &str) { + let response = format!( + concat!( + "HTTP/1.1 200 OK\r\n", + "Content-Type: application/json\r\n", + "Connection: keep-alive\r\n", + "Content-Length: {}\r\n\r\n{}" + ), + body.len(), + body, + ); + stream + .write_all(response.as_bytes()) + .await + .expect("write response"); + stream.flush().await.expect("flush response"); + } + + /// Regression test for issue #101: a pooled keep-alive socket that is later + /// closed server-side must not permanently poison future RPC calls. + #[tokio::test] + async fn retry_recovers_from_dead_pooled_connection() { + let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind"); + let addr = listener.local_addr().expect("addr"); + + let (ready_tx, ready_rx) = oneshot::channel(); + let server = tokio::spawn(async move { + let (mut first_stream, _) = listener.accept().await.expect("accept 1"); + read_http_request(&mut first_stream).await; + write_json_response( + &mut first_stream, + r#"{"result":"first","error":null,"id":0}"#, + ) + .await; + + // Keep the socket alive long enough for the client to cache it, then + // close it server-side to mimic bitcoind's rpcservertimeout behavior. + sleep(Duration::from_millis(100)).await; + drop(first_stream); + let _ = ready_tx.send(()); + + let (mut second_stream, _) = listener.accept().await.expect("accept 2"); + read_http_request(&mut second_stream).await; + write_json_response( + &mut second_stream, + r#"{"result":"second","error":null,"id":1}"#, + ) + .await; + }); + + let url = format!("http://{}", addr); + let client = Client::new( + url, + Auth::UserPass("user".into(), "pass".into()), + Some(3), + Some(10), + Some(5), + ) + .expect("client"); + + let first: String = client.call("ping", &[]).await.expect("first call"); + assert_eq!(first, "first"); + + ready_rx.await.expect("ready signal"); + + let second: String = timeout(Duration::from_secs(5), client.call("ping", &[])) + .await + .expect("call did not time out") + .expect("second call"); + assert_eq!(second, "second"); + + server.await.expect("server task"); + } +}