
Scheduling Replicator #470

Merged: merged 9 commits into from Apr 28, 2017

Conversation

@nickva (Contributor) commented Apr 4, 2017

Introduce Scheduling CouchDB Replicator

Jira: https://issues.apache.org/jira/browse/COUCHDB-3324

The core of the new replicator is a scheduler, which allows running a large number of replication jobs by switching between them, stopping some and starting others periodically. Jobs which fail are backed off exponentially. There is also an improved inspection and querying API: _scheduler/jobs and _scheduler/docs.

The replication protocol hasn't changed, so it is still possible to replicate between CouchDB 1.x, 2.x, PouchDB, and other implementations of the CouchDB replication protocol.

Scheduler

The scheduler allows running a large number of replication jobs; it has been tested with up to 100k replication jobs in a 3-node cluster. Replication jobs are run in a fair, round-robin fashion. Scheduler behavior can be tuned with these configuration options in the [replicator] section (a sample configuration follows the list):

  • max_jobs : Number of actively running replications. Making this too high
    could cause performance issues. Making it too low could mean replication jobs
    might not have enough time to make progress before getting unscheduled again.
    This parameter can be adjusted at runtime and takes effect during the next
    rescheduling cycle.

  • interval : Scheduling interval in milliseconds. During each rescheduling
    cycle the scheduler might start or stop up to "max_churn" jobs.

  • max_churn : Maximum number of replications to start and stop during each
    rescheduling cycle. This parameter, along with "interval", defines the rate of
    job replacement. During startup, however, a much larger number of jobs could
    be started (up to max_jobs) in a short period of time.
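For illustration, a [replicator] section combining these options might look like the following (the values are illustrative, not recommendations):

[replicator]
; number of replication jobs allowed to run concurrently
max_jobs = 500
; length of a rescheduling cycle, in milliseconds
interval = 60000
; at most this many jobs are started and stopped per cycle
max_churn = 20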

_scheduler/{jobs,docs} API

There is an improved replication state querying API, with a focus on ease of use and performance. The new API avoids having to update the replication document with transient state changes, which in production can lead to conflicts and performance issues. The two new endpoints are (example requests follow the list):

  • _scheduler/jobs : This endpoint shows active replication jobs. These are jobs managed by the scheduler. Some of them might be running, some might be waiting to run, and some might be backed off (penalized) because they crashed too many times. Semantically this is somewhat equivalent to _active_tasks but it focuses only on replications. Jobs which have completed, or which were never created because of a malformed replication document, will not be shown here as they are not managed by the scheduler.

  • _scheduler/docs : This endpoint is an improvement on having to go back and re-read replication documents to query their state. It represents the state of all the replications started from documents in _replicator dbs. Unlike _scheduler/jobs it will also show jobs which have failed or completed (that is, which are no longer managed by the scheduler).
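For example, using the same httpie-style commands shown later in this thread (a sketch; see the full _scheduler/docs response example further down):

http http://localhost:5984/_scheduler/jobs
http http://localhost:5984/_scheduler/docs?states=running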

Compatibility Mode

Understandably some customers are using the document-based API to query replication states (triggered, error, completed etc). To ease the upgrade path, there is a compatibility configuration setting:

[replicator]
update_docs = false | true

It defaults to false, but when set to true the replicator will keep updating replication documents with the state of their replication jobs.

Other Improvements

  • Network resource usage and performance were improved by implementing a common connection pool. This should help in cases with a large number of connections to the same source or target. Previously connection pools were shared only within a single replication job.

  • Improved rate limiting handling. Replicator requests will auto-discover rate limit capacity on targets and sources based on a proven Additive Increase / Multiplicative Decrease feedback control algorithm.

  • Improved performance by avoiding repeatedly retrying failing replication jobs; instead, exponential backoff is used. In a large multi-user cluster quite a few replication jobs are invalid, crashing, or failing (for various reasons such as an inability to checkpoint to the source, mismatched credentials, or missing databases). Penalizing failing replications frees up system resources for more useful work.

  • Improved recovery from long but temporary network failures. Currently, if replication jobs fail to start 10 times in a row, they are not retried anymore. This is sometimes desirable, but in some scenarios (for example, after a sustained DNS failure which eventually recovers) replications reach their retry limit and cease to work, and previously it required operator intervention to continue. The scheduling replicator will never give up retrying a valid scheduled replication job, so it should recover automatically.

  • Better handling of filtered replications: failing to fetch filters could block the couch replicator manager and lead to message queue backups and memory exhaustion. Also, when the replication filter code changes, the replication should be updated accordingly (the replication job ID should change in that case). This PR fixes both of those issues. Filters and replication ID calculation are managed in separate worker processes (in the couch replicator doc processor), and when filter code changes on the source, replication jobs restart and re-calculate their new ID automatically.

Related PR:

Documentation PR: apache/couchdb-documentation#123

Tests

  • EUnit test coverage. Some modules, such as multidb_changes, have close to 100% coverage; some have less. All previous replication tests have been updated to work with the new scheduling replicator.

  • Except for one modification, the existing JavaScript tests pass.

  • Additional integration tests. To validate, test, and benchmark some edge cases, an additional toolkit was created: https://github.com/cloudant-labs/couchdyno/blob/master/README_tests.md. It allows testing scenarios where nodes fail, creating a large number of replication jobs, manipulating cluster configurations, and setting up long running (soak) tests.

@nickva changed the title from 63012 scheduler to Scheduling Replicator on Apr 5, 2017
@nickva force-pushed the 63012-scheduler branch 3 times, most recently from c2d381c to f6658fc, on April 5, 2017 19:18
@nickva (Contributor Author) commented Apr 6, 2017

Some additional information from testing how the new scheduling replicator handles a large number of jobs.

This is a graph of adding 1M replication jobs to a cluster of 3 nodes. Replication jobs were added using the couchdyno rep tool: https://github.com/cloudant-labs/couchdyno/blob/master/README_rep.md

rep.replicate_all_and_compare(n=1000, num=1, normal=False) 

That creates a fully connected mesh of replications; for n=1000 it creates 1000*1000 = 1M replication jobs.

(graph: db_changes)

couch_replicator.docs.db_changes tracks the total number of changes seen by the replicator. In this case the replicator has seen the 1M replication documents created above.

(graph: jobs_total)

couch_replicator.jobs.total tracks the number of replication jobs managed by the scheduler. Out of the 1M jobs, each node in the cluster picked up about 330k.

(graph: scheduler)

couch_replicator.jobs.starts counts the number of jobs which have been started by the scheduler. In this case a time derivative was applied, so it shows how the scheduling interval works: every time the scheduler runs it starts another 20 jobs (as specified by the max_churn parameter).

@davisp (Member) commented Apr 7, 2017

Hooray scheduling replicator! This looks pretty awesome. It's quite big so I'm gonna split my review into roughly three parts: concerns, things that look like bugs, and then style issues. Commencing in 3, 2, 1, now!

@davisp (Member) left a comment:

I'm not going to be overly specific on how this change is handled but we will need to do something here or we risk opening a security hole if requests are re-used between replications.

handle_call({acquire, URL}, From, State) ->
    {Pid, _Ref} = From,
    case ibrowse_lib:parse_url(URL) of
        #url{host=Host, port=Port} ->
Member:

This is the most worrisome part of the whole PR for me. We're starting HTTP connections outside of this pool and then parsing the URLs after the fact. And specifically, we're not checking here that the URLs we're provided don't have a username/password set. If we did get one with that, and assuming that ibrowse caches the credentials in the worker, then it's possible we're leaking authentication credentials across replications.

We should very much insist that all workers do not have authentication cached and then make sure we provide it for each request individually. And we should also assert that ibrowse isn't caching credentials between requests as well.

@nickva (Contributor Author) Apr 7, 2017:

@davisp Good point. I think parsing the URL outside the gen_server and calling acquire with a {Host, Port} tuple might work?

@sagelywizard Want to take a look at this one?

Member:

And ensuring that we're not getting a username/password passed to ibrowse and then verifying and/or somehow asserting that ibrowse hasn't cached a username/password after it was used in a request are the more important bits to me.

@nickva (Contributor Author) Apr 7, 2017:

After a worker is released I don't see the credentials in it anymore.

Here is the state of a relinquished ibrowse_http_client:init/1 worker:

{state,"localhost",15984,infinity,#Ref<0.0.2.22525>,false,undefined,undefined,
       undefined,undefined,undefined,[],false,#Port<0.32462>,false,[],
       {[],[]},
       undefined,idle,undefined,
       <<>>,
       0,0,[],undefined,undefined,undefined,undefined,false,undefined,
       undefined,
       <<>>,
       undefined,false,undefined,0,undefined}

While it was making a request it had adm:pass in there. In other words it doesn't seem like credentials are cached in the worker process after it is released.

Dictionary had these fields:

my_trace_flag: false
ibrowse_trace_token: ["localhost",58,"15984"]
http_prot_vsn: "HTTP/1.1"
conn_close:"false"
'$initial_call': {ibrowse_http_client,init,1}
'$ancestors': [<0.23517.1>,<0.23508.1>,couch_replicator_scheduler_sup,couch_replicator_sup,
 <0.369.0>]

So I think we might be OK here. It basically has the same issue as the current code, where a worker crashing could sometimes spill its credentials into the log.

Also notice the ibrowse client doesn't store the password on worker spawn_link; it just keeps host, port and protocol around:

https://github.com/apache/couchdb-ibrowse/blob/master/src/ibrowse_http_client.erl#L115

So we might actually be ok here?

Member:

Yeah, looks like ibrowse is good there. For the sake of completeness though, when we parse the URL in couch_replicator_connection let's use a wrapper function that specifically drops the user/pass pair out of the record, to be clear that we're not passing credentials to the worker.
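A minimal sketch of the kind of wrapper being suggested (the function name and error handling are hypothetical; the #url{} record is the one defined in ibrowse's header):

parse_url_without_creds(URL) ->
    case ibrowse_lib:parse_url(URL) of
        #url{} = Parsed ->
            % Blank out the credentials so only host/port details reach the pool.
            Parsed#url{username = undefined, password = undefined};
        Error ->
            Error
    end.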

Member:

I'm satisfied that this isn't an issue.

Contributor:

Although it might become an issue if ibrowse is updated later on (or we replace the http client). How hard would it be to write a test case which checks that released processes do not have leftover data? If we had such a test we would be able to catch the problem if ibrowse started to leak data.

Contributor Author:

@iilyak @davisp Added connection pool tests which check that worker processes don't hold on to credentials:

worker_discards_creds_on_create({Host, Port}) ->
    ?_test(begin
        {User, Pass, B64Auth} = user_pass(),
        URL = "http://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ Port,
        {ok, WPid} = couch_replicator_connection:acquire(URL),
        Internals = worker_internals(WPid),
        ?assert(string:str(Internals, B64Auth) =:= 0),
        ?assert(string:str(Internals, Pass) =:= 0)
    end).


worker_discards_url_creds_after_request({Host, _}) ->
    ?_test(begin
        {User, Pass, B64Auth} = user_pass(),
        {Port, ServerPid} = server(),
        PortStr = integer_to_list(Port),
        URL = "http://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ PortStr,
        {ok, WPid} = couch_replicator_connection:acquire(URL),
        ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], [])),
        Internals = worker_internals(WPid),
        ?assert(string:str(Internals, B64Auth) =:= 0),
        ?assert(string:str(Internals, Pass) =:= 0),
        couch_replicator_connection:release(WPid),
        unlink(ServerPid),
        exit(ServerPid, kill)
    end).


worker_discards_creds_in_headers_after_request({Host, _}) ->
    ?_test(begin
        {_User, Pass, B64Auth} = user_pass(),
        {Port, ServerPid} = server(),
        PortStr = integer_to_list(Port),
        URL = "http://" ++ Host ++ ":" ++ PortStr,
        {ok, WPid} = couch_replicator_connection:acquire(URL),
        Headers = [{"Authorization", "Basic " ++ B64Auth}],
        ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, Headers, [])),
        Internals = worker_internals(WPid),
        ?assert(string:str(Internals, B64Auth) =:= 0),
        ?assert(string:str(Internals, Pass) =:= 0),
        couch_replicator_connection:release(WPid),
        unlink(ServerPid),
        exit(ServerPid, kill)
    end).


worker_discards_proxy_creds_after_request({Host, _}) ->
    ?_test(begin
        {User, Pass, B64Auth} = user_pass(),
        {Port, ServerPid} = server(),
        PortStr = integer_to_list(Port),
        URL = "http://" ++ Host ++ ":" ++ PortStr,
        {ok, WPid} = couch_replicator_connection:acquire(URL),
        Opts = [
            {proxy_host, Host},
            {proxy_port, Port},
            {proxy_user, User},
            {proxy_pass, Pass}
        ],
        ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], Opts)),
        Internals = worker_internals(WPid),
        ?assert(string:str(Internals, B64Auth) =:= 0),
        ?assert(string:str(Internals, Pass) =:= 0),
        couch_replicator_connection:release(WPid),
        unlink(ServerPid),
        exit(ServerPid, kill)
    end).

We check 4 cases:

  1. After worker creation
  2. After request if credentials were embedded in URL
  3. After request if credentials passed in an Authorization header
  4. After request if credentials were set as proxy parameters

The check is performed on the worker process's dictionary and its state. (The assumption here is that it is a gen_server, which holds for current ibrowse. If that stops being the case, the tests will fail and we'll know.)

ibrowse internally translates a URL-embedded user and password into a basic Authorization header, so we always check for both the plaintext password and the base64-encoded version of the "User:Pass" string.
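For reference, a sketch of what the user_pass/0 helper used in the tests above might look like (the actual helper lives in the test module; the values here are made up):

user_pass() ->
    User = "specialuser",
    Pass = "averysecretpassword",
    % ibrowse turns URL credentials into "Basic " ++ base64("User:Pass"),
    % so the tests also grep for this encoded form.
    B64Auth = base64:encode_to_string(User ++ ":" ++ Pass),
    {User, Pass, B64Auth}.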

Contributor:

There is another vector where credentials could leak: the message queue of a worker process.
Here ibrowse sends the full URL (with credentials) to the worker process.

Contributor Author:

Yap, definitely.

@davisp (Member) left a comment:

A couple things that jumped out at me as possible bugs when reading the PR. Not sure on the question about age, the others seem like legit bugs though.

@@ -270,6 +270,9 @@ transfer_fields([{<<"_replication_state">>, _} = Field | Rest],
transfer_fields([{<<"_replication_state_time">>, _} = Field | Rest],
#doc{body=Fields} = Doc) ->
transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
transfer_fields([{<<"_replication_start_time">>, _} = Field | Rest],
Member:

Did we add this or was it just always missing? Adding _ fields to docs will lead to oddness with the replicator when replicating to versions of couch that don't allow this field.

Contributor Author:

Oh interesting. It's the case of replicating the _replicator db itself, right?

Anyone have any thoughts on this one @sagelywizard @rnewson ?

Add a note that replicating the _replicator db from CouchDB 3.0 to <3.0 is not supported if it has completed replications in the source db?

Use a non-underscored version, replication_start_time? But that's a bit inconsistent with the other ones.

Remove this field? But then the _scheduler/docs output will be inconsistent, since for completed replications we read that from the view on disk. Dashboard code would perhaps have to be modified as well.

Member:

It's the backwards replication that hurts, yeah. I wanted to add a new _ field for quorum info once but it got shot down cause it'd break replication with every CouchDB instance older than when it was added.

Contributor Author:

Paul had a good idea: remove _replication_start_time and, for the failure case, just use the state time as the start time. For completed replications, add the start time (or rather the duration) to _replication_stats.

@garrensmith Will this affect Fauxton code ^ ?

Running0 = running_jobs(),
ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end,
Running1 = lists:filter(ContinuousPred, Running0),
Running2 = lists:sort(fun oldest_job_first/2, Running1),
Member:

I think this is just confusion, but it seems odd that we're preferring to insert old jobs in pending_maybe_replace/2, but preferring to remove the oldest job when we stop a subset. I'm going to run on the assumption that the use of "oldest" means two different things here: in pending_maybe_replace I'm going to assume it's time since last activity, and here it's "been active the longest". Assuming that's the case we may want to rename things.

Reading it as is, it sounds like we're stopping and starting the same set of jobs repeatedly and never actually progressing through the queue.

Contributor Author:

oldest_job_first/2 orders jobs from oldest started to latest started. In the case of stop_jobs we look only at running jobs, then pick the ones that have been running the longest and stop those.

In pending_maybe_replace we are looking at stopped jobs and want to find which ones are candidates to start. So we need to find the jobs which have been waiting the longest; to find those we look at each job's last-started timestamp. But we don't want to load them all into memory, since there could be a huge number of pending jobs, so we do an ets fold.

In the fold function we pick only healthy pending jobs (pending_fold/2) and, as we iterate through, we try to keep only up to Count oldest jobs. As soon as we find a job that is older than the youngest one kept so far, we remove the youngest and insert the newly found one.

For example, let's say the gb_sets accumulator has [5, 7, 11] in it; 11 is the youngest one. On the next iteration we find 6, so we bump 11 out, insert 6 into the set, and end up with [5, 6, 7], and so on. In the end we might end up with [1, 3, 5], for example.

The confusion probably comes from calling the jobs with the largest timestamp the youngest.
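A standalone sketch of that "keep only the Count oldest" accumulator (this is not the actual pending_fold/2 code; jobs are reduced to their last-started timestamps for illustration):

oldest_n(Timestamps, Count) ->
    Fold = fun(Ts, Acc) ->
        Acc1 = gb_sets:add(Ts, Acc),
        case gb_sets:size(Acc1) > Count of
            true ->
                % The largest timestamp is the youngest job; drop it.
                {_Youngest, Acc2} = gb_sets:take_largest(Acc1),
                Acc2;
            false ->
                Acc1
        end
    end,
    gb_sets:to_list(lists:foldl(Fold, gb_sets:new(), Timestamps)).

With the walkthrough above, oldest_n([5, 7, 11, 6, 1, 3], 3) returns [1, 3, 5].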

Member:

I understood the rest of the logic just fine, yeah. My concern was that either there's a bug or we're calling two different age-related things the same name (i.e., oldest for longest running and oldest for least recently run). Since it was saying oldest for both, it sounded like we weren't actually cycling through jobs.

At the very least we may want to change a couple of names to make that more clear, rather than let every person that ever reads this code have to figure it out again.

@nickva (Contributor Author) Apr 7, 2017:

I think the problem is with using old; it's overloaded to mean two things. When looking at running jobs to find which ones to stop, we can use longest_running instead of oldest_first.

And maybe explain in the comments in pending_maybe_replace what we mean by old and young.

ok = update_state_started(Job, Child, Ref, State),
couch_log:notice("~p: Job ~p started as ~p",
[?MODULE, Job#job.id, Child]);
{error, {already_started, OtherPid}} when node(OtherPid) =:= node ->
Member:

Certainly you mean node() and not the atom node here and in the next case statement right?

Also, am I missing where we're checking to see if a job exists on a different node somehow? I'm not seeing that logic anywhere (granted this is a fairly large PR I'm still processing).

Contributor Author:

Bug! 🐛 You're right, should be node()

@davisp (Member) left a comment:

Those should be examples for all of the bigger style issues I saw. As mentioned I'm not gonna go through and nit pick every instance or it would get a bit overwhelming with such a large PR.

-export([start_link/4]).

-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
-export([code_change/3, terminate/2]).
Member:

Let the style review commence!

I'm only picking on this module because it's the first new module in the PR's diff view, though this applies to all new modules and even old ones where we're changing things in the front matter and moving functions around to some degree.

A module should have the following basic outline:

We should work on organizing our export lists so that diffs are easier to read in the future. In general, we should have three export attributes for: public api, behavior callbacks, private module callbacks that need to be exported. Where behavior exports will have one attribute per behavior.

Within an export we should have a single function per line. This way, when these change, the diff will be easier to read and it will be apparent what's changing, rather than our current status quo of sticking random functions into a module with randomly placed additions to the export attributes.

For this module these should look something like:

-export([
    start_link/4
]).

-export([
    init/1,
    terminate/2,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    code_change/3
]).

-export([
    changes_reader/3,
    changes_reader_cb/3
]).

Also, the functions in the module should follow that order and any module-private functions should follow after those in some logical order (I tend to prefer the order in which they were referenced by the exported functions, so that it's easy to know roughly where they are when searching for their definition, but that can be trumped if some other logical ordering makes sense; e.g., trivial functions that aren't referenced often, because their intent/implementation is obvious, can usually just go as the very last functions in the module before any tests).

Also a note on whitespace as there's quite a bit of variation throughout this PR. In general, there's a preference for two empty lines between sections and functions and a single empty line in places where it helps with readability. So, for instance, we're missing empty lines between the behavior attribute and the exports, and then between exports and includes, etc since these are different "sections". For the most part this PR uses two empty lines between functions but in some cases it only uses one or will even use three or four in some places. We'll want to normalize all of those.

Places where single empty lines are helpful are when a handle_call/handle_cast function for a gen_server has a number of large clauses. A single empty line between clauses can be helpful to read each individually. Also in export lists it can be useful to group different related functions into sections delimited by empty lines so that related functions are obvious in their relation to each other. Above all though, lets try and be consistent.

Also considering that a lot of these modules are new I'm being extra persnickety since these are greenfield and we're not doing things like requiring a style cleanup and then a change.

Member:

Also, I'm not going to go through and note every occurrence of these style issues as I care about everyone's inboxes. One of these days I'm gonna find time to write a tool or find one that works for us to use so that we can just have accepted truth for style rather than a doc hidden somewhere on a wiki.

Contributor Author:

I did some cleanups in this area: restructured exports and fixed a number of lines over 80 characters. I'm sure I didn't get all of them.

I tried using https://github.com/inaka/elvis with a custom style config, but the rules there are limited and some are not useful. The 80-character-line rule was probably the most helpful one; the others I had to disable to avoid generating noise.

I think we need a customized emacs-based reformatter, something like this: https://github.com/fenollp/erlang-formatter.

Also, some of the code was copied from old modules; for example couch_replicator_scheduler_job is mostly the old couch_replicator gen_server. I skipped some of those to avoid making the diff even larger.

We'd probably do a reformatting-only pass as a separate set of PRs and such, but it would be good to have an automated formatter first.

since = Since,
feed = "normal",
timeout = infinity
}, {json_req, null}, Db),
Member:

Minor nit: I'd pull the #changes_args{} out, assign it to a variable, and then call handle_db_changes/3 on a single line. Generally we have three patterns for formatting a function call:

  1. all on one line
  2. all args and the closing paren and comma on a new line, indented twice more
  3. each arg on a separate line, indented twice, closing paren/comma on a separate line indented once

The idea here being that we want to communicate the sections and make things easier on eyeballs.
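A sketch of the suggested refactor (the function and record names follow the snippet above; treat the exact call as illustrative):

ChangesArgs = #changes_args{
    since = Since,
    feed = "normal",
    timeout = infinity
},
handle_db_changes(ChangesArgs, {json_req, null}, Db),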

DbName = couch_util:get_value(<<"id">>, Change),
case DbName of <<"_design/", _/binary>> -> ok; _Else ->
case couch_replicator_utils:is_deleted(Change) of
true ->
Member:

Case statement patterns should be in one indent.

% Convenience function for gen_servers to subscribe to {cluster, stable} and
% {cluster, unstable} events from couch_replicator clustering module.
-spec link_cluster_event_listener(pid()) -> pid().
link_cluster_event_listener(GenServer) when is_pid(GenServer) ->
Member:

The specificity of this to gen_server is odd. Normally I would go for something like:

link_cluster_event_listener(Mod, Fun, Args) -> ...

Which would then end up invoking something like erlang:apply(Mod, Fun, Args ++ [Event])

-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([code_change/3, terminate/2]).

-export([acquire/1, relinquish/1]).
Member:

relinquish is a funny verb here. Usually it's acquire/release for resource handling.

@@ -31,8 +31,7 @@
-record(state, {
url,
limit, % max # of workers allowed
free = [], % free workers (connections)
busy = [], % busy workers (connections)
conns = [],
Member:

It's called conns but it stores variables named Worker, which was hard to keep straight in my head.

-include("couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").

-import(couch_util, [
Member:

Imports are the devil and we should take every opportunity to remove them.

Contributor Author:

👿 agree, not sure what I was thinking

Job = #job{
id = Rep#rep.id,
rep = Rep,
history = [{added, os:timestamp()}]},
Member:

The closing bracket of the record should be on its own newline.



format_status(_Opt, [_PDict, State]) ->
[{max_jobs, State#state.max_jobs},
Member:

Just Say No to Visual Indents!

stop_excess_jobs(State, Running) ->
#state{max_jobs=MaxJobs} = State,
StopCount = Running - MaxJobs,
if StopCount > 0 ->
Member:

A space saver pattern is to flip your logic and move the true branch up:

if StopCount =< 0 -> ok; true ->
    stuff...
end,


relinquish(Worker) ->
unlink(Worker),
gen_server:cast(?MODULE, {relinquish, Worker}).
Member:

This unlink cast pattern seems suspicious to me. Though I guess the gen_server links to everything and monitors clients so theoretically there's no hole in the dance here.

Contributor Author:

Yap. We are unlinking from the owner here. If the worker dies after unlinking, we'll get the monitor notification and find out.

{Pid, _Ref} = From,
case ibrowse_lib:parse_url(URL) of
#url{host=Host, port=Port} ->
case ets:match_object(?MODULE, #connection{host=Host, port=Port, mref=undefined, _='_'}, 1) of
Member:

Also I missed this in the style nits:

Just Say No to Lines Longer Than 80 Characters!


-spec interval(#state{}) -> non_neg_integer().
interval(#state{period = Period, start_period = Period0, start_time = T0}) ->
case now_diff_sec(T0) > Period of
Member:

Another bug I spotted and forgot about: I'm pretty sure Period and Period0 are backwards here. Also, I really think we should rename the bindings, because obviously either I or the code is wrong and it's hard to know which (though I'm pretty sure it's the code :).

@nickva (Contributor Author) Apr 7, 2017:

Naming start_period Period0 was silly. The logic also assumes that the start period is shorter than the normal period.

So on startup we want to wait just a bit for nodes to connect and the cluster to stabilize, but not for a whole minute. But if the cluster has been up for a while and we notice membership changes, chances are restarts are happening, so the wait is longer.

The logic there says: if the time since startup is greater than a full period (minutes), then use the normal period to wait. But if the time since startup is short (seconds), assume we are in the start phase and just wait a few seconds before rescanning.

Member:

Ohhhhhh. Yeah, that makes sense finally. We should add a comment then cause that's super not obvious to me at least. Something simple like:

% For the first Period seconds after node boot we check cluster stability every
% StartPeriod seconds. Once the initial Period seconds have passed we continue
% to monitor once every Period seconds.

Member:

For reference, on my first read of that function I thought the intent was to return StartPeriod during the first StartPeriod seconds and then Period once we'd been alive for more than StartPeriod. I.e., during StartPeriod we return one interval, and after that we return the other. As opposed to: for the first Period seconds return StartPeriod, and from then until infinity return Period.


-spec is_stable(#state{}) -> boolean().
is_stable(#state{last_change = TS} = State) ->
now_diff_sec(TS) > interval(State).
Member:

Given that this is in seconds, should we go with >=? Or not? I can't convince myself either way whether equal-to-period is correct, or whether it's harmless that in reality it's interval+1.

@nickva (Contributor Author) Apr 7, 2017:

I think > shows intent better, and below you noticed it's a float (with / operation)

Member:

Sounds good.

USec when USec < 0 ->
0;
USec when USec >= 0 ->
USec / 1000000
Member:

Ah, though with my last comment, given that we use / instead of div it probably doesn't make a difference since it'll be interval+1usec rather than interval+1sec.

-spec now_msec() -> msec().
now_msec() ->
    {Mega, Sec, Micro} = os:timestamp(),
    ((Mega * 1000000) + Sec) * 1000 + round(Micro / 1000).
Member:

I think div would be more appropriate here? Or do we really specifically want rounding?

Contributor Author:

Div will work. Good idea!
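The div-based variant being agreed on would look something like this (a sketch of the change, not the final committed code):

-spec now_msec() -> msec().
now_msec() ->
    {Mega, Sec, Micro} = os:timestamp(),
    % div truncates instead of rounding, avoiding the float arithmetic.
    ((Mega * 1000000) + Sec) * 1000 + Micro div 1000.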


% Definitions

-define(SHARDS_N, 16).
Member:

Ah, I also wanted to ask why we're jumping straight to sharded ets tables? Is this something that we found to be a bottleneck, or something that we're worried will be a bottleneck?

Also, regardless, we should probably pull this out into its own module somewhere so that the logic is separate. Also, there was a neat generational ets cache I saw somewhere if we're worried about deleting things.

Contributor Author:

This could handle all the replication connections in a larger cluster, so it seemed bottleneck-y to have every connection search through a single ets table for each request.

Pulling it out into a separate module would be reasonable.
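A rough sketch of how the sharding might look if pulled into its own helper (the names here are hypothetical, not the actual couch_replicator_connection code):

-define(SHARDS_N, 16).

shard_for(Host, Port) ->
    % Hash the endpoint onto one of ?SHARDS_N pre-created ets tables so
    % concurrent acquires don't all contend on a single table.
    N = erlang:phash2({Host, Port}, ?SHARDS_N) + 1,
    list_to_atom("couch_replicator_connection_" ++ integer_to_list(N)).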

delete_worker(Worker) ->
    ets:delete(?MODULE, Worker#connection.worker),
    unlink(Worker#connection.worker),
    spawn(fun() -> ibrowse_http_client:stop(Worker#connection.worker) end),
Contributor:

IIUC, every Interval seconds we sweep through the ETS table and find worker processes which don't have a monitor reference, for deletion. Is there ever a situation where lingering worker processes/connections aren't closed? I do see that stop/1 has a timeout, so if it's issued a stop, at some point the timeout will trigger and we'll kill it. If this couch_replicator_connection process crashes/restarts, would those worker processes stick around until some other timeout? Just brainstorming here.

@nickva (Contributor Author) Apr 9, 2017:

Good observation. Yeah, they are not closed immediately, because then there would be no chance to reuse them and the connection pool wouldn't serve its purpose. So connections stay around for some time after they are used, ready to be picked up by some other owner. However we don't want them to stick around forever either, otherwise the pool would just keep growing. So we sweep through and close the idle connections.

Now, ibrowse clients actually have a close-on-idle timeout as well (I think 90 sec or so). But I think this is a case of us not trusting that mechanism 100% and implementing our own cleanup.

pp_rep_id/1
]).

-define(WORKER_TIMEOUT_MSEC, 61000).
Contributor:

haha any reason this is 61000?

@nickva (Contributor Author) Apr 9, 2017:

Because 30 seconds was the timeout for requests (if I remember correctly), so after a worker is spawned we'd want it to get a chance to make a few requests (maybe one failing one and a retry) and then fail with its own error (timeout, network error), which would be more specific and informative, before it simply gets killed because of the timeout here. That is, if all else fails and the worker is actually blocked, then 61 sec is a safety net to brutally kill the worker so it doesn't end up hung forever.

Contributor:

Please add your description as a comment.

@davisp (Member) left a comment:

Few minor nits. Tests look pretty good. Also I'm gonna review each module's tests independently cause there's quite a few of them.

meck:expect(couch_db, close, 1, ok),
mock_changes_reader(),
% create process to stand in for couch_ever_server
% mocking erlang:monitor doesn't, so give it real process to monitor
Member:

couch_event_server? Also doesn't (work) I assume?

Contributor Author:

Oops. Yap, you're right. Will fix it

exit({Server, DbName})
end.

kill_mock_change_reader_and_get_its_args(Pid) ->
Member:

Nit, but you've got mock_changes_reader_loop and kill_mock_change_reader.*, would be nicer to keep the plurality constant for changes.

Contributor Author:

Good point

t_handle_info_other_event() ->
?_test(begin
State = mock_state(),
handle_info_check({'$couch_event', ?DBNAME, somethingelse}, State)
Member:

Seems like assertions for not calling db_created|deleted|found would be useful here.

Contributor Author:

Good idea. Will do.

@davisp (Member) left a comment:

Minor nit suggestion.

Url1 = <<"http:https://adm:pass@host/db">>,
?assertEqual(<<"http:https://adm:*****@host/db/">>, strip_url_creds(Url1)),
Url2 = <<"https://adm:pass@host/db">>,
?assertEqual(<<"https://adm:*****@host/db/">>, strip_url_creds(Url2))
Member:

I'd add tests for port and query strings as well.

gen_server:cast(Server, Event).


% Private helpers for multidb changes API, these updates into the doc
Member:

"these updates into" seems like its missing a word.

% exception which would indicate this document is malformed. This exception
% should propagate to db_change function and will be recorded as permanent
% failure in the document. User will have to delete and re-create the
% document to fix the problem.
Member:

Delete and recreate? Would an update not work just as well for some reason?

Contributor Author:

Good catch. The comment is outdated; an update now works. Before, you had to do the delete-and-recreate dance.

Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)};
#rdoc{rid = nil} ->
% Calculated new replication id for non-filtered replication.
% Remove replication doc body, after this we won't needed any
Member:

"we won't needed" seems like its missing a word. And assuming its "we won't be needed" or something its not entirely clear what's not needed or what we're doing about it.

Wait = get_worker_wait(Doc),
Ref = make_ref(),
true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}),
couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref),
Member:

Might want to use ?MODULE here. Reviewing this I had a "wait, isn't that the module I'm reading?" moment and had to scroll up to double check.

Contributor Author:

Ah, different module: couch_replicator_doc_processor_worker is the worker module that does the work of turning a replication doc into a replication job running in the scheduler.

Member:

Ah. Hooray for long module names I guess.

-spec error_backoff(non_neg_integer()) -> seconds().
error_backoff(ErrCnt) ->
    Exp = min(ErrCnt, ?ERROR_MAX_BACKOFF_EXPONENT),
    random:uniform(64 bsl Exp).
Member:

The math here could use a comment.
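For reference, a commented reading of the snippet above (the comments are editorial, not from the PR):

-spec error_backoff(non_neg_integer()) -> seconds().
error_backoff(ErrCnt) ->
    % Cap the exponent so the backoff can't grow without bound.
    Exp = min(ErrCnt, ?ERROR_MAX_BACKOFF_EXPONENT),
    % Randomized exponential backoff: wait a uniformly random number of
    % seconds in [1, 64 * 2^Exp], so repeatedly failing jobs spread out
    % instead of retrying in lockstep.
    random:uniform(64 bsl Exp).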

filter_backoff() ->
    Total = ets:info(?MODULE, size),
    Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC),
    60 + random:uniform(round(Range)).
Member:

The math here could use a comment.
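And a commented reading of filter_backoff/0 (again, the comments are editorial):

filter_backoff() ->
    % Number of entries currently tracked in the doc processor ets table.
    Total = ets:info(?MODULE, size),
    % Grow the retry window with the number of tracked docs, capped at
    % one day (?TS_DAY_SEC).
    Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC),
    % Wait at least a minute, plus a random slice of the window.
    60 + random:uniform(round(Range)).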


% Change handled when cluster is unstable (nodes are added or removed), so
% job is not added. A rescan will be triggered soon and change will be
% evaluated again.
Member:

I see that process_change drops anything when the cluster is unstable, but I'm not following how it schedules a rescan to trigger this again, and it's not tested here. I'm assuming this is basically handled elsewhere, but a note in the comment on how that rescan works would be useful.

@nickva (Contributor Author) Apr 10, 2017:

There is a notify_cluster_event callback which gets a {cluster, stable|unstable} message.

While the cluster is unstable we ignore the changes. When it becomes stable again we get the {cluster, stable} message and do two things:

  • In couch_replicator_doc_processor on each node we stop all jobs which don't belong there anymore:
    ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE),
  • And a full rescan is triggered in the couch_replicator_db_changes module. There, when the cluster is stable, we restart the multidb_changes process, which rescans everything and adds the jobs which should be running on this node:
    restart_mdb_changes(stop_mdb_changes(State)).

Member:

Gotchya.

meck:expect(couch_replicator_clustering, owner, 2, different_node),
?assert(ets:member(?MODULE, {?DB, ?DOC1})),
gen_server:cast(?MODULE, {cluster, stable}),
timer:sleep(100),
Member:

Rather than sleeping you could use meck:wait on ets:delete/2 or couch_replicator_scheduler:remove_job/1
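For example, the sleep could be replaced with something along these lines (a sketch; the argument spec and timeout are illustrative):

gen_server:cast(?MODULE, {cluster, stable}),
% Block until remove_job/1 has actually been called, instead of sleeping.
meck:wait(couch_replicator_scheduler, remove_job, ['_'], 1000),
?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),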

@davisp (Member) left a comment:

Seems like couch_replicator_rate_limiter could use some tests and if we could pass our own now_msecs() value it'd be pretty simple to calculate some test data to assert we're generating the correct intervals.

@redgeoff (Contributor) commented:

Background: https://issues.apache.org/jira/browse/COUCHDB-3391

Summary: I tested the max_jobs config and it doesn't appear to be working as I would expect. Sorry if I missed a detail.

Steps to reproduce:

  1. Pull and build Scheduling Replicator #470
  2. Clone https://github.com/redgeoff/couchdb2-replicator-not-scalable-vagrant. I used Vagrant to set up my test, but you should be able to run the tests outside a VM as well.
  3. Modify couchdb2-replicator-not-scalable-vagrant/index.js and replace localhost with the correct dev DB host
  4. Set the config replicator.max_jobs=10
  5. Run npm run demo (this just runs index.js)

=> This results in Unhandled rejection Error: internal_server_error errors. If you were to modify https://github.com/redgeoff/couchdb2-replicator-not-scalable-vagrant/blob/master/index.js#L10 and use a smaller number, e.g. 10, then there would be no issue, as you would not exhaust the file descriptors and db connections. My assumption is that the max_jobs config should throttle the replications and prevent the DB from throwing errors.

@nickva (Contributor Author) commented Apr 26, 2017

@redgeoff

Thanks for taking a look!

The max_jobs parameter limits the number of replication jobs running at any one time. File descriptors can be exhausted by other things, not just replication tasks; in this case it is probably the databases being opened. Also note each db by default is split into 8 shards (the q value), so that starts to add up.

I gave your setup a try and got it to work with these settings:

(I didn't try to minimize them, you can probably tweak them to find minimal values)

FD limits:
$ ulimit -n 8192

Config settings:

couchdb.max_dbs_open = 3000
cluster.q=1
cluster.n=1
replicator.max_jobs = 10
replicator.worker_processes = 1

I put q=1, n=1 based on your use case in https://issues.apache.org/jira/browse/COUCHDB-3391

With all that set up, I started couch (I built the scheduling replicator locally in Vagrant):

couchdb@ubuntu-xenial:~/couchdb$ ./bin/couchdb
[info] 2017-04-26T19:22:44.457066Z couchdb@localhost <0.7.0> -------- Application couch_log started on node couchdb@localhost
...

Created _users and _replicator dbs. I am not sure if your test does this. Here is how it looks:

couchdb@ubuntu-xenial:/vagrant$ http put http://localhost:5984/_users
HTTP/1.1 201 Created
...

{
    "ok": true
}

couchdb@ubuntu-xenial:/vagrant$ http put http://localhost:5984/_replicator
HTTP/1.1 201 Created
...

{
    "ok": true
}

Ran your demo:

npm run demo

> [email protected] demo /vagrant
> ./index.js

without any errors.

And finally showing off the states-based filtering:

$ http http://localhost:5984/_scheduler/docs?states=running
HTTP/1.1 200 OK
...

{
    "docs": [
        {
            "database": "_replicator",
            "doc_id": "7e801385699c0723d64ce2b684000378",
            "error_count": 0,
            "id": "2f691a8a3e337142691e7679e2c146a1+continuous",
            "info": null,
            "last_updated": "2017-04-26T19:14:07Z",
            "node": "couchdb@localhost",
            "proxy": null,
            "source": "http:https://localhost:5984/test_1493234045679_2_a/",
            "start_time": "2017-04-26T19:14:07Z",
            "state": "running",
            "target": "http:https://localhost:5984/test_1493234045679_2_b/"
        },
        {
            "database": "_replicator",
            "doc_id": "7e801385699c0723d64ce2b684001144",
            "error_count": 0,
            "id": "2f42b5ccbfb20e82d319054af8cb7828+continuous",
            "info": null,
            "last_updated": "2017-04-26T19:14:08Z",
            "node": "couchdb@localhost",
            "proxy": null,
            "source": "http:https://localhost:5984/test_1493234045679_6_a/",
            "start_time": "2017-04-26T19:14:08Z",
            "state": "running",
            "target": "http:https://localhost:5984/test_1493234045679_6_b/"
        },
      ...<8 more docs in here>...
    ],
    "offset": 0,
    "total_rows": 1000
}

And without any crashes:

http http://localhost:5984/_scheduler/docs?states=crashing
HTTP/1.1 200 OK
Cache-Control: must-revalidate
Content-Type: application/json
Date: Wed, 26 Apr 2017 19:26:03 GMT
Server: CouchDB/2.0.0-49786f7 (Erlang OTP/18)
Transfer-Encoding: chunked
X-Couch-Request-ID: 465148b54d
X-CouchDB-Body-Time: 0

{
    "docs": [],
    "offset": 0,
    "total_rows": 1000
}

@@ -115,7 +115,9 @@ else
endif
# This might help with emfile errors during `make javascript`: ulimit -n 10240
@rm -rf dev/lib
@dev/run -n 1 -q --with-admin-party-please test/javascript/run $(suites)
@dev/run -n 1 -q --with-admin-party-please \
-c 'startup_jitter=0' \
Member:

Is this a critical change for this patch? Why?

If so you also need to make the change in Makefile.win.

@nickva (Contributor Author) Apr 28, 2017:

This is to make the JavaScript tests pass in Travis. Without this change the integration test suite takes too long and times out. There are enough replications starting that the 5-second average wait starts to add up quickly.

And it might be a useful feature in general for development (to override settings while running ./dev/run).

Member:

I'm +1 on the entire concept, and -0 on the implementation. For instance, if this grows beyond 1-2 settings we should probably implement it differently.

Recently I added the local.d and default.d directories to the overlay folders. Perhaps instead you should just create a file local.d/startup-jitter.ini and put your setting in there. In fact, a lot of the machinations dev/run goes through are obsolete if we just move to creating local.d/*.ini files instead of in-place substitutions.

Don't let this block getting your PRs landed, but if you don't tackle it, would you file an issue please? I might look at it sooner rather than later.

@sagelywizard (Member) left a comment:

I took another look, and my only concerns are the things that @wohali commented about. So, once those are addressed, I'm 👍. Awesome, awesome changes!

@davisp (Member) left a comment:

+1. All of my issues have been covered. Awesome work everyone!

@wohali (Member) commented Apr 28, 2017

@redgeoff As far as I'm aware, CouchDB doesn't make any attempt to check the system/process ulimit on max fds. If I recall correctly, you'll start getting {error, system_limit} when you hit this limit. (Don't forget that the erl process itself by default limits Erlang to 65536 max simultaneous open ports, or 8196 on Windows, which can be adjusted with +Q in vm.args or on the command line.) If the test passes as @nickva shows with a sufficiently high ulimit, I don't see any need to block this PR.

rnewson and others added 9 commits April 28, 2017 17:35
Scheduling replicator can run a large number of replication jobs by scheduling
them. It will periodically stop some jobs and start new ones. Jobs that fail
will be penalized with an exponential backoff.

Jira: COUCHDB-3324
This module maintains cluster membership information for replication and
provides functions to check ownership of replication jobs.

A cluster membership change is registered only after a configurable
`cluster_quiet_period` interval has passed since the last node addition or
removal. This is useful in cases of rolling node reboots in a cluster, in order
to avoid rescanning for membership changes after every node up and down event,
and instead doing only one rescan at the very end.

Jira: COUCHDB-3324
Monitor shards which match a suffix for creation, deletion, and doc updates.

To use it, implement the `couch_multidb_changes` behavior. Call `start_link` with
a DbSuffix and, optionally, an option to skip design docs (`skip_ddocs`). Behavior
callback functions will be called when shards are created, deleted, found and
updated.
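
A hedged sketch of a callback module is below. The callback names (db_created/2, db_deleted/2, db_found/2, db_change/3) and the exact start_link/4 signature are assumptions based on the description above, not a verified copy of the real behavior.

    %% Assumed callback names and start_link signature; illustrative only.
    -module(replicator_shard_listener).
    -behaviour(couch_multidb_changes).

    -export([start_link/0]).
    -export([db_created/2, db_deleted/2, db_found/2, db_change/3]).

    start_link() ->
        %% Listen on shards whose names end in "_replicator", skipping design docs.
        couch_multidb_changes:start_link(<<"_replicator">>, ?MODULE, nil, [skip_ddocs]).

    %% Each callback receives the shard name and an opaque context, and returns
    %% the possibly updated context.
    db_created(DbName, Ctx) -> couch_log:notice("shard created: ~s", [DbName]), Ctx.
    db_deleted(DbName, Ctx) -> couch_log:notice("shard deleted: ~s", [DbName]), Ctx.
    db_found(DbName, Ctx)   -> couch_log:notice("shard found: ~s", [DbName]), Ctx.
    db_change(DbName, Change, Ctx) ->
        couch_log:notice("change in ~s: ~p", [DbName, Change]), Ctx.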

Jira: COUCHDB-3324
This commit adds functionality to share connections between
replications. This is to solve two problems:

- Prior to this commit, each replication would create a pool of
  connections and hold onto those connections as long as the replication
  existed. This was wasteful and caused CouchDB to use many unnecessary
  connections.
- When the pool was being terminated, the pool would block while the
  socket was closed. This would cause the entire replication scheduler
  to block. By reusing connections, connections are never closed by
  clients. They are only ever relinquished. This operation is always
  fast.

This commit adds an intermediary process which tracks which connection
processes are being used by which client. It monitors clients and
connections. If a client or connection crashes, the paired
client/connection will be terminated.

A client can gracefully relinquish ownership of a connection. If that
happens, the connection will be shared with another client. If the
connection remains idle for too long, it will be closed.
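
A usage-level sketch of the checkout/relinquish pattern is below. The module and function names used here (couch_replicator_connection:acquire/1 and relinquish/1) are illustrative assumptions based on the description above, not the verified API.

    %% Illustrative names only; shows the acquire / use / relinquish pattern.
    -module(connection_reuse_sketch).
    -export([head/1]).

    head(Url) ->
        {ok, Worker} = couch_replicator_connection:acquire(Url),
        try
            ibrowse:send_req_direct(Worker, Url, [], head)
        after
            %% Hand the connection back instead of closing it; the tracker can
            %% pair it with another client, or close it if it stays idle too long.
            couch_replicator_connection:relinquish(Worker)
        end.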

Jira: COUCHDB-3324
AIMD: additive increase / multiplicative decrease feedback control algorithm.

https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease

This is an algorithm which converges on the available channel capacity.
No participant knows the capacity a priori, and participants don't communicate
with or know about each other (so they don't coordinate to divide the capacity
among themselves).

A variation of this is used in the TCP congestion control algorithm. It is proven
to converge, while, for example, additive increase / additive decrease or
multiplicative increase / multiplicative decrease will not.

A few tweaks were applied to the base control logic:

 * The estimated value is an interval (period) instead of a rate. This is for
   convenience, as users will probably want to know how long to sleep. But rate
   is just 1000 / interval, so it is easy to transform.

 * There is a hard maximum limit for the estimated period. This is mainly a
   practical concern, as connections sleeping too long will time out and / or
   jobs will waste time sleeping and consume scheduler slots while others could
   be running.

 * There is a time decay component used to handle large pauses between updates.
   In case of a large gap between updates, assume (optimistically) that some
   successful requests have been made. Intuitively, the more time passes, the
   less accurate the estimated period probably is.

 * The rate of updates applied to the algorithm is limited. This effectively
   acts as a low-pass filter and makes the algorithm handle spikes and short
   bursts of failures better. This is not a novel idea; some alternative TCP
   congestion control algorithms, like Westwood+, do something similar.

 * There is a large downward pressure applied to the increasing interval as it
   approaches the max limit. This is done by tweaking the additive factor via
   a step function. In practice this has the effect of making it a bit harder
   for jobs to cross the maximum backoff threshold, as they would be killed and
   could potentially lose intermediate work.

Main API functions are:

   success(Key) -> IntervalInMilliseconds

   failure(Key) -> IntervalInMilliseconds

   interval(Key) -> IntervalInMilliseconds

Key is any term (hashable by phash2), typically something like {Method, Url}.
The result of each function is the current period value. The caller would then
presumably choose to sleep for that amount of time before or after making
requests. The current interval can be read with the interval(Key) function.

The implementation shards ETS tables based on the key, and a periodic timer
cleans up unused items.
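
To make the control rule concrete, here is a minimal sketch of just the AIMD update applied to an interval. It takes the current interval directly rather than a Key, and the constants are made up for illustration; the real module also applies the time decay, update rate limiting and step-function additive factor described above.

    %% AIMD applied directly to the interval: failures grow it additively
    %% (capped), successes shrink it multiplicatively. Illustrative constants.
    -module(aimd_sketch).
    -export([on_success/1, on_failure/1]).

    -define(MIN_INTERVAL, 5).             % milliseconds
    -define(MAX_INTERVAL, 25000).         % hard upper limit on the backoff
    -define(ADDITIVE_FACTOR, 25).         % added to the interval on failure
    -define(MULTIPLICATIVE_FACTOR, 0.7).  % shrinks the interval on success

    on_success(Interval) ->
        max(?MIN_INTERVAL, round(Interval * ?MULTIPLICATIVE_FACTOR)).

    on_failure(Interval) ->
        min(?MAX_INTERVAL, Interval + ?ADDITIVE_FACTOR).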

Jira: COUCHDB-3324
Over the years the utils module accumulated a lot of functionality. Clean it up
a bit by separating it into specific modules according to semantics:

 - couch_replicator_docs : Handle reading and writing to replicator dbs.
   This includes updating state fields, parsing options from documents, and
   making sure the replication VDU design document is in sync.

 - couch_replicator_filters : Fetch and manipulate replication filters.

 - couch_replicator_ids : Calculate replication IDs. Handles versioning and
   pretty-formatting of IDs. Filtered replications using user filter functions
   incorporate a hash of the filter code into the calculation; in that case the
   couch_replicator_filters module is called to fetch the filter from the source.

Jira: COUCHDB-3324
The document processor listens for `_replicator` db document updates, parses
those changes, then tries to add replication jobs to the scheduler.

Listening for changes happens in the `couch_multidb_changes` module. That module
is generic and is set up to listen to shards with the `_replicator` suffix by
`couch_replicator_db_changes`. Updates are then passed to the document
processor's `process_change/2` function.

Calculating the document's replication ID, which can involve fetching filter code
from the source DB, and adding the job to the scheduler are done in a separate
worker process: `couch_replicator_doc_processor_worker`.

Previously the couch replicator manager did most of this work. There are a few
improvements over the previous implementation:

 * Invalid (malformed) replication documents are immediately failed and will
 not be continuously retried.

 * Replication manager message queue backups are unfortunately a common issue
 in production. This is because processing document updates is a serial
 (blocking) operation. Most of that blocking code was moved to separate worker
 processes.

 * Failing filter fetches have an exponential backoff.

 * Replication documents don't have to be deleted first and then re-added in
 order to update the replication. On update, the document processor will compare
 new and previous replication-related document fields and update the replication
 job if those changed. Users can freely update unrelated (custom) fields in their
 replication docs.

 * In the case of filtered replications using custom functions, the document
 processor will periodically check if the filter code on the source has changed.
 Filter code contents are factored into the replication ID calculation, so if
 the filter code changes, the replication ID will change as well.

Jira: COUCHDB-3324
Glue together all the scheduling replicator pieces.

The scheduler is the main component. It can run a large number of replication
jobs by switching between them, stopping and starting some periodically. Jobs
which fail are backed off exponentially. Normal (non-continuous) jobs will be
allowed to run to completion to preserve their current semantics.

Scheduler behavior can be configured by these configuration options in the
`[replicator]` section (see the sketch after this list):

 * `max_jobs` : Number of actively running replications. Making this too high
 could cause performance issues. Making it too low could mean replication jobs
 might not have enough time to make progress before getting unscheduled again.
 This parameter can be adjusted at runtime and will take effect during the next
 rescheduling cycle.

 * `interval` : Scheduling interval in milliseconds. During each reschedule
 cycle the scheduler might start or stop up to `max_churn` jobs.

 * `max_churn` : Maximum number of replications to start and stop during
 rescheduling. This parameter, along with `interval`, defines the rate of job
 replacement. During startup, however, a much larger number of jobs could be
 started (up to `max_jobs`) in a short period of time.
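
Since these options can be changed at runtime, here is a quick sketch of adjusting them from a remote shell attached to a node. It assumes the usual CouchDB `config` application helpers are available; the exact signatures and the values shown are illustrative examples, not recommended settings.

    %% From a remsh: adjust scheduler settings at runtime (illustrative values).
    config:set("replicator", "max_jobs", "100"),
    config:set("replicator", "interval", "60000"),
    config:set("replicator", "max_churn", "20"),
    config:get("replicator", "max_jobs").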

Replication jobs are added to the scheduler by the document processor or from
the `couch_replicator:replicate/2` function when called from `_replicate` HTTP
endpoint handler.

The document processor listens for updates via the `couch_multidb_changes`
module, then tries to add replication jobs to the scheduler. Sometimes
translating a document update to a replication job could fail, either
permanently (if the document is malformed and missing some expected fields, for
example) or temporarily (if it is a filtered replication and the filter cannot
be fetched). A failed filter fetch will be retried with an exponential backoff.

couch_replicator_clustering is in charge of monitoring cluster membership
changes. When membership changes, a rescan will be initiated after a
configurable quiet period. The rescan will shuffle replication jobs to make sure
each replication job is running on only one node.

A new set of stats was added to introspect scheduler and doc processor
internals.

The top replication supervisor structure is `rest_for_one`. This means that if
a child crashes, all children to the "right" of it will be restarted (if the
supervisor hierarchy is visualized as an upside-down tree). Clustering,
connection pool and rate limiter are towards the "left" as they are more
fundamental; if the clustering child crashes, most other components will be
restarted. Doc processor and multi-db changes children are towards the "right".
If they crash, they can be safely restarted without affecting already running
replications or components like clustering or the connection pool.
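
For illustration, a `rest_for_one` supervisor with this ordering would look roughly like the sketch below. The child ids and start modules are placeholders, not the replicator's actual child specs; only the ordering reflects the description above. If `clustering` exits, everything after it restarts; if `db_changes` exits, only it restarts.

    %% Placeholder child specs; the ordering is the point of the sketch.
    -module(rest_for_one_sketch).
    -behaviour(supervisor).
    -export([init/1]).

    init([]) ->
        SupFlags = #{strategy => rest_for_one, intensity => 10, period => 3600},
        Children = [
            #{id => clustering,      start => {clustering_mod, start_link, []}},
            #{id => connection_pool, start => {conn_pool_mod, start_link, []}},
            #{id => rate_limiter,    start => {rate_limiter_mod, start_link, []}},
            #{id => doc_processor,   start => {doc_processor_mod, start_link, []}},
            #{id => db_changes,      start => {db_changes_mod, start_link, []}}
        ],
        {ok, {SupFlags, Children}}.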

Jira: COUCHDB-3324
The `_scheduler/jobs` endpoint provides a view of all replications managed by
the scheduler. This endpoint includes more information on the replication than
the `_scheduler/docs` endpoint, including the history of state transitions of
the replication. This part was implemented by Benjamin Bastian.

The `_scheduler/docs` endpoint provides a view of all replicator docs which
have been seen by the scheduler. This endpoint includes useful information such
as the state of the replication and the coordinator node. The implementation of
`_scheduler/docs` closely mimics `_all_docs` behavior: similar pagination,
HTTP request processing, and fabric / rexi setup. The algorithm is roughly
as follows:

 * http endpoint:
   - parses query args like it does for any view query
   - parses the states to filter by; states are kept in the `extra` query arg

 * A call is made to couch_replicator_fabric. This is equivalent to
   fabric:all_docs. Here the typical fabric / rexi setup happens.

 * The fabric worker is in `couch_replicator_fabric_rpc:docs/3`. This worker is
   similar to fabric_rpc's all_docs handler; however, it is a bit more intricate
   as it has to handle both replication documents in a terminal state and those
   which are active (see the sketch after this list).

   - Before emitting a row, it checks the state of the document to see if it is
     in a terminal state. If it is, it applies the state filter and decides
     whether the row should be emitted or not.

   - If the state cannot be determined from the document itself, it tries to
     fetch the active state from the local node's doc processor via a key-based
     lookup. If it finds one, it can also filter it based on state and either
     emit or skip it.

   - If the document cannot be found in the node's local doc processor ETS
     table, the row is emitted with a doc value of `undecided`. This lets
     the coordinator fetch the state, possibly by querying other nodes'
     doc processors.

  * The coordinator then starts handling messages. This also mostly mimics
    all_docs. At this point the most interesting thing is handling `undecided`
    docs. If one is found, `replicator:active_doc/2` is queried. There, all
    nodes where the document's shards live are queried. This is better than the
    previous implementation, where all nodes were queried all the time.

  * The final work happens in `couch_replicator_httpd`, where the emitting
    callback is. There only the doc is emitted (not keys, rows, or values).
    Another thing that happens is that the `Total` value is decremented to
    account for the always-present _design doc.
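
As referenced above, here is a small sketch of the worker's per-row decision. Function and atom names are illustrative, not taken from couch_replicator_fabric_rpc; only the shape of the logic follows the description.

    %% DocState is {terminal, State}, {active, State} (from the local doc
    %% processor lookup) or unknown; StatesFilter is the list of requested states.
    -module(scheduler_docs_sketch).
    -export([row_value/2]).

    row_value({terminal, State}, StatesFilter) ->
        maybe_emit(State, StatesFilter);
    row_value({active, State}, StatesFilter) ->
        maybe_emit(State, StatesFilter);
    row_value(unknown, _StatesFilter) ->
        %% Emit `undecided` and let the coordinator resolve the state, possibly
        %% by querying doc processors on other nodes hosting the document's shards.
        {emit, undecided}.

    %% Emit only if no state filter was requested or the state matches it.
    maybe_emit(State, StatesFilter) ->
        case StatesFilter =:= [] orelse lists:member(State, StatesFilter) of
            true  -> {emit, State};
            false -> skip
        end.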

Because of this, a bunch of code was removed, including an extra view which
was built and managed by the previous implementation.

As a bonus, other view-related parameters such as skip and limit seem to
work out of the box and don't have to be implemented ad hoc.

Also, most importantly, many thanks to Paul Davis for suggesting this approach.

Jira: COUCHDB-3324
@nickva nickva merged commit f7a711d into master Apr 28, 2017
@wohali
Member

wohali commented Apr 28, 2017

\o/ congrats! so exciting.

@redgeoff
Contributor

redgeoff commented Apr 28, 2017 via email

@wohali
Member

wohali commented Apr 29, 2017

The good news is that yes, this commit DOES handle replication more efficiently with fewer connections, but only for multiple replications between the same hosts. So it's great for clusters and data center backups and so on, but doesn't really help the database-and-replication-per-user model very much.

@nickva nickva deleted the 63012-scheduler branch May 15, 2017 19:34
iilyak added a commit to cloudant/couchdb that referenced this pull request Jun 7, 2017
This reverts commit f7a711d, reversing
changes made to 350a67b.

Conflicts:
	src/couch/src/couch_multidb_changes.erl
	src/couch_replicator/test/couch_replicator_compact_tests.erl
iilyak added a commit to cloudant/couchdb that referenced this pull request Jun 12, 2017
This reverts commit f7a711d, reversing
changes made to 350a67b.

Conflicts:
	src/couch/src/couch_multidb_changes.erl
	src/couch_replicator/test/couch_replicator_compact_tests.erl
iilyak added a commit to cloudant/couchdb that referenced this pull request Jun 23, 2017
This reverts commit f7a711d, reversing
changes made to 350a67b.

Conflicts:
	src/couch/src/couch_multidb_changes.erl
	src/couch_replicator/test/couch_replicator_compact_tests.erl
@badbod99

badbod99 commented Feb 5, 2019

Was this actually finished and released? I'm struggling to understand the git history; it seems to have been reverted. I'm also struggling to find anything in the release notes or related documentation regarding the settings. Possibly I'm just not doing a good job of looking.

@wohali
Member

wohali commented Feb 5, 2019

@badbod99 Yes, the scheduling replicator was merged and released in 2.1.0.

http://docs.couchdb.org/en/latest/whatsnew/2.1.html#version-2-1-0

@badbod99

badbod99 commented Feb 5, 2019

Great, thanks. Not sure how I missed that!

nickva pushed a commit to nickva/couchdb that referenced this pull request Sep 7, 2022