Skip to content

Commit

Permalink
Add wrek_t to manage dag data
Browse files Browse the repository at this point in the history
I was fed up of littering wrek.erl and wrek_vert.erl with calls to digraph:*, so
I wrapped the digraph as well as the #{pid() => any()} map in a record defined
in wrek_t.erl.
  • Loading branch information
rkallos committed Nov 28, 2018
1 parent 83258f4 commit 48a33b2
Show file tree
Hide file tree
Showing 5 changed files with 358 additions and 205 deletions.
163 changes: 53 additions & 110 deletions src/wrek.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,11 @@
]).

-record(state, {
children = #{} :: #{pid() => any()},
dag = undefined :: digraph:graph() | undefined,
event_mgr = undefined :: pid() | undefined,
failure_mode = total :: partial | total,
id = ?id() :: dag_id(),
sandbox = undefined :: file:filename_all() | undefined
sandbox = undefined :: file:filename_all() | undefined,
wrek = undefined :: wrek_t:t() | undefined
}).

-type state() :: #state{}.
Expand Down Expand Up @@ -84,15 +83,11 @@ code_change(_Req, _From, State) ->

-spec handle_call(_, _, state()) -> {reply, _, state()} | {stop, _, state()}.

handle_call({put_sandbox, Dir}, {From, _}, State) ->
#state{
children = #{From := Name},
dag = Dag
} = State,
{Name, Vert} = digraph:vertex(Dag, Name),
handle_call({put_sandbox, Dir}, {From, _}, State = #state{wrek = Wrek}) ->
{ok, {Name, Vert}} = wrek_t:child(Wrek, From),
Vert2 = wrek_vert_t:set_dir(Vert, Dir),
digraph:add_vertex(Dag, Name, Vert2),
{reply, ok, State};
Wrek2 = wrek_t:add_vertex(Wrek, Name, Vert2),
{reply, ok, State#state{wrek = Wrek2}};

handle_call(sandbox, _From, State) ->
{reply, State#state.sandbox, State};
Expand All @@ -109,39 +104,29 @@ handle_cast(_Req, State) ->

-spec handle_info(_, state()) -> {noreply, state()}.

handle_info({'EXIT', Pid, {shutdown, {ok, Data}}}, State0) ->
#state{
children = #{Pid := Name},
dag = Dag
} = State0,

{Name, Vert} = digraph:vertex(Dag, Name),

Vert2 = wrek_vert_t:succeed(Vert, Data),
digraph:add_vertex(Dag, Name, Vert2),
handle_info({'EXIT', Pid, {shutdown, {ok, Data}}}, State) ->
#state{wrek = Wrek} = State,

State = remove_vert(State0, Pid),
start_verts_or_exit(State);
Wrek2 = wrek_t:child_succeeded(Wrek, Pid, Data),
start_verts_or_exit(State#state{wrek = Wrek2});

handle_info({'EXIT', Pid, {shutdown, Reason}}, State) ->
#state{
children = Children,
dag = Dag,
event_mgr = EvMgr,
failure_mode = FailMode,
id = Id
id = Id,
wrek = Wrek
} = State,
#{Pid := Name} = Children,
{ok, {Name, _Vert}} = wrek_t:child(Wrek, Pid),
wrek_event:wrek_error(EvMgr, Id, {vert, Name}),
case FailMode of
total ->
{stop, {error, Reason}, State};
partial ->
{Name, Vert} = digraph:vertex(Dag, Name),
digraph:add_vertex(Dag, Name, wrek_vert_t:fail(Vert, Reason)),
State2 = remove_vert(State, Pid),
propagate_partial_failure(State2, Name),
start_verts_or_exit(State)
Wrek2 = wrek_t:child_failed(Wrek, Pid, Reason),
State2 = State#state{wrek = Wrek2},
State3 = propagate_partial_failure(State2, Name),
start_verts_or_exit(State3)
end;

handle_info(_Req, State) ->
Expand All @@ -153,30 +138,31 @@ handle_info(_Req, State) ->
init({Id, DagMap, Opts}) ->
process_flag(trap_exit, true),

{ok, Dag} = wrek_utils:from_verts(DagMap),
{ok, Wrek} = wrek_t:from_verts(DagMap),

EvMgr = proplists:get_value(event_manager, Opts, undefined),
FailMode = proplists:get_value(failure_mode, Opts, total),

Sandbox = make_dag_sandbox(Id),

State = #state{
dag = Dag,
event_mgr = EvMgr,
failure_mode = FailMode,
id = Id,
sandbox = Sandbox
sandbox = Sandbox,
wrek = Wrek
},

wrek_event:wrek_start(EvMgr, Id, DagMap),

{ok, State2} = start_verts(State),

case maps:size(State2#state.children) of
0 ->
wrek_event:wrek_error(EvMgr, Id, {unable_to_start, DagMap}),
{stop, {unable_to_start, DagMap}};
_ ->
case wrek_t:is_active(State2#state.wrek) of
false ->
Reason = {unable_to_start, DagMap},
wrek_event:wrek_error(EvMgr, Id, Reason),
{stop, Reason};
true ->
{ok, State2}
end.

Expand All @@ -188,24 +174,6 @@ terminate(_Reason, _State) ->

%% private

-spec is_dag_done(state()) -> boolean().

is_dag_done(#state{dag = Dag}) ->
Verts = [digraph:vertex(Dag, V) || V <- digraph:vertices(Dag)],
lists:all(fun({_Name, Vert}) ->
wrek_vert_t:is_finished(Vert)
end, Verts).


-spec is_vert_ready(digraph:graph(), digraph:vertex()) -> boolean().

is_vert_ready(Dag, Name) ->
Deps = [digraph:vertex(Dag, V) || V <- wrek_utils:in_vertices(Dag, Name)],
lists:all(fun({_Name, Vert}) ->
wrek_vert_t:has_succeeded(Vert)
end, Deps).


-define(DIRNAME,
lists:flatten(
io_lib:format(
Expand All @@ -223,87 +191,62 @@ make_dag_sandbox(Id) ->

-spec make_vert_data(state(), _) -> any().

make_vert_data(#state{dag = Dag}, Name) ->
Reaching =
[digraph:vertex(Dag, V) || V <- digraph_utils:reaching([Name], Dag)],
maps:from_list(Reaching).


-spec remove_vert(state(), pid()) -> state().
make_vert_data(#state{wrek = Wrek}, Name) ->
Dependencies =
[wrek_t:vertex(Wrek, N) || N <- wrek_t:dependencies(Wrek, Name)],
maps:from_list([wrek_t:vertex(Wrek, Name) | Dependencies]).

remove_vert(State = #state{children = Children}, Pid) ->
Children2 = maps:remove(Pid, Children),
State#state{children = Children2}.


-spec propagate_partial_failure(state(), digraph:vertex()) -> ok.
-spec propagate_partial_failure(state(), digraph:vertex()) -> state().

propagate_partial_failure(State, Name) ->
#state{
dag = Dag,
event_mgr = EvMgr,
id = Id
id = Id,
wrek = Wrek
} = State,
Reachable = digraph_utils:reachable_neighbours([Name], Dag),
Fun = fun(Vert) ->
{Vert, Label} = digraph:vertex(Dag, Vert),
Label2 = wrek_vert_t:set_status(Label, cancelled),
digraph:add_vertex(Dag, Vert, Label2),
wrek_event:wrek_msg(EvMgr, Id, {vert_cancelled, Vert}),
ok
end,
lists:foreach(Fun, Reachable).


-spec ready_verts(state()) -> [digraph:vertex()].

ready_verts(#state{dag = Dag, children = Children}) ->
lists:filter(fun(Name) ->
{Name, Vert} = digraph:vertex(Dag, Name),
is_vert_ready(Dag, Name) andalso
not wrek_vert_t:is_finished(Vert) andalso
not lists:member(Name, maps:values(Children))
end, digraph:vertices(Dag)).
Wrek2 = lists:foldl(fun(VertName, Acc) ->
wrek_event:wrek_msg(EvMgr, Id, {vert_cancelled, VertName}),
wrek_t:cancel_vertex(Acc, VertName)
end, Wrek, wrek_t:dependants(Wrek, Name)),
State#state{wrek = Wrek2}.


-spec start_verts(state()) -> {ok, state()} | {error, _}.

start_verts(State = #state{children = Children}) ->
ReadyVerts = ready_verts(State),
Children2 =
lists:foldl(
fun(Name, Acc) ->
{ok, Pid} = start_vert(State, Name),
Acc#{Pid => Name}
end, Children, ReadyVerts),
State2 = State#state{children = Children2},
start_verts(State = #state{wrek = Wrek}) ->
ReadyVerts = wrek_t:ready_verts(Wrek),
Wrek2 = lists:foldl(fun(Name, Acc) ->
{ok, Pid} = start_vert(State, Name),
wrek_t:child_started(Acc, Name, Pid)
end, Wrek, ReadyVerts),
State2 = State#state{wrek = Wrek2},
{ok, State2}.


-spec start_vert(state(), digraph:vertex()) -> {ok, pid()}.

start_vert(State = #state{dag = Dag, id = DagId}, Name) ->
start_vert(State, Name) ->
#state{
dag = Dag,
event_mgr = EventMgr,
id = DagId
id = DagId,
wrek = Wrek
} = State,
VertId = {DagId, ?id()},
{Name, Vert} = digraph:vertex(Dag, Name),
Vert2 = wrek_vert_t:set_id(Vert, VertId),
digraph:add_vertex(Dag, Name, Vert2),
Wrek2 = wrek_t:set_vert_id(Wrek, Name, VertId),
{Name, Vert} = wrek_t:vertex(Wrek2, Name),

wrek_event:wrek_msg(EventMgr, DagId, {starting_vert, VertId}),

Data = make_vert_data(State, Name),
Args = {Vert2, Data, EventMgr, self()},
Args = {Vert, Data, EventMgr, self()},
gen_server:start_link(wrek_vert, Args, []).


-spec start_verts_or_exit(state()) -> {noreply, state()} | {stop, normal, state()}.

start_verts_or_exit(State) ->
case is_dag_done(State) of
start_verts_or_exit(State = #state{wrek = Wrek}) ->
case wrek_t:is_finished(Wrek) of
true ->
#state{
event_mgr = EvMgr,
Expand Down
Loading

0 comments on commit 48a33b2

Please sign in to comment.