Skip to content

Commit

Permalink
Merge pull request #2 from NelsonVides/improvements
Browse files Browse the repository at this point in the history
Improvements
  • Loading branch information
NelsonVides committed Dec 19, 2023
2 parents bc99b54 + a16788b commit 532155b
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 117 deletions.
163 changes: 134 additions & 29 deletions src/opuntia.erl
Original file line number Diff line number Diff line change
@@ -1,53 +1,158 @@
%% @doc `opuntia', traffic shapers for Erlang and Elixir
%%
%% This module implements the token-bucket traffic-shaping algorithm.
%%
%% The rate is given in tokens per millisecond together with a bucket size.
%% Resolution is in native time units as described by `erlang:monotonic_time/0'.
%%
%% The delay is always returned in milliseconds,
%% as this is the unit receives and timers use in the BEAM.
%% @end
-module(opuntia).

-export([new/1, update/2]).

-ifdef(TEST).
-export([create/2, calculate/3]).
-else.
-compile({inline, [create/2, calculate/3, convert_native_to_ms/1]}).
-endif.

-include("opuntia.hrl").

-type delay() :: non_neg_integer().
%% Number of milliseconds that is advised to wait after a shaping update.

-type rate() :: non_neg_integer().
%% Number of tokens accepted per millisecond.

-type bucket_size() :: non_neg_integer().
%% Maximum capacity of the bucket regardless of how much time passes.

-type tokens() :: non_neg_integer().
%% Unit element the shaper consumes, for example bytes or requests.

-type config() :: 0 | #{bucket_size := bucket_size(),
                        rate := rate(),
                        start_full := boolean()}.
%% See `new/1' for more details.

-type shape() :: {bucket_size(), rate()}.
%% Bucket capacity together with its refill rate.

-type shaper() :: none | #token_bucket_shaper{}.
%% Shaper type

-export_type([shaper/0, shape/0, tokens/0, bucket_size/0, rate/0, delay/0]).

%% NOTE(review): despite the name, this macro checks for a strictly positive integer.
-define(NON_NEG_INT(N), (is_integer(N) andalso N > 0)).

%% @doc Creates a new shaper according to the configuration.
%%
%% If zero is given, no shaper is created and any update action will always return zero delay;
%% to configure an actual shaper it will need a map
%% ```
%% #{bucket_size => MaximumTokens, rate => Rate, start_full => Boolean},
%% '''
%% where
%% <ul>
%%  <li>`Rate' is the number of tokens per millisecond the bucket will grow with.</li>
%%  <li>`MaximumTokens' is the maximum number of tokens the bucket can grow.</li>
%%  <li>`StartFull' indicates if the shaper starts with the bucket full, or empty if not.</li>
%% </ul>
%%
%% So, for example, if we configure a shaper with the following:
%% ```
%% #{bucket_size => 60000, rate => 10, start_full => true}
%% '''
%% it means that the bucket will allow `10' tokens per `millisecond', up to 60000 tokens,
%% regardless of how long it is left unused to charge: it will never charge
%% further than 60000 tokens.
-spec new(config()) -> shaper().
new(0) ->
    none;
new(Shape) ->
    create(Shape, erlang:monotonic_time()).

%% @doc Update shaper and return possible waiting time.
%%
%% This function takes the current shaper state and the number of tokens that have been
%% consumed, and returns a tuple containing the new shaper state and a possibly non-zero
%% number of milliseconds to wait if more tokens than the shaper allows were consumed.
-spec update(shaper(), tokens()) -> {shaper(), delay()}.
update(none, _TokensNowUsed) ->
    {none, 0};
update(Shaper, TokensNowUsed) ->
    calculate(Shaper, TokensNowUsed, erlang:monotonic_time()).

%% Helpers

%% Build the initial #token_bucket_shaper{} from a validated configuration,
%% stamped with the given native monotonic time.
-spec create(config(), integer()) -> shaper().
create(0, _) ->
    none;
create(#{bucket_size := Size, rate := Rate, start_full := Full}, Now)
  when ?NON_NEG_INT(Size), ?NON_NEG_INT(Rate), Size >= Rate, is_boolean(Full) ->
    %% Start with either a full or an empty bucket, as configured.
    Tokens = if Full -> Size;
                not Full -> 0
             end,
    #token_bucket_shaper{shape = {Size, Rate},
                         available_tokens = Tokens,
                         last_update = Now,
                         debt = 0.0}.

%% Core token-bucket arithmetic: given the tokens consumed and the current native
%% monotonic time, return the updated shaper and the delay (ms) to impose.
-spec calculate(shaper(), tokens(), integer()) -> {shaper(), delay()}.
calculate(none, _, _) ->
    {none, 0};
calculate(Shaper, 0, _) ->
    {Shaper, 0};
calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate},
                               available_tokens = LastAvailableTokens,
                               last_update = NativeLastUpdate,
                               debt = LastDebt} = Shaper, TokensNowUsed, NativeNow) ->
    NativeTimeSinceLastUpdate = NativeNow - NativeLastUpdate,

    %% This is now a float and so will all below be, to keep rounding errors minimal
    TimeSinceLastUpdate = convert_native_to_ms(NativeTimeSinceLastUpdate) + LastDebt,

    %% How much we might have recovered since last time
    AvailableAtGrowthRate = Rate * TimeSinceLastUpdate,
    UnboundedTokenGrowth = LastAvailableTokens + AvailableAtGrowthRate,

    %% Real recovery cannot grow higher than the actual rate in the window frame
    ExactlyAvailableNow = min(MaximumTokens, UnboundedTokenGrowth),

    %% How many are available after using TokensNowUsed can't be smaller than zero
    TokensAvailable = max(0, ExactlyAvailableNow - TokensNowUsed),

    %% How many tokens were overused; zero if none were
    TokensOverused = max(0, TokensNowUsed - ExactlyAvailableNow),

    %% And then MaybeDelayMs will be zero if TokensOverused was zero
    MaybeDelayMs = TokensOverused / Rate,

    %% We penalise rounding up: the most important contract is that the rate will never
    %% exceed that requested, but the same way timeouts in Erlang promise not to arrive
    %% any earlier yet don't promise an upper bound, neither do we for the shaper delay.
    RoundedDelayMs = ceil(MaybeDelayMs),

    %% BUGFIX: last_update holds native time units while RoundedDelayMs is milliseconds;
    %% adding them directly barely moves last_update forward, so tokens would keep
    %% regrowing during the imposed wait. Convert the delay to native units first.
    DelayNative = erlang:convert_time_unit(RoundedDelayMs, millisecond, native),
    NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable,
                                           last_update = NativeNow + DelayNative,
                                           debt = RoundedDelayMs - MaybeDelayMs},
    {NewShaper, RoundedDelayMs}.

%% Avoid rounding errors by using floats and float division;
%% erlang:convert_time_unit/3 works only with integers, hence this manual version.
%% Takes a span in native time units and returns it as (possibly fractional) milliseconds.
%% The multiply-before-divide order is deliberate to keep float rounding minimal.
-spec convert_native_to_ms(number()) -> float().
convert_native_to_ms(Time) ->
    time_unit_multiplier(millisecond) * Time / time_unit_multiplier(native).

%% Multiplier from one second to the given time unit
%% (erts_internal:time_unit/0 gives native units per second).
-compile({inline, [time_unit_multiplier/1]}).
time_unit_multiplier(native) ->
    erts_internal:time_unit();
time_unit_multiplier(millisecond) ->
    1000.
8 changes: 5 additions & 3 deletions src/opuntia.hrl
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
-ifndef(OPUNTIA).
-define(OPUNTIA, true).

%% State of one token-bucket shaper.
-record(token_bucket_shaper, {
          shape :: opuntia:shape(),
          available_tokens :: opuntia:tokens(),
          last_update :: integer(), %% native monotonic time units
          debt :: float() %% Always in the range [0.0, 1.0]
          %% Signifies the unnecessary number of milliseconds of penalisation
         }).

-endif.
55 changes: 31 additions & 24 deletions src/opuntia_srv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,43 @@
%% Server state: one shaper per key, plus the periodic-cleanup bookkeeping.
-record(opuntia_state, {
          name :: name(),
          max_delay :: opuntia:delay(), %% Maximum amount of time units to wait
          cleanup_ttl :: non_neg_integer(), %% How many seconds to store each shaper
          cleanup_time :: non_neg_integer(), %% How often to run the cleanup
          cleanup_ref :: undefined | reference(),
          shapers = #{} :: #{key() := opuntia:shaper()}
         }).
-type opuntia_state() :: #opuntia_state{}. %% @private
-type name() :: atom().
-type key() :: term().
-type seconds() :: non_neg_integer().
-type args() :: #{max_delay => opuntia:delay(),
                  cleanup_interval => seconds(),
                  ttl => seconds()}.

-type shape() :: 0 | #{bucket_size := opuntia:bucket_size(), rate := opuntia:rate(),
                       time_unit := millisecond, start_full := true}.
-type gen_shape() :: fun(() -> shape()) | shape().
%% This accepts a function that generates the shape, if such shape was too expensive to calculate.
%% Note that for this server, only full buckets and in milliseconds are valid, due to the nature of
%% gen_server call timeouts.

%% @doc Start-links a shaper server
-spec start_link(name(), args()) -> ignore | {error, _} | {ok, pid()}.
start_link(Name, Args) ->
    InitArg = {Name, Args},
    gen_server:start_link(?MODULE, InitArg, []).

%% @doc Shapes the caller from executing the action
%%
%% This will do an actual blocking `gen_server:call/3'.
-spec wait(gen_server:server_ref(), key(), opuntia:tokens(), gen_shape()) ->
    continue | {error, max_delay_reached}.
wait(Shaper, Key, Tokens, Config) ->
    gen_server:call(Shaper, {wait, Key, Tokens, Config}, infinity).

%% @doc Shapes the caller from executing the action, asynchronously
%%
%% This will do a `gen_server:send_request/2'.
%% Usual pattern applies to receive the matching continue.
-spec request_wait(gen_server:server_ref(), key(), opuntia:tokens(), gen_shape()) ->
    gen_server:request_id().
request_wait(Shaper, Key, Tokens, Config) ->
    gen_server:send_request(Shaper, {wait, Key, Tokens, Config}).
Expand All @@ -59,9 +65,10 @@ reset_shapers(ProcName) ->
%% @private
%% Reads configuration (with defaults) and schedules the first periodic cleanup.
-spec init({name(), args()}) -> {ok, opuntia_state()}.
init({Name, Args}) ->
    MaxDelay = maps:get(max_delay, Args, 3000),
    GCInt = timer:seconds(maps:get(cleanup_interval, Args, 30)),
    GCTTL = maps:get(ttl, Args, 120),
    State = #opuntia_state{name = Name, max_delay = MaxDelay,
                           cleanup_ttl = GCTTL, cleanup_time = GCInt},
    {ok, schedule_cleanup(State)}.

%% @private
Expand Down Expand Up @@ -101,7 +108,7 @@ handle_cast(Msg, #opuntia_state{name = Name} = State) ->
{noreply, State}.

%% @private
%% Matching cleanup_ref guards against stale timer messages from cancelled timers.
handle_info({timeout, TRef, cleanup}, #opuntia_state{cleanup_ref = TRef} = State) ->
    {noreply, schedule_cleanup(cleanup(State))};
handle_info(Info, #opuntia_state{name = Name} = State) ->
telemetry:execute([opuntia, unknown_request, Name], #{value => 1}, #{msg => Info, type => info}),
Expand All @@ -117,32 +124,32 @@ find_or_create_shaper(#opuntia_state{shapers = Shapers}, Key, Config) ->
_ -> create_new_from_config(Config)
end.

%% Build a shaper from the given config, first evaluating it if it is a generator fun.
create_new_from_config(Config) when is_function(Config, 0) ->
    create_new_from_config(Config());
create_new_from_config(N) ->
    opuntia:new(N).

%% Store (or replace) the shaper registered under Key in the server state.
save_shaper(#opuntia_state{shapers = Current} = State, Key, Shaper) ->
    State#opuntia_state{shapers = Current#{Key => Shaper}}.

%% Drop shapers that have not been updated within the configured TTL.
cleanup(State = #opuntia_state{name = Name, shapers = Shapers, cleanup_ttl = TTL}) ->
    telemetry:execute([opuntia, cleanup, Name], #{}, #{}),
    TimestampThreshold = erlang:system_time(second) - TTL,
    Min = erlang:convert_time_unit(TimestampThreshold, second, millisecond),
    %% NOTE(review): opuntia now writes last_update in native monotonic units, while
    %% Min here is wall-clock milliseconds — these time bases are not directly
    %% comparable; confirm the intended scale of this threshold.
    F = fun(_, #token_bucket_shaper{last_update = ATime}) -> ATime > Min;
           (_, none) -> false end,
    RemainingShapers = maps:filter(F, Shapers),
    State#opuntia_state{shapers = RemainingShapers}.

%% Arm the next cleanup timer; a zero interval disables periodic cleanup entirely.
schedule_cleanup(#opuntia_state{cleanup_time = 0} = State) ->
    State;
schedule_cleanup(#opuntia_state{cleanup_time = GCInt} = State) ->
    TRef = erlang:start_timer(GCInt, self(), cleanup),
    State#opuntia_state{cleanup_ref = TRef}.

%% @doc It is a small hack
%% This function calls this in a more efficient way:
%% timer:apply_after(DelayMs, gen_server, reply, [From, Reply]).
%% It relies on gen_server callers matching on the {Tag, Reply} message shape.
-spec reply_after(opuntia:delay(), {atom() | pid(), _}, continue) -> reference().
reply_after(DelayMs, {Pid, Tag}, Reply) ->
    erlang:send_after(DelayMs, Pid, {Tag, Reply}).
Loading

0 comments on commit 532155b

Please sign in to comment.