Skip to content

Commit

Permalink
Add and document global_timeout for entire DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
weatherman2095pro committed Apr 5, 2019
1 parent 11ca4a8 commit b0e36d3
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 8 deletions.
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,15 @@ Options
-------

- `{event_manager, pid()}`: Specify a `gen_event` process to forward events to
- `{failure_mode, partial | total}` (default: total): Switch between partial and total failure modes. Total failure will immediately shut down all running vertices within a DAG. Partial failure will cancel running all tasks reachable by any failed vertex, but will continue until all vertices finish running.

- `{failure_mode, partial | total}` (default: total): Switch between
partial and total failure modes. Total failure will immediately shut
down all running vertices within a DAG. Partial failure will cancel
running all tasks reachable by any failed vertex, but will continue
until all vertices finish running.
- `{global_timeout, integer() | undefined}`: Specify a timeout in
milliseconds by which the entire DAG will shutdown if it hasn't
already finished. An `undefined` value means no timeout. (default:
undefined)

Example
-------
Expand Down Expand Up @@ -76,8 +83,3 @@ VertDefn = #{module => wrek_sleep_vert, args => [], deps => [], timeout => 10},

Where `timeout` is the number of milliseconds until normal completion
or forced timeout.

TODO
----

- Add timeout for DAG. Timeouts for individual vertices have been implemented.
16 changes: 15 additions & 1 deletion src/wrek.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@

-type dag_map() :: #{any() := vert_defn()} | [{any(), vert_defn()}].

-type option() :: {event_manager, pid()} | {failure_mode, partial | total}.
-type option() :: {event_manager, pid()} | {failure_mode, partial | total} |
{global_timeout, pos_integer() | undefined}.

-export_type([
dag_id/0,
Expand Down Expand Up @@ -104,6 +105,13 @@ handle_cast(_Req, State) ->

-spec handle_info(_, state()) -> {noreply, state()}.

handle_info(timeout, #state{
event_mgr = EvMgr,
id = Id
} = State) ->
wrek_event:wrek_error(EvMgr, Id, timeout),
{stop, timeout, State};

handle_info({'EXIT', Pid, {shutdown, {ok, Data}}}, State) ->
#state{wrek = Wrek} = State,

Expand Down Expand Up @@ -143,6 +151,12 @@ init({Id, DagMap, Opts}) ->
EvMgr = proplists:get_value(event_manager, Opts, undefined),
FailMode = proplists:get_value(failure_mode, Opts, total),

case proplists:get_value(global_timeout, Opts, undefined) of
T when is_integer(T) ->
{ok, _TRef} = timer:send_after(T, timeout);
undefined -> ok
end,

Sandbox = make_dag_sandbox(Id),

State = #state{
Expand Down
40 changes: 40 additions & 0 deletions test/wrek_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,46 @@ custom_exec_callback_test() ->

?assertEqual(Msg, <<"good">>).

wrek_timeout_test() ->
exec:start(),
application:start(wrek),

{ok, EvMgr} = gen_event:start_link({local, wrek_test_manager}),
gen_event:add_handler(EvMgr, wrek_test_handler, [total, self()]),

VertMap = #{
one => ok_v([]),
two => ok_v([one]),
two_and_a_half => ok_v([one]),
three => ok_v([two])
},

{ok, _Pid} = wrek:start(VertMap, [{event_manager, EvMgr}, {global_timeout, 0}]),

Events = receive
#{evts := Evts} -> Evts
end,

gen_event:stop(EvMgr),

WrekStarts =
[E || E = #wrek_event{type = {wrek, start}} <- Events],
StartingVerts =
[E || E = #wrek_event{msg = {starting_vert, _}} <- Events],
VertStarts =
[E || E = #wrek_event{type = {vert, start}} <- Events],
VertDones =
[E || E = #wrek_event{type = {vert, done}} <- Events],
WrekDones =
[E || E = #wrek_event{type = {wrek, done}} <- Events],

?assertEqual(1, length(WrekStarts)),
?assertEqual(1, length(StartingVerts)),
?assertEqual(1, length(VertStarts)),
?assertEqual(0, length(VertDones)),
?assertEqual(0, length(WrekDones)).


%% private

ok_v(Deps) ->
Expand Down

0 comments on commit b0e36d3

Please sign in to comment.