Skip to content

Commit

Permalink
Don't double-encode changes sequence strings in the replicator
Browse files Browse the repository at this point in the history
Previously we always encoded the sequence as json. In case of a strings, the
most common case, this ended up with something like `"\"1-gA..\""`. Then, the
endpoints would strip out and ignore the extra `"`. So, avoid sending the extra
bytes just so they can be striped out in the end anyway. This should make the
logs look a bit cleaner too.

Integers or any other sequences are still json-encoded. Integers will still
look like `?since=123` and, in the unlikely we case we replicate with a
BigCouch 0.4 era endpoint, the `[SeqNum, OpaqueString]` should be properly
encoded as before and replications should work.

This also fixes a minor annoyance when the `_scheduler/{jobs,docs}` results
returning the default start sequence as `0` (an integer) even though in the
majority of case they'd almost always turn into a string. After this we
consistently return a string as it would be passed in the `_changes?since=...`
parameter.
  • Loading branch information
nickva committed Sep 17, 2022
1 parent e560508 commit 21eebad
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 9 deletions.
6 changes: 4 additions & 2 deletions src/couch_replicator/src/couch_replicator_api_wrap.erl
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ get_pending_count(#httpdb{} = Db, Seq) when is_number(Seq) ->
end
end);
get_pending_count(#httpdb{} = Db, Seq) ->
Options = [{path, "_changes"}, {qs, [{"since", ?JSON_ENCODE(Seq)}, {"limit", "0"}]}],
EncodedSeq = couch_replicator_utils:seq_encode(Seq),
Options = [{path, "_changes"}, {qs, [{"since", EncodedSeq}, {"limit", "0"}]}],
send_req(Db, Options, fun(200, _, {Props}) ->
{ok, couch_util:get_value(<<"pending">>, Props, null)}
end).
Expand Down Expand Up @@ -539,6 +540,7 @@ changes_since(
Options
) ->
Timeout = erlang:max(1000, InactiveTimeout div 3),
EncodedSeq = couch_replicator_utils:seq_encode(StartSeq),
BaseQArgs =
case get_value(continuous, Options, false) of
false ->
Expand All @@ -548,7 +550,7 @@ changes_since(
end ++
[
{"style", atom_to_list(Style)},
{"since", ?JSON_ENCODE(StartSeq)},
{"since", EncodedSeq},
{"timeout", integer_to_list(Timeout)}
],
DocIds = get_value(doc_ids, Options),
Expand Down
11 changes: 7 additions & 4 deletions src/couch_replicator/src/couch_replicator_scheduler_job.erl
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx = UserCtx} = Rep) -
{source, ?l2b(SourceName)},
{target, ?l2b(TargetName)},
{continuous, get_value(continuous, Options, false)},
{source_seq, HighestSeq},
{source_seq, seq_encode(HighestSeq)},
{checkpoint_interval, CheckpointInterval}
] ++ rep_stats(State)
),
Expand Down Expand Up @@ -1034,6 +1034,9 @@ get_pending_count_int(#rep_state{source = Db} = St) ->
{ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq),
Pending.

seq_encode(Seq) ->
couch_replicator_utils:seq_encode(Seq).

update_task(State) ->
#rep_state{
rep_details = #rep{id = JobId},
Expand All @@ -1043,8 +1046,8 @@ update_task(State) ->
Status =
rep_stats(State) ++
[
{source_seq, HighestSeq},
{through_seq, ThroughSeq}
{source_seq, seq_encode(HighestSeq)},
{through_seq, seq_encode(ThroughSeq)}
],
couch_replicator_scheduler:update_job_stats(JobId, Status),
couch_task_status:update(Status).
Expand All @@ -1063,7 +1066,7 @@ rep_stats(State) ->
{doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
{bulk_get_docs, couch_replicator_stats:bulk_get_docs(Stats)},
{bulk_get_attempts, couch_replicator_stats:bulk_get_attempts(Stats)},
{checkpointed_source_seq, CommittedSeq}
{checkpointed_source_seq, seq_encode(CommittedSeq)}
].

replication_start_error({unauthorized, DbUri}) ->
Expand Down
12 changes: 11 additions & 1 deletion src/couch_replicator/src/couch_replicator_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
ejson_state_info/1,
get_basic_auth_creds/1,
remove_basic_auth_creds/1,
normalize_basic_auth/1
normalize_basic_auth/1,
seq_encode/1
]).

-include_lib("ibrowse/include/ibrowse.hrl").
Expand Down Expand Up @@ -270,6 +271,15 @@ normalize_basic_auth(#httpdb{} = HttpDb) ->
end,
set_basic_auth_creds(User, Pass, HttpDb1).

seq_encode(Seq) when is_binary(Seq) ->
% Don't encode a string, we already got it encoded from the changes feed
Seq;
seq_encode(Seq) ->
% This could be either an integer sequence from CouchDB 1.x, a
% [Seq, Opaque] json array from BigCouch 0.4, or any other json
% object. We are being maximally compatible here.
?JSON_ENCODE(Seq).

-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
?assert(is_integer(couch_util:get_value(doc_write_failures, RepTask))),
?assert(is_integer(couch_util:get_value(revisions_checked, RepTask))),
?assert(is_integer(couch_util:get_value(missing_revisions_found, RepTask))),
?assert(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask))),
?assert(is_integer(couch_util:get_value(source_seq, RepTask))),
?assert(is_binary(couch_util:get_value(checkpointed_source_seq, RepTask))),
?assert(is_binary(couch_util:get_value(source_seq, RepTask))),
Pending = couch_util:get_value(changes_pending, RepTask),
?assert(is_integer(Pending)).

Expand Down

0 comments on commit 21eebad

Please sign in to comment.