Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ base64 = "0.22.1"
bitcoin = { version = "0.32.8", features = ["serde", "base64"] }
corepc-types = "0.10.1" # keep in sync with corepc-node
hex = { package = "hex-conservative", version = "0.2.1" } # for optimization keep in sync with bitcoin
bitreq = { version = "0.3", default-features = false, features = ["async-https"] }
bitreq = { version = "0.3.4", default-features = false, features = ["async-https"] }
secp256k1 = { version = "0.29.1", features = [ # for optimization keep in sync with bitcoin
"global-context",
] }
Expand Down
119 changes: 119 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,122 @@ impl Client {
self.call::<R>(method, params).await
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
sync::oneshot,
time::{sleep, timeout},
};

use super::*;

async fn read_http_request(stream: &mut TcpStream) {
let mut buf = vec![0u8; 4096];
let mut total = Vec::new();
loop {
let n = stream.read(&mut buf).await.expect("read request");
if n == 0 {
break;
}
total.extend_from_slice(&buf[..n]);
let Some(hdr_end) = total.windows(4).position(|w| w == b"\r\n\r\n") else {
continue;
};
let headers = std::str::from_utf8(&total[..hdr_end]).unwrap_or("");
let cl: usize = headers
.lines()
.find_map(|l| {
let mut parts = l.splitn(2, ':');
let k = parts.next()?.trim();
if k.eq_ignore_ascii_case("Content-Length") {
parts.next()?.trim().parse().ok()
} else {
None
}
})
.unwrap_or(0);
if total.len() >= hdr_end + 4 + cl {
break;
}
}
}

async fn write_json_response(stream: &mut TcpStream, body: &str) {
let response = format!(
concat!(
"HTTP/1.1 200 OK\r\n",
"Content-Type: application/json\r\n",
"Connection: keep-alive\r\n",
"Content-Length: {}\r\n\r\n{}"
),
body.len(),
body,
);
stream
.write_all(response.as_bytes())
.await
.expect("write response");
stream.flush().await.expect("flush response");
}

/// Regression test for issue #101: a pooled keep-alive socket that is later
/// closed server-side must not permanently poison future RPC calls.
#[tokio::test]
async fn retry_recovers_from_dead_pooled_connection() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("addr");

let (ready_tx, ready_rx) = oneshot::channel();
let server = tokio::spawn(async move {
let (mut first_stream, _) = listener.accept().await.expect("accept 1");
read_http_request(&mut first_stream).await;
write_json_response(
&mut first_stream,
r#"{"result":"first","error":null,"id":0}"#,
)
.await;

// Keep the socket alive long enough for the client to cache it, then
// close it server-side to mimic bitcoind's rpcservertimeout behavior.
sleep(Duration::from_millis(100)).await;
drop(first_stream);
let _ = ready_tx.send(());

let (mut second_stream, _) = listener.accept().await.expect("accept 2");
read_http_request(&mut second_stream).await;
write_json_response(
&mut second_stream,
r#"{"result":"second","error":null,"id":1}"#,
)
.await;
});

let url = format!("http://{}", addr);
let client = Client::new(
url,
Auth::UserPass("user".into(), "pass".into()),
Some(3),
Some(10),
Some(5),
)
.expect("client");

let first: String = client.call("ping", &[]).await.expect("first call");
assert_eq!(first, "first");

ready_rx.await.expect("ready signal");

let second: String = timeout(Duration::from_secs(5), client.call("ping", &[]))
.await
.expect("call did not time out")
.expect("second call");
assert_eq!(second, "second");

server.await.expect("server task");
}
}
Loading