Skip to content

Commit

Permalink
Stateful property tests for the server
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Dec 7, 2023
1 parent 7913801 commit e49a766
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 29 deletions.
23 changes: 8 additions & 15 deletions src/opuntia_srv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
-export([start_link/2, wait/4, reset_shapers/1]).

%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% Record definitions
-record(opuntia_state, {
name :: name(),
max_delay :: opuntia:rate(), %% Maximum amount of time units to wait
max_delay :: opuntia:delay(), %% Maximum amount of time units to wait
gc_ttl :: non_neg_integer(), %% How many seconds to store each shaper
gc_time :: non_neg_integer(), %% How often to run the gc
gc_ref :: undefined | reference(),
Expand All @@ -26,7 +26,7 @@
-type name() :: atom().
-type key() :: term().
-type seconds() :: non_neg_integer().
-type args() :: #{max_delay => opuntia:rate(),
-type args() :: #{max_delay => opuntia:delay(),
gc_interval => seconds(),
ttl => seconds()}.
-type maybe_rate() :: fun(() -> opuntia:rate() | opuntia:rate()).
Expand All @@ -40,11 +40,11 @@ start_link(Name, Args) ->
-spec wait(gen_server:server_ref(), key(), opuntia:tokens(), maybe_rate()) ->
continue | {error, max_delay_reached}.
wait(Shaper, Key, Tokens, Config) ->
gen_server:call(Shaper, {wait, Key, Tokens, Config}).
gen_server:call(Shaper, {wait, Key, Tokens, Config}, infinity).

%% @doc Ask server to forget its shapers
reset_shapers(ProcName) ->
gen_server:call(ProcName, reset_shapers).
gen_server:call(ProcName, reset_shapers, infinity).

%% gen_server Function Definitions
-spec init({name(), args()}) -> {ok, opuntia_state()}.
Expand Down Expand Up @@ -95,12 +95,6 @@ handle_info(Info, #opuntia_state{name = Name} = State) ->
telemetry:execute([opuntia, unknown_request, Name], #{value => 1}, #{msg => Info, type => info}),
{noreply, State}.

terminate(_Reason, _State) ->
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
Expand All @@ -122,10 +116,9 @@ save_shaper(#opuntia_state{shapers = Shapers} = State, Key, Shaper) ->
cleanup(State = #opuntia_state{name = Name, shapers = Shapers, gc_ttl = TTL}) ->
telemetry:execute([opuntia, cleanup, Name], #{}, #{}),
TimestampThreshold = erlang:system_time(second) - TTL,
F = fun(_, #token_bucket{last_update = ATime}) ->
Min = erlang:convert_time_unit(TimestampThreshold, second, millisecond),
ATime > Min
end,
Min = erlang:convert_time_unit(TimestampThreshold, second, millisecond),
F = fun(_, #token_bucket{last_update = ATime}) -> ATime > Min;
(_, none) -> false end,
RemainingShapers = maps:filter(F, Shapers),
State#opuntia_state{shapers = RemainingShapers}.

Expand Down
117 changes: 103 additions & 14 deletions test/opuntia_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
init_per_testcase/2,
end_per_testcase/2]).

-include_lib("stdlib/include/assert.hrl").
-include_lib("proper/include/proper.hrl").
-include_lib("common_test/include/ct.hrl").

all() ->
[
Expand All @@ -25,13 +25,15 @@ groups() ->
{throughput_throttle, [parallel],
[
run_shaper_with_zero_does_not_shape,
run_basic_shaper_property
run_basic_shaper_property,
run_stateful_server
]}
].

%%%===================================================================
%%% Overall setup/teardown
%%%===================================================================

init_per_suite(Config) ->
ct:pal("Online schedulers ~p~n", [erlang:system_info(schedulers_online)]),
application:ensure_all_started(telemetry),
Expand All @@ -46,20 +48,30 @@ init_per_group(_, Config) ->
end_per_group(_Groupname, _Config) ->
ok.

init_per_testcase(run_stateful_server, Config) ->
[{{pid, run_stateful_server}, spawn(fun keep_table/0)} | Config];
init_per_testcase(_TestCase, Config) ->
Config.

end_per_testcase(run_stateful_server, Config) ->
Pid = ?config({pid, run_stateful_server}, Config),
Pid ! clean_table;
end_per_testcase(_TestCase, _Config) ->
ok.

keep_table() ->
ets:new(?MODULE, [named_table, set, public,
{write_concurrency, true}, {read_concurrency, true}]),
receive Msg -> Msg end.

%%%===================================================================
%%% Individual Test Cases (from groups() definition)
%%%===================================================================

run_shaper_with_zero_does_not_shape(_) ->
Prop = ?FORALL(
TokensToSpend,
integer(1, 9999),
tokens(),
begin
Shaper = opuntia:new(0),
{TimeUs, _LastShaper} = timer:tc(fun run_shaper/2, [Shaper, TokensToSpend]),
Expand All @@ -71,7 +83,7 @@ run_shaper_with_zero_does_not_shape(_) ->
run_basic_shaper_property(_) ->
Prop = ?FORALL(
{TokensToSpend, RatePerMs},
{integer(1, 9999), integer(1, 9999)},
{tokens(), rate()},
begin
Shaper = opuntia:new(RatePerMs),
{TimeUs, _LastShaper} = timer:tc(fun run_shaper/2, [Shaper, TokensToSpend]),
Expand All @@ -82,8 +94,94 @@ run_basic_shaper_property(_) ->
[TokensToSpend, RatePerMs, TimeMs, MinimumExpected, Val]),
Val
end),
run_prop(?FUNCTION_NAME, Prop).
run_prop(?FUNCTION_NAME, Prop, 100_000, 256).

%%%===================================================================
%% Server stateful property
%%%===================================================================

run_stateful_server(_) ->
Prop =
?FORALL(Cmds, commands(?MODULE),
begin
Config = #{max_delay => 99999, gc_interval => 1},
{ok, Pid} = opuntia_srv:start_link(?MODULE, Config),
{History, State, Res} = run_commands(?MODULE, Cmds, [{server, Pid}]),
?WHENFAIL(io:format("H: ~p~nS: ~p~n Res: ~p~n", [History, State, Res]), Res == ok)
end),
run_prop(?FUNCTION_NAME, Prop, 10_000, 256).

command(_State) ->
oneof([
{call, ?MODULE, wait, [{var, server}, key(), tokens(), config()]},
{call, ?MODULE, reset_shapers, [{var, server}]}
]).


initial_state() ->
#{}.

precondition(_State, {call, ?MODULE, reset_shapers, [_Server]}) ->
true;
precondition(State, {call, ?MODULE, wait, [Server, Key, _Tokens, Config]}) ->
case maps:is_key(Key, State) of
true -> %% We already know when this one started
true;
false -> %% Track start for this key
Now = erlang:monotonic_time(millisecond),
Rate = get_rate_from_config(Config),
ets:insert(?MODULE, {{Server, Key}, Rate, Now}),
true
end.

postcondition(_State, {call, ?MODULE, reset_shapers, [_Server]}, Res) ->
ok =:= Res;
postcondition(State, {call, ?MODULE, wait, [Server, Key, Tokens, _Config]}, Res) ->
Now = erlang:monotonic_time(millisecond),
[{_, Rate, Start}] = ets:lookup(?MODULE, {Server, Key}),
TokensNowConsumed = tokens_now_consumed(State, Key, Tokens),
MinimumExpected = calculate_accepted_range(TokensNowConsumed, Rate),
Duration = Now - Start,
continue =:= Res andalso MinimumExpected =< Duration.

next_state(_State, _Result, {call, ?MODULE, reset_shapers, [_Server]}) ->
#{};
next_state(State, _Result, {call, ?MODULE, wait, [_Server, Key, Tokens, _Config]}) ->
TokensNowConsumed = tokens_now_consumed(State, Key, Tokens),
State#{Key => TokensNowConsumed}.

tokens_now_consumed(State, Key, NewTokens) ->
TokensConsumedSoFar = maps:get(Key, State, 0),
TokensConsumedSoFar + NewTokens.

wait(Shaper, Key, Tokens, Config) ->
opuntia_srv:wait(Shaper, Key, Tokens, Config).

reset_shapers(Shaper) ->
opuntia_srv:reset_shapers(Shaper).

get_rate_from_config(N) when is_integer(N), N >= 0 ->
N;
get_rate_from_config(Config) when is_function(Config, 0) ->
Config().

key() ->
binary().

tokens() ->
integer(1, 9999).

config() ->
union([0, rate(), function(0, rate())]).

rate() ->
integer(1, 9999).

%%%===================================================================
%% Helpers
%%%===================================================================
calculate_accepted_range(_, 0) ->
0;
calculate_accepted_range(TokensToSpend, RatePerMs) when TokensToSpend =< RatePerMs ->
0;
calculate_accepted_range(TokensToSpend, RatePerMs) ->
Expand All @@ -98,15 +196,6 @@ run_shaper(Shaper, TokensLeft) ->
timer:sleep(DelayMs),
run_shaper(NewShaper, TokensLeft - TokensConsumed).




run_prop(PropName, Property) ->
run_prop(PropName, Property, 100_000).

run_prop(PropName, Property, NumTests) ->
run_prop(PropName, Property, NumTests, 256).

run_prop(PropName, Property, NumTests, WorkersPerScheduler) ->
Opts = [quiet, noshrink, long_result, {start_size, 2}, {numtests, NumTests},
{numworkers, WorkersPerScheduler * erlang:system_info(schedulers_online)}],
Expand Down

0 comments on commit e49a766

Please sign in to comment.