Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry: Change Policy to accept &mut self #681

Merged
merged 10 commits into from
Aug 23, 2022
9 changes: 3 additions & 6 deletions tower/src/retry/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ where

impl<P, S, Request> Future for ResponseFuture<P, S, Request>
where
P: Policy<Request, S::Response, S::Error> + Clone,
P: Policy<Request, S::Response, S::Error>,
S: Service<Request> + Clone,
{
type Output = Result<S::Response, S::Error>;
Expand All @@ -88,11 +88,8 @@ where
}
}
StateProj::Checking { checking } => {
this.retry
.as_mut()
.project()
.policy
.set(ready!(checking.poll(cx)));
ready!(checking.poll(cx));
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved

this.state.set(State::Retrying);
}
StateProj::Retrying => {
Expand Down
13 changes: 12 additions & 1 deletion tower/src/retry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,20 @@ pin_project! {
/// Configure retrying requests of "failed" responses.
///
/// A [`Policy`] classifies what is a "failed" response.
///
/// # Clone
///
/// This middleware requires that the `Service` is `Clone` due to requirements
/// of `'static` futures. To easily add `Clone` to your service you can
/// use the `Buffer` middleware.
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
///
/// The `Policy` is also required to implement `Clone`. This middleware will
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully this and the removal of the Clone and the relaxation of the bounds on ResponseFuture make this more clear.

LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
/// clone the policy for each _request session_. This means one instance
/// of the policy will exist for one request and its subsequent retries only.
/// This means the mutable reference is only for that session and if you want
/// to share data between request sessions then you will need to `Arc<Mutex<...>`.
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Clone, Debug)]
pub struct Retry<P, S> {
#[pin]
policy: P,
service: S,
}
Expand Down
26 changes: 16 additions & 10 deletions tower/src/retry/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ use std::future::Future;
/// struct Attempts(usize);
///
/// impl<E> Policy<Req, Res, E> for Attempts {
/// type Future = future::Ready<Self>;
/// type Future = future::Ready<()>;
///
/// fn retry(&self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> {
/// fn retry(&mut self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> {
/// match result {
/// Ok(_) => {
/// // Treat all `Response`s as success,
Expand All @@ -28,7 +28,8 @@ use std::future::Future;
/// // But we limit the number of attempts...
/// if self.0 > 0 {
/// // Try again!
/// Some(future::ready(Attempts(self.0 - 1)))
/// self.0 -= 1;
/// Some(future::ready(()))
/// } else {
/// // Used all our attempts, no retry...
/// None
Expand All @@ -37,14 +38,14 @@ use std::future::Future;
/// }
/// }
///
/// fn clone_request(&self, req: &Req) -> Option<Req> {
/// fn clone_request(&mut self, req: &Req) -> Option<Req> {
/// Some(req.clone())
/// }
/// }
/// ```
pub trait Policy<Req, Res, E>: Sized {
pub trait Policy<Req, Res, E> {
/// The [`Future`] type returned by [`Policy::retry`].
type Future: Future<Output = Self>;
type Future: Future<Output = ()>;
Comment on lines 47 to +48
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changing this to return a Future<Output = ()> and duplicating the Policy using Clone rather than having the future return a new policy definitely simplifies things, but it occurs to me that asynchronous work can no longer be performed in order to update the Policy. i can't immediately come up with a reason that it would be necessary to generate the next Policy asynchronously, but it seems like it could be worth thinking about before we commit to this design...?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking about it, i suppose that if an implementation needed to modify the Policy after the future's completion, it could always clone an Arced shared state into its Future...this may introduce some overhead over the current approach, but I don't really think that use case is common enough that it's particularly important to worry about...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't think that use case is something we should support with that level and anyways taking a lock on an uncontested arc/mutex should be very cheap and much cheaper than request io etc.


/// Check the policy if a certain request should be retried.
///
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -53,8 +54,9 @@ pub trait Policy<Req, Res, E>: Sized {
///
/// If the request should **not** be retried, return `None`.
///
/// If the request *should* be retried, return `Some` future of a new
/// policy that would apply for the next request attempt.
/// If the request *should* be retried, return `Some` future that will delay
/// the next retry of the request. This can be used to sleep for a certain
/// duration or resolve right away.
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
///
/// ## Mutating Requests
///
Expand All @@ -77,10 +79,14 @@ pub trait Policy<Req, Res, E>: Sized {
///
/// [`Service::Response`]: crate::Service::Response
/// [`Service::Error`]: crate::Service::Error
fn retry(&self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future>;
fn retry(&mut self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future>;

/// Tries to clone a request before being passed to the inner service.
///
/// If the request cannot be cloned, return [`None`].
fn clone_request(&self, req: &Req) -> Option<Req>;
fn clone_request(&mut self, req: &Req) -> Option<Req>;
}

// Ensure `Policy` is object safe
#[cfg(test)]
fn _obj_safe(_: Box<dyn Policy<(), (), (), Future = futures::future::Ready<()>>>) {}
6 changes: 3 additions & 3 deletions tower/tests/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ where
Req: Clone,
E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
{
type Future = Ready<Self>;
type Future = Ready<()>;

fn retry(&self, _req: &mut Req, _result: &mut Result<Res, E>) -> Option<Self::Future> {
fn retry(&mut self, _req: &mut Req, _result: &mut Result<Res, E>) -> Option<Self::Future> {
None
}

fn clone_request(&self, req: &Req) -> Option<Req> {
fn clone_request(&mut self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}
42 changes: 21 additions & 21 deletions tower/tests/retry/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,16 @@ type Handle = mock::Handle<Req, Res>;
struct RetryErrors;

impl Policy<Req, Res, Error> for RetryErrors {
type Future = future::Ready<Self>;
fn retry(&self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
type Future = future::Ready<()>;
fn retry(&mut self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
if result.is_err() {
Some(future::ready(RetryErrors))
Some(future::ready(()))
} else {
None
}
}

fn clone_request(&self, req: &Req) -> Option<Req> {
fn clone_request(&mut self, req: &Req) -> Option<Req> {
Some(*req)
}
}
Expand All @@ -140,16 +140,17 @@ impl Policy<Req, Res, Error> for RetryErrors {
struct Limit(usize);

impl Policy<Req, Res, Error> for Limit {
type Future = future::Ready<Self>;
fn retry(&self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
type Future = future::Ready<()>;
fn retry(&mut self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
if result.is_err() && self.0 > 0 {
Some(future::ready(Limit(self.0 - 1)))
self.0 -= 1;
Some(future::ready(()))
} else {
None
}
}

fn clone_request(&self, req: &Req) -> Option<Req> {
fn clone_request(&mut self, req: &Req) -> Option<Req> {
Some(*req)
}
}
Expand All @@ -158,18 +159,18 @@ impl Policy<Req, Res, Error> for Limit {
struct UnlessErr(InnerError);

impl Policy<Req, Res, Error> for UnlessErr {
type Future = future::Ready<Self>;
fn retry(&self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
type Future = future::Ready<()>;
fn retry(&mut self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
result.as_ref().err().and_then(|err| {
if err.to_string() != self.0 {
Some(future::ready(self.clone()))
Some(future::ready(()))
} else {
None
}
})
}

fn clone_request(&self, req: &Req) -> Option<Req> {
fn clone_request(&mut self, req: &Req) -> Option<Req> {
Some(*req)
}
}
Expand All @@ -178,12 +179,12 @@ impl Policy<Req, Res, Error> for UnlessErr {
struct CannotClone;

impl Policy<Req, Res, Error> for CannotClone {
type Future = future::Ready<Self>;
fn retry(&self, _: &mut Req, _: &mut Result<Res, Error>) -> Option<Self::Future> {
type Future = future::Ready<()>;
fn retry(&mut self, _: &mut Req, _: &mut Result<Res, Error>) -> Option<Self::Future> {
unreachable!("retry cannot be called since request isn't cloned");
}

fn clone_request(&self, _req: &Req) -> Option<Req> {
fn clone_request(&mut self, _req: &Req) -> Option<Req> {
None
}
}
Expand All @@ -199,21 +200,20 @@ impl Policy<Req, Res, Error> for MutatingPolicy
where
Error: From<&'static str>,
{
type Future = future::Ready<Self>;
type Future = future::Ready<()>;

fn retry(&self, req: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
fn retry(&mut self, req: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
if self.remaining == 0 {
*result = Err("out of retries".into());
None
} else {
*req = "retrying";
Some(future::ready(MutatingPolicy {
remaining: self.remaining - 1,
}))
self.remaining -= 1;
Some(future::ready(()))
}
}

fn clone_request(&self, req: &Req) -> Option<Req> {
fn clone_request(&mut self, req: &Req) -> Option<Req> {
Some(*req)
}
}
Expand Down