diff --git a/include/jobs.hrl b/include/jobs.hrl index 7cf45e4..47ec97e 100644 --- a/include/jobs.hrl +++ b/include/jobs.hrl @@ -102,7 +102,7 @@ -record(queue, {name :: any(), mod :: atom(), type = fifo :: fifo | #producer{} | #passive{} - | #action{}, + | #action{}, group :: atom(), regulators = [] :: [regulator() | regulator_ref()], max_time :: undefined | integer(), @@ -114,7 +114,7 @@ check_counter = 0 :: integer(), waiters = [] :: [{pid(), reference()}], st - }). + }). -record(sampler, {name, mod, diff --git a/rebar.config b/rebar.config index 5fe82b5..85fe2ab 100644 --- a/rebar.config +++ b/rebar.config @@ -1,7 +1,11 @@ %% -*- erlang -*- +{erl_first_files, + ["src/jobs_queue.erl", "src/jobs_sampler.erl"]}. {erl_opts, [debug_info]}. {xref_checks, [undefined_function_calls]}. +{require_otp_vsn, "R15|R16"}. + {cover_enabled, true}. {eunit_opts, [verbose]}. diff --git a/src/jobs_queue.erl b/src/jobs_queue.erl index e0b4ad2..dfcc387 100644 --- a/src/jobs_queue.erl +++ b/src/jobs_queue.erl @@ -36,35 +36,33 @@ delete/1]). -export([in/3, out/2, - peek/1, + peek/1, info/2, all/1, empty/1, is_empty/1, timedout/1, timedout/2]). --export([behaviour_info/1]). - -include("jobs.hrl"). --import(jobs_lib, [timestamp/0]). -record(st, {table}). %-type timestamp() :: integer(). -type job() :: {pid(), reference()}. -type entry() :: {timestamp(), job()}. +-type info_item() :: max_time | oldest_job | length. -behaviour_info(callbacks) -> - [{new , 2}, - {delete , 1}, - {in , 3}, - {peek , 1}, - {out , 2}, - {all , 1}, - {info , 2}]; -behaviour_info(_) -> - undefined. +-callback new(fifo | lifo, #queue{}) -> #queue{}. +-callback delete(#queue{}) -> any(). +-callback in(timestamp(), job(), #queue{}) -> #queue{}. +-callback peek(#queue{}) -> entry(). +-callback out(N :: integer(), #queue{}) -> {[entry()], #queue{}}. +-callback all(#queue{}) -> [entry()]. +-callback info(info_item(), #queue{}) -> any(). +-export_type([job/0, + entry/0, + info_item/0]). %% @spec new(Options, #queue{}) -> #queue{} %% @doc Instantiate a new queue. @@ -146,9 +144,6 @@ out(N,#queue{st = #st{table = T}}=Q) when N >= 0 -> all(#queue{st = #st{table = T}}) -> ets:select(T, [{{'$1'},[],['$1']}]). - --type info_item() :: max_time | oldest_job | length. - -spec info(info_item(), #queue{}) -> any(). %% @spec info(Item, #queue{}) -> Info %% Item = max_time | oldest_job | length @@ -172,7 +167,7 @@ timedout(#queue{max_time = TO} = Q) -> timedout(_ , #queue{oldest_job = undefined}) -> []; timedout(TO, #queue{st = #st{table = Tab}} = Q) -> - Now = timestamp(), + Now = jobs_lib:timestamp(), {Objs, OJ} = find_expired(Tab, Now, TO), {Objs, Q#queue{oldest_job = OJ}}. diff --git a/src/jobs_queue_list.erl b/src/jobs_queue_list.erl index 6d3fc63..6ab768d 100644 --- a/src/jobs_queue_list.erl +++ b/src/jobs_queue_list.erl @@ -27,10 +27,13 @@ -author('ulf.wiger@erlang-solutions.com'). -copyright('Erlang Solutions Ltd.'). +-behaviour(jobs_queue). + -export([new/2, delete/1]). -export([in/3, out/2, + peek/1, info/2, all/1, empty/1, @@ -73,6 +76,12 @@ out(N, #queue{st = L, oldest_job = OJ} = Q) when N >= 0 -> end, {Out, Q#queue{st = Rest, oldest_job = OJ1}}. +peek(#queue { st = []}) -> + undefined; +peek(#queue { st = [H|_]}) -> + H. + + split(N, L) -> split(N, L, []). diff --git a/src/jobs_sampler.erl b/src/jobs_sampler.erl index ace59f1..8b8dda8 100644 --- a/src/jobs_sampler.erl +++ b/src/jobs_sampler.erl @@ -25,19 +25,18 @@ -module(jobs_sampler). -export([start_link/0, start_link/1, - trigger_sample/0, - tell_sampler/2, - subscribe/0, - end_subscription/0, - calc/3]). + trigger_sample/0, + tell_sampler/2, + subscribe/0, + end_subscription/0, + calc/3]). -export([init/1, - behaviour_info/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). -include("jobs.hrl"). @@ -59,13 +58,14 @@ modifiers = orddict:new(), remote_modifiers = []}). - -behaviour_info(callbacks) -> - [{init, 2}, - {sample, 2}, - {handle_msg, 3}, - {calc, 2}]. - +-callback init(atom(), proplists:proplist()) -> + {ok, State :: term()}. +-callback sample(timestamp(), State :: term()) -> + {Res :: term(), NState :: term()} | ignore. +-callback handle_msg(M :: term(), TS :: timestamp(), State :: term()) -> + {ignore, NState :: term()} | {log, L :: term(), NState :: term()}. +-callback calc(History :: term(), State :: term()) -> + {[{Q :: atom(), Val :: integer()}], NState :: term()}. trigger_sample() -> gen_server:cast(?MODULE, sample).