Skip to content

Commit

Permalink
Merge pull request #1 from esl/support_other_units
Browse files Browse the repository at this point in the history
Support other units
  • Loading branch information
chrzaszcz committed Jan 3, 2024
2 parents 5f28665 + 251fbf4 commit 68a85b9
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 68 deletions.
140 changes: 101 additions & 39 deletions src/opuntia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
%%
%% This module implements the token-bucket traffic-shaping algorithm.
%%
%% The rate is given in tokens per millisecond and a bucket size.
%% The rate is given in `t:tokens()' per `t:time_unit()' and a bucket size.
%% Resolution is in native unit times as described by `erlang:monotonic_time/0'.
%%
%% The delay is always returned in milliseconds unit,
Expand All @@ -13,9 +13,10 @@
-export([new/1, update/2]).

-ifdef(TEST).
-export([create/2, calculate/3]).
-export([create/2, calculate/3, convert_time_unit/3]).
-else.
-compile({inline, [create/2, calculate/3, convert_native_to_ms/1]}).
-compile({inline, [create/2, calculate/3, convert_time_unit/3, timediff_in_units/3,
unbounded_available_tokens/3, exactly_available_tokens/2, final_state/4]}).
-endif.

-include("opuntia.hrl").
Expand All @@ -24,7 +25,10 @@
%% Number of milliseconds that is advise to wait after a shaping update.

-type rate() :: non_neg_integer().
%% Number of tokens accepted per millisecond.
%% Number of tokens accepted per `t:time_unit()'.

-type time_unit() :: second | millisecond | microsecond | nanosecond | native.
%% Supported shaping time units.

-type bucket_size() :: non_neg_integer().
%% Maximum capacity of the bucket regardless of how much time passes.
Expand All @@ -34,34 +38,41 @@

-type config() :: 0 | #{bucket_size := bucket_size(),
rate := rate(),
time_unit := time_unit(),
start_full := boolean()}.
-type shape() :: {bucket_size(), rate()}.
-type shape() :: {bucket_size(), rate(), time_unit()}.
%% See `new/1' for more details.

-type shaper() :: none | #token_bucket_shaper{}.
%% Shaper type

-export_type([shaper/0, shape/0, tokens/0, bucket_size/0, rate/0, delay/0]).
-export_type([shaper/0, shape/0, tokens/0, bucket_size/0, rate/0, time_unit/0, delay/0]).

-define(NON_NEG_INT(N), (is_integer(N) andalso N > 0)).
-define(POS_INTEGER(N), (is_integer(N) andalso N > 0)).
-define(TU(T), (second =:= T orelse
millisecond =:= T orelse
microsecond =:= T orelse
nanosecond =:= T orelse
native =:= T)).

%% @doc Creates a new shaper according to the configuration.
%%
%% If zero is given, no shaper in created and any update action will always return zero delay;
%% to configure a shaper it will need a
%% to configure a shaper it will need a config like
%% ```
%% #{bucket_size => MaximumTokens, rate => Rate, start_full => Boolean},
%% #{bucket_size => MaximumTokens, rate => Rate, time_unit => TimeUnit, start_full => Boolean}
%% '''
%% where
%% <ul>
%% <li>`Rate' is the number of tokens per millisecond the bucket will grow with.</li>
%% <li>`TimeUnit' is the time unit of measurement as defined by `t:time_unit()'</li>
%% <li>`Rate' is the number of tokens per `TimeUnit' the bucket will grow with.</li>
%% <li>`MaximumTokens' is the maximum number of tokens the bucket can grow.</li>
%% <li>`StartFull' indicates if the shaper starts with the bucket full, or empty if not.</li>
%% </ul>
%%
%% So, for example, if we configure a shaper with the following:
%% ```
%% #{bucket_size => 60000, rate => 10, start_full => true}
%% #{bucket_size => 60000, rate => 10, time_unit => millisecond, start_full => true}
%% '''
%% it means that the bucket will
%% allow `10' tokens per `millisecond', up to 60000 tokens, regardless of how long it is left
Expand Down Expand Up @@ -89,17 +100,19 @@ create(0, _) ->
none;
create(#{bucket_size := MaximumTokens,
rate := Rate,
time_unit := TimeUnit,
start_full := StartFull},
NativeNow)
when ?NON_NEG_INT(MaximumTokens),
?NON_NEG_INT(Rate),
when ?POS_INTEGER(MaximumTokens),
?POS_INTEGER(Rate),
MaximumTokens >= Rate,
?TU(TimeUnit),
is_boolean(StartFull) ->
AvailableAtStart = case StartFull of
true -> MaximumTokens;
false -> 0
end,
#token_bucket_shaper{shape = {MaximumTokens, Rate},
#token_bucket_shaper{shape = {MaximumTokens, Rate, TimeUnit},
available_tokens = AvailableAtStart,
last_update = NativeNow,
debt = 0.0}.
Expand All @@ -109,21 +122,16 @@ calculate(none, _, _) ->
{none, 0};
calculate(Shaper, 0, _) ->
{Shaper, 0};
calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate},
calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate, TimeUnit},
available_tokens = LastAvailableTokens,
last_update = NativeLastUpdate,
debt = LastDebt} = Shaper, TokensNowUsed, NativeNow) ->
NativeTimeSinceLastUpdate = NativeNow - NativeLastUpdate,
debt = OverPenalisedInUnitsLastTime} = Shaper, TokensNowUsed, NativeNow) ->

%% This is now a float and so will all below be, to preserve best rounding errors possible
TimeSinceLastUpdate = convert_native_to_ms(NativeTimeSinceLastUpdate) + LastDebt,
TimeDiffInUnits = timediff_in_units(TimeUnit, NativeLastUpdate, NativeNow),

%% How much we might have recovered since last time
AvailableAtGrowthRate = Rate * TimeSinceLastUpdate,
UnboundedTokenGrowth = LastAvailableTokens + AvailableAtGrowthRate,
UnboundedTokens = unbounded_available_tokens(Rate, LastAvailableTokens, TimeDiffInUnits),

%% Real recovery cannot grow higher than the actual rate in the window frame
ExactlyAvailableNow = min(MaximumTokens, UnboundedTokenGrowth),
ExactlyAvailableNow = exactly_available_tokens(MaximumTokens, UnboundedTokens),

%% How many are available after using TokensNowUsed can't be smaller than zero
TokensAvailable = max(0, ExactlyAvailableNow - TokensNowUsed),
Expand All @@ -132,27 +140,81 @@ calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate},
TokensOverused = max(0, TokensNowUsed - ExactlyAvailableNow),

%% And then MaybeDelay will be zero if TokensOverused was zero
MaybeDelayMs = TokensOverused / Rate,
OverUsedRateNow = TokensOverused / Rate,

NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable},
Punish = OverUsedRateNow - OverPenalisedInUnitsLastTime,
final_state(NewShaper, TimeUnit, Punish, NativeNow).

-spec timediff_in_units(time_unit(), integer(), integer()) -> float().
timediff_in_units(TimeUnit, NativeLastUpdate, NativeNow) ->
%% Time difference between now and the last update, in native
NativeTimeSinceLastUpdate = NativeNow - NativeLastUpdate,
%% This is now a float and so will all below be, to preserve best rounding errors possible
convert_time_unit(NativeTimeSinceLastUpdate, native, TimeUnit).

%% We penalise rounding up, the most important contract is that rate will never exceed that
%% requested, but the same way timeouts in Erlang promise not to arrive any time earlier but
%% don't promise at what time in the future they would arrive, nor we promise any upper bound
%% to the limits of the shaper delay.
RoundedDelayMs = ceil(MaybeDelayMs),
%% Unbounded growth is calculated, with float precision, in the configured time units
%%
%% Note that it can be negative, if earlier we have penalised giving a larger `last_update' and now we
%% update even before we have reach the point in time where the previous `last_update' was set
%%
%% If the growth was negative, that means that it has grown the debt instead
-spec unbounded_available_tokens(rate(), tokens(), float()) -> float().
unbounded_available_tokens(Rate, LastAvailableTokens, TimeDiffInUnits) ->
%% How much we might have recovered since last time
AvailableAtGrowthRate = Rate * TimeDiffInUnits,
%% Unbounded growth at rate since the last update
LastAvailableTokens + AvailableAtGrowthRate.

NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable,
last_update = NativeNow + RoundedDelayMs,
debt = RoundedDelayMs - MaybeDelayMs},
{NewShaper, RoundedDelayMs}.
%% This is the real growth considering the maximum bucket size.
-spec exactly_available_tokens(bucket_size(), float()) -> float().
exactly_available_tokens(MaximumTokens, UnboundedTokens) ->
%% Real recovery cannot grow higher than the actual rate in the window frame
ExactlyAvailableNow0 = min(MaximumTokens, UnboundedTokens),
%% But it can't be negative either which can happen if we were already in debt,
%% but this is a debt we will pay when we calculate the final punishment in final_state
max(+0.0, ExactlyAvailableNow0).

%% We penalise rounding up, the most important contract is that rate will never exceed that
%% requested, but the same way timeouts in Erlang promise not to arrive any time earlier but
%% don't promise at what time in the future they would arrive, nor we promise any upper bound
%% to the limits of the shaper delay.
%%
%% Two cases, either:
%% Punish is positive: even after paying the old debt you incur a new debt again
%% Punish is negative: I overpenalised you last time, you get off now but with a future bill
final_state(Shaper, TimeUnit, Punish, NativeNow) when Punish >= +0.0 ->
DelayMs = convert_time_unit(Punish, TimeUnit, millisecond),
RoundedDelayMs = ceil(DelayMs),
Debt = RoundedDelayMs - DelayMs,
DebtInUnits = convert_time_unit(Debt, millisecond, TimeUnit),
DelayNative = convert_time_unit(RoundedDelayMs, TimeUnit, native),
RoundedDelayNative = ceil(DelayNative),
NewShaper = Shaper#token_bucket_shaper{last_update = NativeNow + RoundedDelayNative,
debt = DebtInUnits},
{NewShaper, RoundedDelayMs};
final_state(Shaper, TimeUnit, Punish, NativeNow) when Punish < +0.0 ->
DebtInUnits = convert_time_unit(-Punish, millisecond, TimeUnit),
NewShaper = Shaper#token_bucket_shaper{last_update = NativeNow,
debt = DebtInUnits},
{NewShaper, 0}.

%% Avoid rounding errors by using floats and float division,
%% erlang:convert_native_to_ms works only with integers
-spec convert_native_to_ms(number()) -> float().
convert_native_to_ms(Time) ->
time_unit_multiplier(millisecond) * Time / time_unit_multiplier(native).
%% erlang:convert_time_unit works only with integers
-spec convert_time_unit(number(), time_unit(), time_unit()) -> float().
convert_time_unit(Time, SameUnit, SameUnit) -> Time;
convert_time_unit(Time, FromUnit, ToUnit) ->
time_unit_multiplier(ToUnit) * Time / time_unit_multiplier(FromUnit).

-compile({inline, [time_unit_multiplier/1]}).
-spec time_unit_multiplier(time_unit()) -> pos_integer().
time_unit_multiplier(native) ->
erts_internal:time_unit();
time_unit_multiplier(nanosecond) ->
1000*1000*1000;
time_unit_multiplier(microsecond) ->
1000*1000;
time_unit_multiplier(millisecond) ->
1000.
1000;
time_unit_multiplier(second) ->
1.
1 change: 0 additions & 1 deletion src/opuntia.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
available_tokens :: opuntia:tokens(),
last_update :: integer(),
debt :: float() %% Always in the range [0.0, 1.0]
%% Signifies the unnecesary number of milliseconds of penalisation
}).

-endif.
Loading

0 comments on commit 68a85b9

Please sign in to comment.