Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions tower/src/retry/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ where
loop {
match this.state.as_mut().project() {
StateProj::Called { future } => {
let result = ready!(future.poll(cx));
if let Some(ref req) = this.request {
match this.retry.policy.retry(req, result.as_ref()) {
let mut result = ready!(future.poll(cx));
if let Some(req) = &mut this.request {
match this.retry.policy.retry(req, &mut result) {
Some(checking) => {
this.state.set(State::Checking { checking });
}
Expand Down
19 changes: 17 additions & 2 deletions tower/src/retry/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::future::Future;
/// impl<E> Policy<Req, Res, E> for Attempts {
/// type Future = future::Ready<Self>;
///
/// fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
/// fn retry(&self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> {
/// match result {
/// Ok(_) => {
/// // Treat all `Response`s as success,
Expand Down Expand Up @@ -56,9 +56,24 @@ pub trait Policy<Req, Res, E>: Sized {
/// If the request *should* be retried, return `Some` future of a new
/// policy that would apply for the next request attempt.
///
/// ## Mutating Requests
///
/// The policy MAY chose to mutate the `req`: if the request is mutated, the mutated request
/// will be sent to the inner service in the next retry. This can be helpful for use cases like
/// tracking the retry count in a header.
///
/// ## Mutating Results
///
/// The policy MAY chose to mutate the result. This can enable the retry policy to convert a failure
Comment thread
rcoh marked this conversation as resolved.
Outdated
/// into a success and vice versa. For example, if the policy is used to poll while waiting for a state
/// change, you can switch the result to failure such that running out of retries returns a specific failure.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: in general, we prefer to avoid second-person ('you') language in docs:

Suggested change
/// change, you can switch the result to failure such that running out of retries returns a specific failure.
/// change, the policy can switch the result to failure such that running out of retries returns a specific failure.

///
/// You can also record metadata on the request to include information about the number of retries required
/// or to record that a failure failed after exhausting all retries.
Comment thread
rcoh marked this conversation as resolved.
Outdated
Comment thread
rcoh marked this conversation as resolved.
Outdated
///
/// [`Service::Response`]: crate::Service::Response
/// [`Service::Error`]: crate::Service::Error
fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future>;
fn retry(&self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future>;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess getting a &mut Result<Res, E> allows users to change Err(_) into Ok(_) which might be kinda odd. Not sure. Feels like getting Result<&mut Res, &mut E> would fit the use-cases as well but not allow changing the Result variant.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, that is a really good point david, what do you think about that @rcoh ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment wasn't threaded because of email:

The entire point is to change the variant (at least for me). Because there
are "success" cases the actually need to be retries and when I run out of
retries I need to change it to failure


/// Tries to clone a request before being passed to the inner service.
///
Expand Down
2 changes: 1 addition & 1 deletion tower/tests/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
{
type Future = Ready<Self>;

fn retry(&self, _req: &Req, _result: Result<&Res, &E>) -> Option<Self::Future> {
fn retry(&self, _req: &mut Req, _result: &mut Result<Res, E>) -> Option<Self::Future> {
None
}

Expand Down
61 changes: 56 additions & 5 deletions tower/tests/retry/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,27 @@ async fn success_with_cannot_clone() {
assert_ready_ok!(fut.poll(), "world");
}

#[tokio::test(flavor = "current_thread")]
async fn retry_mutating_policy() {
let _t = support::trace_init();

let (mut service, mut handle) = new_service(MutatingPolicy { remaining: 2 });

assert_ready_ok!(service.poll_ready());
let mut fut = task::spawn(service.call("hello"));

assert_request_eq!(handle, "hello").send_response("world");
assert_pending!(fut.poll());
// the policy alters the request. in real life, this might be setting a header
assert_request_eq!(handle, "retrying").send_response("world");

assert_pending!(fut.poll());

assert_request_eq!(handle, "retrying").send_response("world");

assert_ready_err!(fut.poll(), "out of retries");
}

type Req = &'static str;
type Res = &'static str;
type InnerError = &'static str;
Expand All @@ -102,7 +123,7 @@ struct RetryErrors;

impl Policy<Req, Res, Error> for RetryErrors {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
fn retry(&self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
if result.is_err() {
Some(future::ready(RetryErrors))
} else {
Expand All @@ -120,7 +141,7 @@ struct Limit(usize);

impl Policy<Req, Res, Error> for Limit {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
fn retry(&self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
if result.is_err() && self.0 > 0 {
Some(future::ready(Limit(self.0 - 1)))
} else {
Expand All @@ -138,8 +159,8 @@ struct UnlessErr(InnerError);

impl Policy<Req, Res, Error> for UnlessErr {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
result.err().and_then(|err| {
fn retry(&self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
result.as_ref().err().and_then(|err| {
if err.to_string() != self.0 {
Some(future::ready(self.clone()))
} else {
Expand All @@ -158,7 +179,7 @@ struct CannotClone;

impl Policy<Req, Res, Error> for CannotClone {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, _: Result<&Res, &Error>) -> Option<Self::Future> {
fn retry(&self, _: &mut Req, _: &mut Result<Res, Error>) -> Option<Self::Future> {
unreachable!("retry cannot be called since request isn't cloned");
}

Expand All @@ -167,6 +188,36 @@ impl Policy<Req, Res, Error> for CannotClone {
}
}

/// Test policy that changes the request to `retrying` during retries and the result to `"out of retries"`
/// when retries are exhausted.
#[derive(Clone)]
struct MutatingPolicy {
remaining: usize,
}

impl Policy<Req, Res, Error> for MutatingPolicy
where
Error: From<&'static str>,
{
type Future = future::Ready<Self>;

fn retry(&self, req: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
if self.remaining == 0 {
*result = Err("out of retries".into());
None
} else {
*req = "retrying";
Some(future::ready(MutatingPolicy {
remaining: self.remaining - 1,
}))
}
}

fn clone_request(&self, req: &Req) -> Option<Req> {
Some(*req)
}
}

fn new_service<P: Policy<Req, Res, Error> + Clone>(
policy: P,
) -> (mock::Spawn<tower::retry::Retry<P, Mock>>, Handle) {
Expand Down