Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/vendor-wit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ make_vendor "wasi-http/src/p3" "
cli@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
clocks@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
filesystem@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
http@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
http@e3df7c179510880030a881ccea6990fb48b62715@wit-0.3.0-draft
random@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
sockets@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
"
Expand Down
6 changes: 2 additions & 4 deletions crates/test-programs/src/bin/p3_http_echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ impl Handler for Component {
/// Return a response which echoes the request headers, body, and trailers.
async fn handle(request: Request) -> Result<Response, ErrorCode> {
let headers = request.get_headers();
let (body, trailers) = request.consume_body().unwrap();

// let (headers, body) = Request::into_parts(request);
let (_, result_rx) = wit_future::new(|| Ok(()));
let (body, trailers) = Request::consume_body(request, result_rx);

let (response, _result) = if false {
// This is the easy and efficient way to do it...
Expand Down Expand Up @@ -47,7 +46,6 @@ impl Handler for Component {
drop(pipe_tx);

trailers_tx.write(trailers.await).await.unwrap();
drop(request);
});

Response::new(headers, Some(pipe_rx), trailers_rx)
Expand Down
9 changes: 4 additions & 5 deletions crates/test-programs/src/bin/p3_http_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ impl Handler for Component {
}
_ => true,
});
let (mut body, trailers) = request.consume_body().unwrap();
let (_, result_rx) = wit_future::new(|| Ok(()));
let (mut body, trailers) = Request::consume_body(request, result_rx);

let (body, trailers) = if content_deflated {
// Next, spawn a task to pipe and decode the original request body and trailers into a new request
Expand Down Expand Up @@ -77,8 +78,6 @@ impl Handler for Component {
}

trailers_tx.write(trailers.await).await.unwrap();

drop(request);
});

(pipe_rx, trailers_rx)
Expand Down Expand Up @@ -110,7 +109,8 @@ impl Handler for Component {
headers.push(("content-encoding".into(), b"deflate".into()));
}

let (mut body, trailers) = response.consume_body().unwrap();
let (_, result_rx) = wit_future::new(|| Ok(()));
let (mut body, trailers) = Response::consume_body(response, result_rx);
let (body, trailers) = if accept_deflated {
headers.retain(|(name, _value)| name != "content-length");

Expand Down Expand Up @@ -141,7 +141,6 @@ impl Handler for Component {
}

trailers_tx.write(trailers.await).await.unwrap();
drop(response);
});

(pipe_rx, trailers_rx)
Expand Down
5 changes: 2 additions & 3 deletions crates/test-programs/src/p3/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ pub async fn request(
let response = handler::handle(request).await?;
let status = response.get_status_code();
let headers = response.get_headers().copy_all();
let (body_rx, trailers_rx) = response
.consume_body()
.expect("failed to get response body");
let (_, result_rx) = wit_future::new(|| Ok(()));
let (body_rx, trailers_rx) = types::Response::consume_body(response, result_rx);
let ((), rx) = join!(
async {
if let Some(buf) = body {
Expand Down
4 changes: 2 additions & 2 deletions crates/wasi-http/src/p3/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ mod generated {
"wasi:http/handler/[async]handle": async | store | trappable | tracing,
"wasi:http/types/[drop]request": store | trappable | tracing,
"wasi:http/types/[drop]response": store | trappable | tracing,
"wasi:http/types/[method]request.consume-body": async | store | trappable | tracing,
"wasi:http/types/[method]response.consume-body": async | store | trappable | tracing,
"wasi:http/types/[static]request.consume-body": async | store | trappable | tracing,
"wasi:http/types/[static]request.new": async | store | trappable | tracing,
"wasi:http/types/[static]response.consume-body": async | store | trappable | tracing,
"wasi:http/types/[static]response.new": async | store | trappable | tracing,
default: trappable | tracing,
},
Expand Down
95 changes: 43 additions & 52 deletions crates/wasi-http/src/p3/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,55 +37,72 @@ pub(crate) enum Body {
/// Channel, on which transmission result will be written
result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,
},
/// Body is consumed.
Consumed,
}

/// [FutureConsumer] implementation for future passed to `consume-body`.
struct BodyResultConsumer(
Option<oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>>,
);

impl<D> FutureConsumer<D> for BodyResultConsumer
where
D: 'static,
{
type Item = Result<(), ErrorCode>;

fn poll_consume(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
store: StoreContextMut<D>,
mut src: Source<'_, Self::Item>,
_: bool,
) -> Poll<wasmtime::Result<()>> {
let mut res = None;
src.read(store, &mut res).context("failed to read result")?;
let res = res.context("result value missing")?;
let tx = self.0.take().context("polled after returning `Ready`")?;
_ = tx.send(Box::new(async { res }));
Poll::Ready(Ok(()))
}
}

impl Body {
/// Implementation of `consume-body` shared between requests and responses
pub(crate) fn consume<T>(
self,
mut store: Access<'_, T, WasiHttp>,
fut: FutureReader<Result<(), ErrorCode>>,
getter: fn(&mut T) -> WasiHttpCtxView<'_>,
) -> Result<
(
StreamReader<u8>,
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
),
(),
> {
) -> (
StreamReader<u8>,
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
) {
match self {
Body::Guest {
contents_rx: Some(contents_rx),
trailers_rx,
result_tx,
} => {
// TODO: Use a result specified by the caller
// https://github.com/WebAssembly/wasi-http/issues/176
_ = result_tx.send(Box::new(async { Ok(()) }));
Ok((contents_rx, trailers_rx))
fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));
(contents_rx, trailers_rx)
}
Body::Guest {
contents_rx: None,
trailers_rx,
result_tx,
} => {
fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));
let instance = store.instance();
// TODO: Use a result specified by the caller
// https://github.com/WebAssembly/wasi-http/issues/176
_ = result_tx.send(Box::new(async { Ok(()) }));
Ok((
(
StreamReader::new(instance, &mut store, iter::empty()),
trailers_rx,
))
)
}
Body::Host { body, result_tx } => {
fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));
let instance = store.instance();
// TODO: Use a result specified by the caller
// https://github.com/WebAssembly/wasi-http/issues/176
_ = result_tx.send(Box::new(async { Ok(()) }));
let (trailers_tx, trailers_rx) = oneshot::channel();
Ok((
(
StreamReader::new(
instance,
&mut store,
Expand All @@ -96,9 +113,8 @@ impl Body {
},
),
FutureReader::new(instance, &mut store, trailers_rx),
))
)
}
Body::Consumed => Err(()),
}
}

Expand Down Expand Up @@ -390,31 +406,6 @@ impl http_body::Body for GuestBody {
}
}

/// [http_body::Body] that has been consumed.
pub(crate) struct ConsumedBody;

impl http_body::Body for ConsumedBody {
type Data = Bytes;
type Error = ErrorCode;

fn poll_frame(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
Poll::Ready(Some(Err(ErrorCode::InternalError(Some(
"body consumed".into(),
)))))
}

fn is_end_stream(&self) -> bool {
true
}

fn size_hint(&self) -> http_body::SizeHint {
http_body::SizeHint::with_exact(0)
}
}

/// [FutureConsumer] implementation for trailers originating in the guest.
struct GuestTrailerConsumer<T> {
tx: Option<oneshot::Sender<Result<Option<Arc<HeaderMap>>, ErrorCode>>>,
Expand All @@ -434,10 +425,10 @@ where
mut src: Source<'_, Self::Item>,
_: bool,
) -> Poll<wasmtime::Result<()>> {
let mut result = None;
src.read(store.as_context_mut(), &mut result)
let mut res = None;
src.read(&mut store, &mut res)
.context("failed to read result")?;
let res = match result.context("result value missing")? {
let res = match res.context("result value missing")? {
Ok(Some(trailers)) => {
let WasiHttpCtxView { table, .. } = (self.getter)(store.data_mut());
let trailers = table
Expand Down
3 changes: 1 addition & 2 deletions crates/wasi-http/src/p3/host/handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::get_content_length;
use crate::p3::bindings::http::handler::{Host, HostWithStore};
use crate::p3::bindings::http::types::{ErrorCode, Request, Response};
use crate::p3::body::{Body, ConsumedBody, GuestBody};
use crate::p3::body::{Body, GuestBody};
use crate::p3::{HttpError, HttpResult, WasiHttp, WasiHttpCtxView};
use anyhow::Context as _;
use bytes::Bytes;
Expand Down Expand Up @@ -275,7 +275,6 @@ impl HostWithStore for WasiHttp {
body.with_state(io_task_rx).boxed()
}
}
Body::Consumed => ConsumedBody.boxed(),
};

let WasiHttpCtxView { ctx, .. } = store.get();
Expand Down
47 changes: 22 additions & 25 deletions crates/wasi-http/src/p3/host/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::p3::bindings::http::types::{
use crate::p3::body::Body;
use crate::p3::{HeaderResult, HttpError, RequestOptionsResult, WasiHttp, WasiHttpCtxView};
use anyhow::Context as _;
use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll, ready};
use http::header::CONTENT_LENGTH;
Expand Down Expand Up @@ -356,20 +355,19 @@ impl HostRequestWithStore for WasiHttp {
async fn consume_body<T>(
store: &Accessor<T, Self>,
req: Resource<Request>,
) -> wasmtime::Result<
Result<
(
StreamReader<u8>,
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
),
(),
>,
> {
fut: FutureReader<Result<(), ErrorCode>>,
) -> wasmtime::Result<(
StreamReader<u8>,
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
)> {
let getter = store.getter();
store.with(|mut store| {
let Request { body, .. } = get_request_mut(store.get().table, &req)?;
let body = mem::replace(body, Body::Consumed);
Ok(body.consume(store, getter))
let Request { body, .. } = store
.get()
.table
.delete(req)
.context("failed to delete request from table")?;
Ok(body.consume(store, fut, getter))
})
}

Expand Down Expand Up @@ -633,20 +631,19 @@ impl HostResponseWithStore for WasiHttp {
async fn consume_body<T>(
store: &Accessor<T, Self>,
res: Resource<Response>,
) -> wasmtime::Result<
Result<
(
StreamReader<u8>,
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
),
(),
>,
> {
fut: FutureReader<Result<(), ErrorCode>>,
) -> wasmtime::Result<(
StreamReader<u8>,
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
)> {
let getter = store.getter();
store.with(|mut store| {
let Response { body, .. } = get_response_mut(store.get().table, &res)?;
let body = mem::replace(body, Body::Consumed);
Ok(body.consume(store, getter))
let Response { body, .. } = store
.get()
.table
.delete(res)
.context("failed to delete response from table")?;
Ok(body.consume(store, fut, getter))
})
}

Expand Down
3 changes: 1 addition & 2 deletions crates/wasi-http/src/p3/response.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::get_content_length;
use crate::p3::bindings::http::types::ErrorCode;
use crate::p3::body::{Body, ConsumedBody, GuestBody};
use crate::p3::body::{Body, GuestBody};
use crate::p3::{WasiHttpCtxView, WasiHttpView};
use anyhow::Context as _;
use bytes::Bytes;
Expand Down Expand Up @@ -86,7 +86,6 @@ impl Response {
_ = result_tx.send(Box::new(fut));
body
}
Body::Consumed => ConsumedBody.boxed(),
};
Ok(http::Response::from_parts(res, body))
}
Expand Down
36 changes: 10 additions & 26 deletions crates/wasi-http/src/p3/wit/deps/http/types.wit
Original file line number Diff line number Diff line change
Expand Up @@ -316,20 +316,12 @@ interface types {
/// future to determine whether the body was received successfully.
/// The future will only resolve after the stream is reported as closed.
///
/// The stream and future returned by this method are children:
/// they should be closed or consumed before the parent `response`
/// is dropped, or its ownership is transferred to another component
/// by e.g. `handler.handle`.
/// This function takes a `res` future as a parameter, which can be used to
/// communicate an error in handling of the request.
///
/// This method may be called multiple times.
///
/// This method will return an error if it is called while either:
/// - a stream or future returned by a previous call to this method is still open
/// - a stream returned by a previous call to this method has reported itself as closed
/// Thus there will always be at most one readable stream open for a given body.
/// Each subsequent stream picks up where the previous one left off,
/// continuing until the entire body has been consumed.
consume-body: func() -> result<tuple<stream<u8>, future<result<option<trailers>, error-code>>>>;
/// Note that function will move the `request`, but references to headers or
/// request options acquired from it previously will remain valid.
consume-body: static func(this: request, res: future<result<_, error-code>>) -> tuple<stream<u8>, future<result<option<trailers>, error-code>>>;
}

/// Parameters for making an HTTP Request. Each of these parameters is
Expand Down Expand Up @@ -417,19 +409,11 @@ interface types {
/// future to determine whether the body was received successfully.
/// The future will only resolve after the stream is reported as closed.
///
/// The stream and future returned by this method are children:
/// they should be closed or consumed before the parent `response`
/// is dropped, or its ownership is transferred to another component
/// by e.g. `handler.handle`.
///
/// This method may be called multiple times.
/// This function takes a `res` future as a parameter, which can be used to
/// communicate an error in handling of the response.
///
/// This method will return an error if it is called while either:
/// - a stream or future returned by a previous call to this method is still open
/// - a stream returned by a previous call to this method has reported itself as closed
/// Thus there will always be at most one readable stream open for a given body.
/// Each subsequent stream picks up where the previous one left off,
/// continuing until the entire body has been consumed.
consume-body: func() -> result<tuple<stream<u8>, future<result<option<trailers>, error-code>>>>;
/// Note that function will move the `response`, but references to headers
/// acquired from it previously will remain valid.
consume-body: static func(this: response, res: future<result<_, error-code>>) -> tuple<stream<u8>, future<result<option<trailers>, error-code>>>;
}
}