From 21eebad0fb6ea62786d915b29797983be537908a Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Fri, 16 Sep 2022 11:21:10 -0400 Subject: [PATCH] Don't double-encode changes sequence strings in the replicator Previously we always encoded the sequence as json. In the case of strings, the most common case, this ended up with something like `"\"1-gA..\""`. Then, the endpoints would strip out and ignore the extra `"`. So, avoid sending the extra bytes just so they can be stripped out in the end anyway. This should make the logs look a bit cleaner too. Integers or any other sequences are still json-encoded. Integers will still look like `?since=123` and, in the unlikely case we replicate with a BigCouch 0.4 era endpoint, the `[SeqNum, OpaqueString]` should be properly encoded as before and replications should work. This also fixes a minor annoyance where the `_scheduler/{jobs,docs}` results return the default start sequence as `0` (an integer) even though in the majority of cases it would turn into a string. After this we consistently return a string as it would be passed in the `_changes?since=...` parameter. 
--- .../src/couch_replicator_api_wrap.erl | 6 ++++-- .../src/couch_replicator_scheduler_job.erl | 11 +++++++---- src/couch_replicator/src/couch_replicator_utils.erl | 12 +++++++++++- .../test/eunit/couch_replicator_compact_tests.erl | 4 ++-- 4 files changed, 24 insertions(+), 9 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index a6e39cb023d..a44a79da1da 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -148,7 +148,8 @@ get_pending_count(#httpdb{} = Db, Seq) when is_number(Seq) -> end end); get_pending_count(#httpdb{} = Db, Seq) -> - Options = [{path, "_changes"}, {qs, [{"since", ?JSON_ENCODE(Seq)}, {"limit", "0"}]}], + EncodedSeq = couch_replicator_utils:seq_encode(Seq), + Options = [{path, "_changes"}, {qs, [{"since", EncodedSeq}, {"limit", "0"}]}], send_req(Db, Options, fun(200, _, {Props}) -> {ok, couch_util:get_value(<<"pending">>, Props, null)} end). 
@@ -539,6 +540,7 @@ changes_since( Options ) -> Timeout = erlang:max(1000, InactiveTimeout div 3), + EncodedSeq = couch_replicator_utils:seq_encode(StartSeq), BaseQArgs = case get_value(continuous, Options, false) of false -> @@ -548,7 +550,7 @@ changes_since( end ++ [ {"style", atom_to_list(Style)}, - {"since", ?JSON_ENCODE(StartSeq)}, + {"since", EncodedSeq}, {"timeout", integer_to_list(Timeout)} ], DocIds = get_value(doc_ids, Options), diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index e06a1ffea18..1ba933a5e2b 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -154,7 +154,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx = UserCtx} = Rep) - {source, ?l2b(SourceName)}, {target, ?l2b(TargetName)}, {continuous, get_value(continuous, Options, false)}, - {source_seq, HighestSeq}, + {source_seq, seq_encode(HighestSeq)}, {checkpoint_interval, CheckpointInterval} ] ++ rep_stats(State) ), @@ -1034,6 +1034,9 @@ get_pending_count_int(#rep_state{source = Db} = St) -> {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq), Pending. +seq_encode(Seq) -> + couch_replicator_utils:seq_encode(Seq). + update_task(State) -> #rep_state{ rep_details = #rep{id = JobId}, @@ -1043,8 +1046,8 @@ update_task(State) -> Status = rep_stats(State) ++ [ - {source_seq, HighestSeq}, - {through_seq, ThroughSeq} + {source_seq, seq_encode(HighestSeq)}, + {through_seq, seq_encode(ThroughSeq)} ], couch_replicator_scheduler:update_job_stats(JobId, Status), couch_task_status:update(Status). 
@@ -1063,7 +1066,7 @@ rep_stats(State) -> {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)}, {bulk_get_docs, couch_replicator_stats:bulk_get_docs(Stats)}, {bulk_get_attempts, couch_replicator_stats:bulk_get_attempts(Stats)}, - {checkpointed_source_seq, CommittedSeq} + {checkpointed_source_seq, seq_encode(CommittedSeq)} ]. replication_start_error({unauthorized, DbUri}) -> diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl index c9cfac62fc1..e4a2cd12f0e 100644 --- a/src/couch_replicator/src/couch_replicator_utils.erl +++ b/src/couch_replicator/src/couch_replicator_utils.erl @@ -27,7 +27,8 @@ ejson_state_info/1, get_basic_auth_creds/1, remove_basic_auth_creds/1, - normalize_basic_auth/1 + normalize_basic_auth/1, + seq_encode/1 ]). -include_lib("ibrowse/include/ibrowse.hrl"). @@ -270,6 +271,15 @@ normalize_basic_auth(#httpdb{} = HttpDb) -> end, set_basic_auth_creds(User, Pass, HttpDb1). +seq_encode(Seq) when is_binary(Seq) -> + % Don't encode a string, we already got it encoded from the changes feed + Seq; +seq_encode(Seq) -> + % This could be either an integer sequence from CouchDB 1.x, a + % [Seq, Opaque] json array from BigCouch 0.4, or any other json + % object. We are being maximally compatible here. + ?JSON_ENCODE(Seq). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). 
diff --git a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl index 1f241c7539d..373bd02ba22 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl @@ -93,8 +93,8 @@ check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) -> ?assert(is_integer(couch_util:get_value(doc_write_failures, RepTask))), ?assert(is_integer(couch_util:get_value(revisions_checked, RepTask))), ?assert(is_integer(couch_util:get_value(missing_revisions_found, RepTask))), - ?assert(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask))), - ?assert(is_integer(couch_util:get_value(source_seq, RepTask))), + ?assert(is_binary(couch_util:get_value(checkpointed_source_seq, RepTask))), + ?assert(is_binary(couch_util:get_value(source_seq, RepTask))), Pending = couch_util:get_value(changes_pending, RepTask), ?assert(is_integer(Pending)).