Skip to content

Commit

Permalink
Fix fabric worker failures for partition requests
Browse files Browse the repository at this point in the history
Previously any failed node or rexi worker error resulted in requests failing
immediately even though there were available workers to keep handling the
request. This was because the progress check function didn't account for the
fact that partition requests only use a handful of shards which, by design, do
not complete the full ring.

Here we fix both partition info queries and dreyfus search functionality. We
follow the pattern from fabric and pass through a set of "ring options" that
let the progress function know it is dealing with partitions instead of a full
ring.
  • Loading branch information
nickva authored and jiangphcn committed Jan 17, 2020
1 parent 6db8b57 commit 881e0e0
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 85 deletions.
133 changes: 115 additions & 18 deletions src/dreyfus/src/dreyfus_fabric.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-

-module(dreyfus_fabric).
-export([get_json_docs/2, handle_error_message/6]).
-export([get_json_docs/2, handle_error_message/7]).

-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/include/mem3.hrl").
Expand All @@ -36,40 +36,42 @@ callback(timeout, _Acc) ->
{error, timeout}.

handle_error_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker,
Counters, _Replacements, _StartFun, _StartArgs) ->
case fabric_util:remove_down_workers(Counters, NodeRef) of
Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of
{ok, NewCounters} ->
{ok, NewCounters};
error ->
{error, {nodedown, <<"progress not possible">>}}
end;
handle_error_message({rexi_EXIT, {maintenance_mode, _}}, Worker,
Counters, Replacements, StartFun, StartArgs) ->
handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs);
Counters, Replacements, StartFun, StartArgs, RingOpts) ->
handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs,
RingOpts);
handle_error_message({rexi_EXIT, Reason}, Worker,
Counters, _Replacements, _StartFun, _StartArgs) ->
handle_error(Reason, Worker, Counters);
Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
handle_error(Reason, Worker, Counters, RingOpts);
handle_error_message({error, Reason}, Worker,
Counters, _Replacements, _StartFun, _StartArgs) ->
handle_error(Reason, Worker, Counters);
Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
handle_error(Reason, Worker, Counters, RingOpts);
handle_error_message({'EXIT', Reason}, Worker,
Counters, _Replacements, _StartFun, _StartArgs) ->
handle_error({exit, Reason}, Worker, Counters);
Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
handle_error({exit, Reason}, Worker, Counters, RingOpts);
handle_error_message(Reason, Worker, Counters,
_Replacements, _StartFun, _StartArgs) ->
_Replacements, _StartFun, _StartArgs, RingOpts) ->
couch_log:error("Unexpected error during request: ~p", [Reason]),
handle_error(Reason, Worker, Counters).
handle_error(Reason, Worker, Counters, RingOpts).

handle_error(Reason, Worker, Counters0) ->
handle_error(Reason, Worker, Counters0, RingOpts) ->
Counters = fabric_dict:erase(Worker, Counters0),
case fabric_view:is_progress_possible(Counters) of
case fabric_ring:is_progress_possible(Counters, RingOpts) of
true ->
{ok, Counters};
false ->
{error, Reason}
end.

handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) ->
handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs,
RingOpts) ->
OldCounters = lists:filter(fun({#shard{ref=R}, _}) ->
R /= Worker#shard.ref
end, OldCntrs0),
Expand All @@ -79,12 +81,12 @@ handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) ->
NewCounter = start_replacement(StartFun, StartArgs, Repl),
fabric_dict:store(NewCounter, nil, CounterAcc)
end, OldCounters, Replacements),
true = fabric_view:is_progress_possible(NewCounters),
true = fabric_ring:is_progress_possible(NewCounters, RingOpts),
NewRefs = fabric_dict:fetch_keys(NewCounters),
{new_refs, NewRefs, NewCounters, NewReplacements};
false ->
handle_error({nodedown, <<"progress not possible">>},
Worker, OldCounters)
Worker, OldCounters, RingOpts)
end.

start_replacement(StartFun, StartArgs, Shard) ->
Expand All @@ -106,3 +108,98 @@ start_replacement(StartFun, StartArgs, Shard) ->
{dreyfus_rpc, StartFun,
[Shard#shard.name|StartArgs1]}),
Shard#shard{ref = Ref}.


-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").


node_down_test() ->
[S1, S2, S3] = [
mk_shard("n1", [0, 4]),
mk_shard("n1", [5, ?RING_END]),
mk_shard("n2", [0, ?RING_END])
],
[W1, W2, W3] = [
S1#shard{ref = make_ref()},
S2#shard{ref = make_ref()},
S3#shard{ref = make_ref()}
],
Counters1 = fabric_dict:init([W1, W2, W3], nil),

N1 = S1#shard.node,
Msg1 = {rexi_DOWN, nil, {nil, N1}, nil},
Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, []),
?assertEqual({ok, [{W3, nil}]}, Res1),

{ok, Counters2} = Res1,
N2 = S3#shard.node,
Msg2 = {rexi_DOWN, nil, {nil, N2}, nil},
Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, []),
?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2).


worker_error_test() ->
[S1, S2] = [
mk_shard("n1", [0, ?RING_END]),
mk_shard("n2", [0, ?RING_END])
],
[W1, W2] = [S1#shard{ref = make_ref()}, S2#shard{ref = make_ref()}],
Counters1 = fabric_dict:init([W1, W2], nil),

Res1 = handle_error(bam, W1, Counters1, []),
?assertEqual({ok, [{W2, nil}]}, Res1),

{ok, Counters2} = Res1,
?assertEqual({error, boom}, handle_error(boom, W2, Counters2, [])).


node_down_with_partitions_test() ->
[S1, S2] = [
mk_shard("n1", [0, 4]),
mk_shard("n2", [0, 8])
],
[W1, W2] = [
S1#shard{ref = make_ref()},
S2#shard{ref = make_ref()}
],
Counters1 = fabric_dict:init([W1, W2], nil),
RingOpts = [{any, [S1, S2]}],

N1 = S1#shard.node,
Msg1 = {rexi_DOWN, nil, {nil, N1}, nil},
Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, RingOpts),
?assertEqual({ok, [{W2, nil}]}, Res1),

{ok, Counters2} = Res1,
N2 = S2#shard.node,
Msg2 = {rexi_DOWN, nil, {nil, N2}, nil},
Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, RingOpts),
?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2).


worker_error_with_partitions_test() ->
[S1, S2] = [
mk_shard("n1", [0, 4]),
mk_shard("n2", [0, 8])],
[W1, W2] = [
S1#shard{ref = make_ref()},
S2#shard{ref = make_ref()}
],
Counters1 = fabric_dict:init([W1, W2], nil),
RingOpts = [{any, [S1, S2]}],

Res1 = handle_error(bam, W1, Counters1, RingOpts),
?assertEqual({ok, [{W2, nil}]}, Res1),

{ok, Counters2} = Res1,
?assertEqual({error, boom}, handle_error(boom, W2, Counters2, RingOpts)).


mk_shard(Name, Range) ->
Node = list_to_atom(Name),
BName = list_to_binary(Name),
#shard{name = BName, node = Node, range = Range}.

-endif.
9 changes: 6 additions & 3 deletions src/dreyfus/src/dreyfus_fabric_group1.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
top_groups,
counters,
start_args,
replacements
replacements,
ring_opts
}).

go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
Expand All @@ -39,6 +40,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
DesignName = dreyfus_util:get_design_docid(DDoc),
dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
Shards = dreyfus_util:get_shards(DbName, QueryArgs),
RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group1, [DDoc,
IndexName, dreyfus_util:export(QueryArgs)]),
Replacements = fabric_view:get_shard_replacements(DbName, Workers),
Expand All @@ -50,7 +52,8 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
top_groups = [],
counters = Counters,
start_args = [DDoc, IndexName, QueryArgs],
replacements = Replacements
replacements = Replacements,
ring_opts = RingOpts
},
try
rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
Expand Down Expand Up @@ -89,7 +92,7 @@ handle_message(Error, Worker, State0) ->
State = upgrade_state(State0),
case dreyfus_fabric:handle_error_message(Error, Worker,
State#state.counters, State#state.replacements,
group1, State#state.start_args) of
group1, State#state.start_args, State#state.ring_opts) of
{ok, Counters} ->
{ok, State#state{counters=Counters}};
{new_refs, NewRefs, NewCounters, NewReplacements} ->
Expand Down
9 changes: 6 additions & 3 deletions src/dreyfus/src/dreyfus_fabric_group2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
top_groups,
counters,
start_args,
replacements
replacements,
ring_opts
}).

go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
Expand All @@ -41,6 +42,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
DesignName = dreyfus_util:get_design_docid(DDoc),
dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
Shards = dreyfus_util:get_shards(DbName, QueryArgs),
RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group2,
[DDoc, IndexName, dreyfus_util:export(QueryArgs)]),
Replacements = fabric_view:get_shard_replacements(DbName, Workers),
Expand All @@ -54,7 +56,8 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
top_groups = [],
counters = Counters,
start_args = [DDoc, IndexName, QueryArgs],
replacements = Replacements
replacements = Replacements,
ring_opts = RingOpts
},
try
rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
Expand Down Expand Up @@ -102,7 +105,7 @@ handle_message(Error, Worker, State0) ->
State = upgrade_state(State0),
case dreyfus_fabric:handle_error_message(Error, Worker,
State#state.counters, State#state.replacements,
group2, State#state.start_args) of
group2, State#state.start_args, State#state.ring_opts) of
{ok, Counters} ->
{ok, State#state{counters=Counters}};
{new_refs, NewRefs, NewCounters, NewReplacements} ->
Expand Down
6 changes: 3 additions & 3 deletions src/dreyfus/src/dreyfus_fabric_info.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, {Counters, Acc}) ->

handle_message({rexi_EXIT, Reason}, Worker, {Counters, Acc}) ->
NewCounters = fabric_dict:erase(Worker, Counters),
case fabric_view:is_progress_possible(NewCounters) of
case fabric_ring:is_progress_possible(NewCounters) of
true ->
{ok, {NewCounters, Acc}};
false ->
Expand All @@ -74,15 +74,15 @@ handle_message({ok, Info}, Worker, {Counters, Acc}) ->

handle_message({error, Reason}, Worker, {Counters, Acc}) ->
NewCounters = fabric_dict:erase(Worker, Counters),
case fabric_view:is_progress_possible(NewCounters) of
case fabric_ring:is_progress_possible(NewCounters) of
true ->
{ok, {NewCounters, Acc}};
false ->
{error, Reason}
end;
handle_message({'EXIT', _}, Worker, {Counters, Acc}) ->
NewCounters = fabric_dict:erase(Worker, Counters),
case fabric_view:is_progress_possible(NewCounters) of
case fabric_ring:is_progress_possible(NewCounters) of
true ->
{ok, {NewCounters, Acc}};
false ->
Expand Down
18 changes: 12 additions & 6 deletions src/dreyfus/src/dreyfus_fabric_search.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
top_docs,
counters,
start_args,
replacements
replacements,
ring_opts
}).

go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
Expand All @@ -40,10 +41,11 @@ go(DbName, DDoc, IndexName, #index_query_args{bookmark=nil}=QueryArgs) ->
DesignName = dreyfus_util:get_design_docid(DDoc),
dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
Shards = dreyfus_util:get_shards(DbName, QueryArgs),
RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, search,
[DDoc, IndexName, dreyfus_util:export(QueryArgs)]),
Counters = fabric_dict:init(Workers, nil),
go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters);
go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters, RingOpts);

go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
Bookmark0 = try dreyfus_bookmark:unpack(DbName, QueryArgs)
Expand All @@ -54,6 +56,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
Shards = dreyfus_util:get_shards(DbName, QueryArgs),
LiveNodes = [node() | nodes()],
LiveShards = [S || #shard{node=Node} = S <- Shards, lists:member(Node, LiveNodes)],
RingOpts = dreyful_util:get_ring_opts(QueryArgs, LiveShards),
Bookmark1 = dreyfus_bookmark:add_missing_shards(Bookmark0, LiveShards),
Counters0 = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, After}) ->
QueryArgs1 = dreyfus_util:export(QueryArgs#index_query_args{
Expand All @@ -73,14 +76,16 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
end
end, Bookmark1),
Counters = fabric_dict:init(Counters0, nil),
WorkerShards = fabric_dict:fetch_keys(Counters),
RingOpts = dreyfus_util:get_ring_opts(QueryArgs, WorkerShards),
QueryArgs2 = QueryArgs#index_query_args{
bookmark = Bookmark1
},
go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1);
go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1, RingOpts);
go(DbName, DDoc, IndexName, OldArgs) ->
go(DbName, DDoc, IndexName, dreyfus_util:upgrade(OldArgs)).

go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) ->
go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark, RingOpts) ->
{Workers, _} = lists:unzip(Counters),
#index_query_args{
limit = Limit,
Expand All @@ -94,7 +99,8 @@ go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) ->
top_docs = #top_docs{total_hits=0,hits=[]},
counters = Counters,
start_args = [DDoc, IndexName, QueryArgs],
replacements = Replacements
replacements = Replacements,
ring_opts = RingOpts
},
RexiMon = fabric_util:create_monitors(Workers),
try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
Expand Down Expand Up @@ -154,7 +160,7 @@ handle_message(Error, Worker, State0) ->
State = upgrade_state(State0),
case dreyfus_fabric:handle_error_message(Error, Worker,
State#state.counters, State#state.replacements,
search, State#state.start_args) of
search, State#state.start_args, State#state.ring_opts) of
{ok, Counters} ->
{ok, State#state{counters=Counters}};
{new_refs, NewRefs, NewCounters, NewReplacements} ->
Expand Down
22 changes: 21 additions & 1 deletion src/dreyfus/src/dreyfus_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").

-export([get_shards/2, sort/2, upgrade/1, export/1, time/2]).
-export([get_shards/2, get_ring_opts/2, sort/2, upgrade/1, export/1, time/2]).
-export([in_black_list/1, in_black_list/3, maybe_deny_index/3]).
-export([get_design_docid/1]).
-export([
Expand Down Expand Up @@ -59,6 +59,15 @@ use_ushards(#index_query_args{stable=true}) ->
use_ushards(#index_query_args{}) ->
false.


get_ring_opts(#index_query_args{partition = nil}, _Shards) ->
[];
get_ring_opts(#index_query_args{}, Shards) ->
Shards1 = lists:map(fun(#shard{} = S) ->
S#shard{ref = undefined}
end, Shards),
[{any, Shards1}].

-spec sort(Order :: relevance | [any()], [#sortable{}]) -> [#sortable{}].
sort(Sort, List0) ->
{List1, Stash} = stash_items(List0),
Expand Down Expand Up @@ -418,4 +427,15 @@ stash_test() ->
Unstashed = hd(unstash_items(Stashed, Stash)),
?assertEqual(Unstashed#sortable.item, bar).


ring_opts_test() ->
Shards = [#shard{name = foo, ref = make_ref()}],

QArgs1 = #index_query_args{partition = nil},
?assertEqual([], get_ring_opts(QArgs1, Shards)),

QArgs2 = #index_query_args{partition = <<"x">>},
?assertMatch([{any, [#shard{name = foo, ref = undefined}]}],
get_ring_opts(QArgs2, Shards)).

-endif.
Loading

0 comments on commit 881e0e0

Please sign in to comment.