Skip to content

Commit

Permalink
Add wrek_vert_t to manage vert data
Browse files Browse the repository at this point in the history
This new module replaces the use of maps to represent verts with a record and a
set of getters/setters, as well as some more domain-specific functions. This
should make it easier/less error-prone to extend in the future.
  • Loading branch information
rkallos committed Nov 28, 2018
1 parent 0c64a95 commit 83258f4
Show file tree
Hide file tree
Showing 4 changed files with 438 additions and 133 deletions.
102 changes: 46 additions & 56 deletions src/wrek.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,12 @@ code_change(_Req, _From, State) ->

handle_call({put_sandbox, Dir}, {From, _}, State) ->
#state{
children = #{From := Name},
dag = Dag
} = State,
{Name, Label} = digraph:vertex(Dag, Name),
digraph:add_vertex(Dag, Name, Label#{dir => Dir}),
children = #{From := Name},
dag = Dag
} = State,
{Name, Vert} = digraph:vertex(Dag, Name),
Vert2 = wrek_vert_t:set_dir(Vert, Dir),
digraph:add_vertex(Dag, Name, Vert2),
{reply, ok, State};

handle_call(sandbox, _From, State) ->
Expand All @@ -110,19 +111,22 @@ handle_cast(_Req, State) ->

handle_info({'EXIT', Pid, {shutdown, {ok, Data}}}, State0) ->
#state{
children = #{Pid := Name},
dag = Dag
} = State0,
children = #{Pid := Name},
dag = Dag
} = State0,

{Name, Vert} = digraph:vertex(Dag, Name),

{Name, Label} = digraph:vertex(Dag, Name),
digraph:add_vertex(Dag, Name, Label#{internal => Data}),
Vert2 = wrek_vert_t:succeed(Vert, Data),
digraph:add_vertex(Dag, Name, Vert2),

State = mark_vert_done(State0, Pid),
State = remove_vert(State0, Pid),
start_verts_or_exit(State);

handle_info({'EXIT', Pid, {shutdown, Reason}}, State) ->
#state{
children = Children,
dag = Dag,
event_mgr = EvMgr,
failure_mode = FailMode,
id = Id
Expand All @@ -133,7 +137,9 @@ handle_info({'EXIT', Pid, {shutdown, Reason}}, State) ->
total ->
{stop, {error, Reason}, State};
partial ->
State2 = mark_vert_failed(State, Pid),
{Name, Vert} = digraph:vertex(Dag, Name),
digraph:add_vertex(Dag, Name, wrek_vert_t:fail(Vert, Reason)),
State2 = remove_vert(State, Pid),
propagate_partial_failure(State2, Name),
start_verts_or_exit(State)
end;
Expand Down Expand Up @@ -182,26 +188,21 @@ terminate(_Reason, _State) ->

%% private

-spec has_done_key({digraph:vertex(), digraph:label()}) -> boolean().

has_done_key({_, #{status := _}}) -> true;
has_done_key(_) -> false.


-spec is_dag_done(state()) -> boolean().

is_dag_done(#state{dag = Dag}) ->
Verts = [digraph:vertex(Dag, V) || V <- digraph:vertices(Dag)],
lists:all(fun has_done_key/1, Verts).
lists:all(fun({_Name, Vert}) ->
wrek_vert_t:is_finished(Vert)
end, Verts).


-spec is_vert_ready(digraph:graph(), digraph:vertex()) -> boolean().

is_vert_ready(Dag, Vertex) ->
Deps = [digraph:vertex(Dag, V) || V <- wrek_utils:in_vertices(Dag, Vertex)],
lists:all(fun
({_, #{status := done}}) -> true;
(_) -> false
is_vert_ready(Dag, Name) ->
Deps = [digraph:vertex(Dag, V) || V <- wrek_utils:in_vertices(Dag, Name)],
lists:all(fun({_Name, Vert}) ->
wrek_vert_t:has_succeeded(Vert)
end, Deps).


Expand All @@ -228,27 +229,13 @@ make_vert_data(#state{dag = Dag}, Name) ->
maps:from_list(Reaching).


-spec mark_vert(state(), pid(), done | failed) -> state().
-spec remove_vert(state(), pid()) -> state().

mark_vert(State = #state{children = Children, dag = Dag}, Pid, Status) ->
#{Pid := Name} = Children,
remove_vert(State = #state{children = Children}, Pid) ->
Children2 = maps:remove(Pid, Children),
{Name, Label} = digraph:vertex(Dag, Name),
Label2 = Label#{status => Status},
digraph:add_vertex(Dag, Name, Label2),
State#state{children = Children2}.


-spec mark_vert_failed(state(), pid()) -> state().

mark_vert_failed(State, Pid) -> mark_vert(State, Pid, failed).


-spec mark_vert_done(state(), pid()) -> state().

mark_vert_done(State, Pid) -> mark_vert(State, Pid, done).


-spec propagate_partial_failure(state(), digraph:vertex()) -> ok.

propagate_partial_failure(State, Name) ->
Expand All @@ -260,7 +247,8 @@ propagate_partial_failure(State, Name) ->
Reachable = digraph_utils:reachable_neighbours([Name], Dag),
Fun = fun(Vert) ->
{Vert, Label} = digraph:vertex(Dag, Vert),
digraph:add_vertex(Dag, Vert, Label#{status => cancelled}),
Label2 = wrek_vert_t:set_status(Label, cancelled),
digraph:add_vertex(Dag, Vert, Label2),
wrek_event:wrek_msg(EvMgr, Id, {vert_cancelled, Vert}),
ok
end,
Expand All @@ -270,10 +258,12 @@ propagate_partial_failure(State, Name) ->
-spec ready_verts(state()) -> [digraph:vertex()].

ready_verts(#state{dag = Dag, children = Children}) ->
[Vertex || Vertex <- digraph:vertices(Dag),
is_vert_ready(Dag, Vertex),
not has_done_key(digraph:vertex(Dag, Vertex)),
not lists:member(Vertex, maps:values(Children))].
lists:filter(fun(Name) ->
{Name, Vert} = digraph:vertex(Dag, Name),
is_vert_ready(Dag, Name) andalso
not wrek_vert_t:is_finished(Vert) andalso
not lists:member(Name, maps:values(Children))
end, digraph:vertices(Dag)).


-spec start_verts(state()) -> {ok, state()} | {error, _}.
Expand All @@ -294,19 +284,19 @@ start_verts(State = #state{children = Children}) ->

start_vert(State = #state{dag = Dag, id = DagId}, Name) ->
#state{
dag = Dag,
event_mgr = EventMgr,
id = DagId
} = State,
dag = Dag,
event_mgr = EventMgr,
id = DagId
} = State,
VertId = {DagId, ?id()},
{Name, Label0} = digraph:vertex(Dag, Name),
Label = Label0#{id => VertId},
digraph:add_vertex(Dag, Name, Label),
{Name, Vert} = digraph:vertex(Dag, Name),
Vert2 = wrek_vert_t:set_id(Vert, VertId),
digraph:add_vertex(Dag, Name, Vert2),

wrek_event:wrek_msg(EventMgr, DagId, {starting_vert, VertId}),

Data = make_vert_data(State, Name),
Args = {Data, EventMgr, VertId, Name, self()},
Args = {Vert2, Data, EventMgr, self()},
gen_server:start_link(wrek_vert, Args, []).


Expand All @@ -316,9 +306,9 @@ start_verts_or_exit(State) ->
case is_dag_done(State) of
true ->
#state{
event_mgr = EvMgr,
id = Id
} = State,
event_mgr = EvMgr,
id = Id
} = State,
wrek_event:wrek_done(EvMgr, Id),
{stop, normal, State};
false ->
Expand Down
20 changes: 14 additions & 6 deletions src/wrek_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,18 @@ add_edges(Dag, To, [From | Froms]) ->
end.


-spec add_vertices(digraph:graph(), [{any(), wrek:vert_defn()}]) -> ok.
-spec add_vertices(digraph:graph(), [{any(), wrek:vert_defn()}]) ->
ok | {error, any()}.

add_vertices(_Dag, []) ->
ok;

add_vertices(Dag, Verts) ->
lists:foreach(
fun({Name, Defn}) -> digraph:add_vertex(Dag, Name, Defn) end,
Verts
).
add_vertices(Dag, [{Name, Defn} | Rest]) ->
case wrek_vert_t:from_defn(Defn) of
{ok, Vert} ->
Vert2 = wrek_vert_t:set_name(Vert, Name),
digraph:add_vertex(Dag, Name, Vert2),
add_vertices(Dag, Rest);
{error, _} = Err ->
Err
end.
Loading

0 comments on commit 83258f4

Please sign in to comment.