Skip to content

Commit

Permalink
Retain replication stats between job runs
Browse files Browse the repository at this point in the history
Previously stats counts between job runs were reset. So if a job was stopped
and restarted by the scheduler, its docs_written, docs_read, doc_write_failures,
etc., counts would go back to 0. For doc_write_failures this was especially bad
as it hid the fact that some documents were not replicated to the target
because either a VDU failed or one of the limits were hit.

This change preserves stats across job runs. Everytime active tasks is updated,
the stats object in rep record of each job in scheduler's ets table will be
updated asynchronously. On next job start the job will reinitialize from last
saved stats.

Relates somewhat to issue #1159
  • Loading branch information
nickva committed Nov 9, 2018
1 parent 370d931 commit 00b28c2
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 13 deletions.
3 changes: 2 additions & 1 deletion src/couch_replicator/src/couch_replicator.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
view = nil :: any() | '_',
doc_id :: any() | '_',
db_name = null :: null | binary() | '_',
start_time = {0, 0, 0} :: erlang:timestamp() | '_'
start_time = {0, 0, 0} :: erlang:timestamp() | '_',
stats = couch_replicator_stats:new() :: orddict:orddict() | '_'
}).

-type rep_id() :: {string(), string()}.
Expand Down
18 changes: 17 additions & 1 deletion src/couch_replicator/src/couch_replicator_scheduler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
health_threshold/0,
jobs/0,
job/1,
restart_job/1
restart_job/1,
update_job_stats/2
]).

%% config_listener callbacks
Expand Down Expand Up @@ -215,6 +216,11 @@ restart_job(JobId) ->
end.


-spec update_job_stats(job_id(), term()) -> ok.
update_job_stats(JobId, Stats) ->
gen_server:cast(?MODULE, {update_job_stats, JobId, Stats}).


%% gen_server functions

init(_) ->
Expand Down Expand Up @@ -283,6 +289,16 @@ handle_cast({set_interval, Interval}, State) when is_integer(Interval),
couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]),
{noreply, State#state{interval = Interval}};

handle_cast({update_job_stats, JobId, Stats}, State) ->
case rep_state(JobId) of
nil ->
ok;
#rep{} = Rep ->
NewRep = Rep#rep{stats = Stats},
true = ets:update_element(?MODULE, JobId, {#job.rep, NewRep})
end,
{noreply, State};

handle_cast(UnexpectedMsg, State) ->
couch_log:error("~p: received un-expected cast ~p", [?MODULE, UnexpectedMsg]),
{noreply, State}.
Expand Down
22 changes: 11 additions & 11 deletions src/couch_replicator/src/couch_replicator_scheduler_job.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
source_name = SourceName,
target_name = TargetName,
start_seq = {_Ts, StartSeq},
committed_seq = {_, CommittedSeq},
highest_seq_done = {_, HighestSeq},
checkpoint_interval = CheckpointInterval
} = State = init_state(Rep),
Expand Down Expand Up @@ -159,16 +158,9 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
{source, ?l2b(SourceName)},
{target, ?l2b(TargetName)},
{continuous, get_value(continuous, Options, false)},
{revisions_checked, 0},
{missing_revisions_found, 0},
{docs_read, 0},
{docs_written, 0},
{changes_pending, get_pending_count(State)},
{doc_write_failures, 0},
{source_seq, HighestSeq},
{checkpointed_source_seq, CommittedSeq},
{checkpoint_interval, CheckpointInterval}
]),
] ++ rep_stats(State)),
couch_task_status:set_update_frequency(1000),

% Until OTP R14B03:
Expand Down Expand Up @@ -582,7 +574,8 @@ init_state(Rep) ->
source = Src0, target = Tgt,
options = Options, user_ctx = UserCtx,
type = Type, view = View,
start_time = StartTime
start_time = StartTime,
stats = Stats
} = Rep,
% Adjust minimum number of http source connections to 2 to avoid deadlock
Src = adjust_maxconn(Src0, BaseId),
Expand Down Expand Up @@ -631,7 +624,8 @@ init_state(Rep) ->
checkpoint_interval = get_value(checkpoint_interval, Options,
?DEFAULT_CHECKPOINT_INTERVAL),
type = Type,
view = View
view = View,
stats = Stats
},
State#rep_state{timer = start_timer(State)}.

Expand Down Expand Up @@ -983,13 +977,19 @@ update_task(State) ->
current_through_seq = {_, ThroughSeq},
highest_seq_done = {_, HighestSeq}
} = State,
update_scheduler_job_stats(State),
couch_task_status:update(
rep_stats(State) ++ [
{source_seq, HighestSeq},
{through_seq, ThroughSeq}
]).


update_scheduler_job_stats(#rep_state{rep_details = Rep, stats = Stats}) ->
JobId = Rep#rep.id,
couch_replicator_scheduler:update_job_stats(JobId, Stats).


rep_stats(State) ->
#rep_state{
committed_seq = {_, CommittedSeq},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% https://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_retain_stats_between_job_runs).

-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/src/couch_replicator.hrl").

-define(DELAY, 500).
-define(TIMEOUT, 60000).
-define(i2l(I), integer_to_list(I)).
-define(io2b(Io), iolist_to_binary(Io)).


setup() ->
Ctx = test_util:start_couch([couch_replicator]),
Source = setup_db(),
Target = setup_db(),
{Ctx, {Source, Target}}.


teardown({Ctx, {Source, Target}}) ->
teardown_db(Source),
teardown_db(Target),
ok = application:stop(couch_replicator),
ok = test_util:stop_couch(Ctx).


stats_retained_test_() ->
{
setup,
fun setup/0,
fun teardown/1,
fun t_stats_retained/1
}.


t_stats_retained({_Ctx, {Source, Target}}) ->
?_test(begin
populate_db(Source, 42),
{ok, RepPid, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
check_active_tasks(42, 42),
reschedule_job(RepPid),
check_active_tasks(42, 42),
couch_replicator_scheduler:remove_job(RepId)
end).


setup_db() ->
DbName = ?tempdb(),
{ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
ok = couch_db:close(Db),
DbName.


teardown_db(DbName) ->
ok = couch_server:delete(DbName, [?ADMIN_CTX]),
ok.


reschedule_job(RepPid) ->
Ref = erlang:monitor(process, RepPid),
gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 0}),
couch_replicator_scheduler:reschedule(),
receive
{'DOWN', Ref, _, _, _} -> ok
after ?TIMEOUT ->
erlang:error(timeout)
end,
gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 500}),
couch_replicator_scheduler:reschedule().


check_active_tasks(DocsRead, DocsWritten) ->
RepTask = wait_for_task_status(),
?assertNotEqual(timeout, RepTask),
?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)),
?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)).


replication_tasks() ->
lists:filter(fun(P) ->
couch_util:get_value(type, P) =:= replication
end, couch_task_status:all()).


wait_for_task_status() ->
test_util:wait(fun() ->
case replication_tasks() of
[] -> wait;
[RepTask] -> RepTask
end
end).


populate_db(DbName, DocCount) ->
{ok, Db} = couch_db:open_int(DbName, []),
Docs = lists:foldl(
fun(DocIdCounter, Acc) ->
Id = ?io2b(["doc", ?i2l(DocIdCounter)]),
Doc = #doc{id = Id, body = {[]}},
[Doc | Acc]
end,
[], lists:seq(1, DocCount)),
{ok, _} = couch_db:update_docs(Db, Docs, []),
ok = couch_db:close(Db).


wait_target_in_sync(Source, Target) ->
{ok, SourceDb} = couch_db:open_int(Source, []),
{ok, SourceInfo} = couch_db:get_db_info(SourceDb),
ok = couch_db:close(SourceDb),
SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
wait_target_in_sync_loop(SourceDocCount, Target, 300).


wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
erlang:error({assertion_failed, [
{module, ?MODULE}, {line, ?LINE},
{reason, "Could not get source and target databases in sync"}
]});

wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
{ok, Target} = couch_db:open_int(TargetName, []),
{ok, TargetInfo} = couch_db:get_db_info(Target),
ok = couch_db:close(Target),
TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
case TargetDocCount == DocCount of
true ->
true;
false ->
ok = timer:sleep(?DELAY),
wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
end.


replicate(Source, Target) ->
SrcUrl = couch_replicator_test_helper:db_url(Source),
TgtUrl = couch_replicator_test_helper:db_url(Target),
RepObject = {[
{<<"source">>, SrcUrl},
{<<"target">>, TgtUrl},
{<<"continuous">>, true}
]},
{ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
ok = couch_replicator_scheduler:add_job(Rep),
couch_replicator_scheduler:reschedule(),
Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
{ok, Pid, Rep#rep.id}.

0 comments on commit 00b28c2

Please sign in to comment.