Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry: Add Budget trait #703

Merged
merged 7 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix according review
  • Loading branch information
boraarslan committed Oct 23, 2022
commit 0918d77f5c9ead4cd6fa6e08062627ec9888a780
35 changes: 5 additions & 30 deletions tower/src/retry/budget/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@
//! use std::sync::Arc;
//!
//! use futures_util::future;
//! use tower::retry::{budget::{Budget, BudgetTrait}, Policy};
//! use tower::retry::{budget::{Budget, TpsBudget}, Policy};
//!
//! type Req = String;
//! type Res = String;
//!
//! #[derive(Clone, Debug)]
//! struct RetryPolicy {
//! budget: Arc<Budget>,
//! budget: Arc<TpsBudget>,
//! }
//!
//! impl<E> Policy<Req, Res, E> for RetryPolicy {
Expand Down Expand Up @@ -71,16 +71,14 @@
//! }
//! ```

#[allow(clippy::module_inception)]
pub mod budget;
pub mod tps_budget;

pub use budget::Budget;
pub use budget::TpsBucket;
pub use tps_budget::TpsBudget;

/// For more info about [`Budget`], please see the [module-level documentation].
///
/// [module-level documentation]: self
pub trait BudgetTrait {
pub trait Budget {
/// Store a "deposit" in the budget, which will be used to permit future
/// withdrawals.
fn deposit(&self);
Expand All @@ -91,26 +89,3 @@ pub trait BudgetTrait {
/// If there is not enough, false is returned.
fn withdraw(&self) -> bool;
}

/// Represents a token bucket.
///
/// A token bucket manages a reserve of tokens to decide if a retry for the request is
/// possible. Successful requests put tokens into the reserve. Before a request is retried,
/// bucket is checked to ensure there are sufficient amount of tokens available. If there are,
/// specified amount of tokens are withdrawn.
///
/// For more info about [`Budget`], please see the [module-level documentation].
///
/// [module-level documentation]: self
pub trait Bucket {
/// Deposit `amt` of tokens into the bucket.
fn put(&self, amt: isize);

/// Try to withdraw `amt` of tokens from bucket. If reserve do not have sufficient
/// amount of tokens false is returned. If withdraw is possible, decreases the reserve
/// and true is returned.
fn try_get(&self, amt: isize) -> bool;

/// Returns the amount of tokens in the reserve.
fn reserve(&self) -> isize;
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Budget implementations
//! Transactions Per Minute (Tps) Budget implementations

use std::{
fmt,
Expand All @@ -10,7 +10,7 @@ use std::{
};
use tokio::time::Instant;

use super::{Bucket, BudgetTrait};
use super::Budget;

/// Represents a "budget" for retrying requests.
///
Expand All @@ -20,25 +20,23 @@ use super::{Bucket, BudgetTrait};
/// For more info about [`Budget`], please see the [module-level documentation].
///
/// [module-level documentation]: super
pub struct Budget<B = TpsBucket> {
bucket: B,
pub struct TpsBudget {
bucket: TpsBucket,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove this extra struct and just merge everything into the TpsBucket and rename that TpsBudget. That should simplify some of this code and also make it less confusing since TpsBucket looks like TpsBudget lol

deposit_amount: isize,
withdraw_amount: isize,
}

/// A [`Bucket`] for managing retry tokens.
/// A Transactions Per Minute config for managing retry tokens.
///
/// [`Budget`] uses a token bucket to decide if the request should be retried.
/// [`TpsBudget`] uses a token bucket to decide if the request should be retried.
///
/// [`TpsBucket`] works by checking how much retries have been made in a certain period of time.
/// Minimum allowed number of retries are effectively reset on an interval. Allowed number of
/// retries depends on failed request count in recent time frame.
///
/// For more info about [`Budget`], please see the [module-level documentation].
/// For more info about [`Bucket`], see [bucket trait]
///
/// [module-level documentation]: super
/// [bucket trait]: self::Bucket
#[derive(Debug)]
pub struct TpsBucket {
generation: Mutex<Generation>,
Expand All @@ -61,10 +59,10 @@ struct Generation {
time: Instant,
}

// ===== impl Budget =====
// ===== impl TpsBudget =====

impl Budget {
/// Create a [`Budget`] that allows for a certain percent of the total
impl TpsBudget {
/// Create a [`TpsBudget`] that allows for a certain percent of the total
/// requests to be retried.
///
/// - The `ttl` is the duration of how long a single `deposit` should be
Expand All @@ -79,7 +77,7 @@ impl Budget {
/// As an example, if `0.1` is used, then for every 10 calls to `deposit`,
/// 1 retry will be allowed. If `2.0` is used, then every `deposit`
/// allows for 2 retries.
pub fn new_tps(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Budget<TpsBucket> {
pub fn new(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Self {
// assertions taken from finagle
assert!(ttl >= Duration::from_secs(1));
assert!(ttl <= Duration::from_secs(60));
Expand Down Expand Up @@ -110,7 +108,7 @@ impl Budget {
slots.push(AtomicIsize::new(0));
}

Budget {
TpsBudget {
bucket: TpsBucket {
generation: Mutex::new(Generation {
index: 0,
Expand All @@ -127,7 +125,7 @@ impl Budget {
}
}

impl<B: Bucket> BudgetTrait for Budget<B> {
impl Budget for TpsBudget {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to check we should remove the deposit/withdraw methods from the base impl TpsBudget and only have the trait impl ones. I can't see it in the diff but just want to check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I already removed them from the base impl.

fn deposit(&self) {
self.bucket.put(self.deposit_amount)
}
Expand All @@ -137,23 +135,23 @@ impl<B: Bucket> BudgetTrait for Budget<B> {
}
}

impl Default for Budget {
impl Default for TpsBudget {
fn default() -> Self {
Budget::new_tps(Duration::from_secs(10), 10, 0.2)
TpsBudget::new(Duration::from_secs(10), 10, 0.2)
}
}

impl fmt::Debug for Budget {
impl fmt::Debug for TpsBudget {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Budget")
.field("deposit", &self.deposit_amount)
.field("withdraw", &self.withdraw_amount)
.field("balance", &self.bucket.reserve())
.field("balance", &self.bucket.sum())
.finish()
}
}

// ===== impl Bucket =====
// ===== impl TpsBucket =====

impl TpsBucket {
fn expire(&self) {
Expand Down Expand Up @@ -194,9 +192,7 @@ impl TpsBucket {
.saturating_add(windowed_sum)
.saturating_add(self.reserve)
}
}

impl Bucket for TpsBucket {
fn put(&self, amt: isize) {
self.expire();
self.writer.fetch_add(amt, Ordering::SeqCst);
Expand All @@ -215,30 +211,26 @@ impl Bucket for TpsBucket {
false
}
}

fn reserve(&self) -> isize {
self.sum()
}
}

#[cfg(test)]
mod tests {
use crate::retry::budget::BudgetTrait;
use crate::retry::budget::Budget;

use super::*;
use tokio::time;

#[test]
fn tps_empty() {
let bgt = Budget::new_tps(Duration::from_secs(1), 0, 1.0);
let bgt = TpsBudget::new(Duration::from_secs(1), 0, 1.0);
assert!(!bgt.withdraw());
}

#[tokio::test]
async fn tps_leaky() {
time::pause();

let bgt = Budget::new_tps(Duration::from_secs(1), 0, 1.0);
let bgt = TpsBudget::new(Duration::from_secs(1), 0, 1.0);
bgt.deposit();

time::advance(Duration::from_secs(3)).await;
Expand All @@ -250,7 +242,7 @@ mod tests {
async fn tps_slots() {
time::pause();

let bgt = Budget::new_tps(Duration::from_secs(1), 0, 0.5);
let bgt = TpsBudget::new(Duration::from_secs(1), 0, 0.5);
bgt.deposit();
bgt.deposit();
time::advance(Duration::from_millis(901)).await;
Expand All @@ -273,7 +265,7 @@ mod tests {

#[tokio::test]
async fn tps_reserve() {
let bgt = Budget::new_tps(Duration::from_secs(1), 5, 1.0);
let bgt = TpsBudget::new(Duration::from_secs(1), 5, 1.0);
assert!(bgt.withdraw());
assert!(bgt.withdraw());
assert!(bgt.withdraw());
Expand Down