Skip to content

Commit

Permalink
add remonitor code to DOWN message (apache#3144)
Browse files Browse the repository at this point in the history
Smoosh monitors the compactor pid to determine when the compaction jobs
finishes, and uses this for its idea of concurrency. However, this isn't
accurate in the case where the compaction job has to re-spawn to catch up on
intervening changes since the same logical compaction job continues with
another pid and smoosh is not aware. In such cases, a smoosh channel with
concurrency one can start arbitrarily many additional database compaction jobs.

To solve this problem, we added a check to see if a compaction PID exists for
a db in `start_compact`. But wee need to add another check because this check
is only for shard that comes off the queue. So the following can still occur:

1. Enqueue a bunch of stuff into channel with concurrency 1
2. Begin highest priority job, Shard1, in channel
3. Compaction finishes, discovers compaction file is behind main file
4. Smoosh-monitored PID for Shard1 exits, a new one starts to finish the job
5. Smoosh receives the 'DOWN' message, begins the next highest priority job,
Shard2
6. Channel concurrency is now 2, not 1

This change adds another check into the 'DOWN' message so that it checks for
that specific shard. If the compaction PID exists then it means a new process
was spawned and we just monitor that one and add it back to the queue. The
length of the queue does not change and therefore we won’t spawn new
compaction jobs.
  • Loading branch information
tonysun83 authored Sep 10, 2020
1 parent 45ddc93 commit a94e693
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions src/smoosh/src/smoosh_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,9 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
#state{active=Active0, starting=Starting0} = State,
case lists:keytake(Job, 2, Active0) of
{value, {Key, _Pid}, Active1} ->
couch_log:warning("exit for compaction of ~p: ~p", [
smoosh_utils:stringify(Key), Reason]),
{ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
{noreply, maybe_start_compaction(State#state{active=Active1})};
State1 = maybe_remonitor_cpid(State#state{active=Active1}, Key,
Reason),
{noreply, maybe_start_compaction(State1)};
false ->
case lists:keytake(Ref, 1, Starting0) of
{value, {_, Key}, Starting1} ->
Expand Down Expand Up @@ -281,8 +280,7 @@ start_compact(State, Db) ->
Ref = erlang:monitor(process, DbPid),
DbPid ! {'$gen_call', {self(), Ref}, start_compact},
State#state{starting=[{Ref, Key}|State#state.starting]};
% database is still compacting so we can just monitor the existing
% compaction pid
% Compaction is already running, so monitor existing compaction pid.
CPid ->
couch_log:notice("Db ~s continuing compaction",
[smoosh_utils:stringify(Key)]),
Expand All @@ -293,6 +291,27 @@ start_compact(State, Db) ->
false
end.

maybe_remonitor_cpid(State, DbName, Reason) when is_binary(DbName) ->
{ok, Db} = couch_db:open_int(DbName, []),
case couch_db:get_compactor_pid(Db) of
nil ->
couch_log:warning("exit for compaction of ~p: ~p",
[smoosh_utils:stringify(DbName), Reason]),
{ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [DbName]),
State;
CPid ->
couch_log:notice("~s compaction already running. Re-monitor Pid ~p",
[smoosh_utils:stringify(DbName), CPid]),
erlang:monitor(process, CPid),
State#state{active=[{DbName, CPid}|State#state.active]}
end;
% not a database compaction, so ignore the pid check
maybe_remonitor_cpid(State, Key, Reason) ->
couch_log:warning("exit for compaction of ~p: ~p",
[smoosh_utils:stringify(Key), Reason]),
{ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
State.

schedule_unpause() ->
WaitSecs = list_to_integer(config:get("smoosh", "wait_secs", "30")),
erlang:send_after(WaitSecs * 1000, self(), unpause).
Expand Down

0 comments on commit a94e693

Please sign in to comment.