-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Scheduling Replicator #470
Conversation
c2d381c
to
f6658fc
Compare
Some additional information from testing how new scheduling replicator handles a large amount of jobs. This is a graph of adding 1M replication jobs to a cluster of 3 nodes. Replication jobs were added using couchdyno/rep tool : https://github.com/cloudant-labs/couchdyno/blob/master/README_rep.md
That creates a connected cluster of replications for for n=1000, it would create 1000*1000=1M replication jobs.
|
Hooray scheduling replicator! This looks pretty awesome. Its quite big so I'm gonna split my review into roughly three parts: concerns, things that look like bugs, and then style issues. Commencing in 3, 2, 1, now! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not going to be overly specific on how this change is handled but we will need to do something here or we risk opening a security hole if requests are re-used between replications.
handle_call({acquire, URL}, From, State) -> | ||
{Pid, _Ref} = From, | ||
case ibrowse_lib:parse_url(URL) of | ||
#url{host=Host, port=Port} -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the most worrisome part of the whole PR for me. We're starting HTTP connections outside of this pool and then parsing the URLs after the fact. And specifically, we're not checking here that the URLs we're provided don't have a username/password set. If we did get one with that and assuming that ibrowse caches the credentials in the worker then its possible we're leaking authentication credentials across replications.
We should very much insist that all workers do not have authentication cached and then make sure we provide it for each request individually. And we should also assert that ibrowse isn't caching credentials between requests as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@davisp Good point. I think parsing url outside the gen_server and calling acquire with host, port tuple might work?
@sagelywizard Want to take a look at this one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And ensuring that we're not getting a username/password passed to ibrowse and then verifying and/or somehow asserting that ibrowse hasn't cached a username/password after it was used in a request are the more important bits to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After a worker is released I don't see the credentials in it anymore
Here is a state of relinquished ibrowse_http_client:init/1
worker
{state,"localhost",15984,infinity,#Ref<0.0.2.22525>,false,undefined,undefined,
undefined,undefined,undefined,[],false,#Port<0.32462>,false,[],
{[],[]},
undefined,idle,undefined,
<<>>,
0,0,[],undefined,undefined,undefined,undefined,false,undefined,
undefined,
<<>>,
undefined,false,undefined,0,undefined}
While it was making a request it had adm:pass in there. In other words it doesn't seem like credentials are cached in the worker process after it is released.
Dictionary had these fields:
my_trace_flag: false
ibrowse_trace_token: ["localhost",58,"15984"]
http_prot_vsn: "HTTP/1.1"
conn_close:"false"
'$initial_call': {ibrowse_http_client,init,1}
'$ancestors': [<0.23517.1>,0.23508.1>,couch_replicator_scheduler_sup,couch_replicator_sup,
<0.369.0>]
So I think we might be ok here. It basically has the same issue as current code where worker crashing could spill its credentials in the log sometimes.
Also notice ibrowse client doesn't store the password on worker spawn_link just keep host, port and protocol around:
https://github.com/apache/couchdb-ibrowse/blob/master/src/ibrowse_http_client.erl#L115
So we might actually be ok here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, looks like ibrowse is good there. For sake of completeness though when we parse the URL in couch_replicator_connection lets use a wrapper function that specifically drops the user/pass pair out of the record to be clear that we're not passing credentials to the worker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm satisfied that this isn't an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although it might be an issue in the case ibrowse is updated latter on (or we replace http client). How hard it would be to write a test case which would check that released process do not have leftover data? If we would have a test then we would be able to catch the problem if ibrowse would start to leak data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@iilyak @davisp Added connection pool tests which check that worker processes don't hold on to credentials:
couchdb/src/couch_replicator/test/couch_replicator_connection_tests.erl
Lines 129 to 195 in 628cf6f
worker_discards_creds_on_create({Host, Port}) -> | |
?_test(begin | |
{User, Pass, B64Auth} = user_pass(), | |
URL = "https://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ Port, | |
{ok, WPid} = couch_replicator_connection:acquire(URL), | |
Internals = worker_internals(WPid), | |
?assert(string:str(Internals, B64Auth) =:= 0), | |
?assert(string:str(Internals, Pass) =:= 0) | |
end). | |
worker_discards_url_creds_after_request({Host, _}) -> | |
?_test(begin | |
{User, Pass, B64Auth} = user_pass(), | |
{Port, ServerPid} = server(), | |
PortStr = integer_to_list(Port), | |
URL = "https://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ PortStr, | |
{ok, WPid} = couch_replicator_connection:acquire(URL), | |
?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], [])), | |
Internals = worker_internals(WPid), | |
?assert(string:str(Internals, B64Auth) =:= 0), | |
?assert(string:str(Internals, Pass) =:= 0), | |
couch_replicator_connection:release(WPid), | |
unlink(ServerPid), | |
exit(ServerPid, kill) | |
end). | |
worker_discards_creds_in_headers_after_request({Host, _}) -> | |
?_test(begin | |
{_User, Pass, B64Auth} = user_pass(), | |
{Port, ServerPid} = server(), | |
PortStr = integer_to_list(Port), | |
URL = "https://" ++ Host ++ ":" ++ PortStr, | |
{ok, WPid} = couch_replicator_connection:acquire(URL), | |
Headers = [{"Authorization", "Basic " ++ B64Auth}], | |
?assertMatch({ok, "200", _, _}, send_req(WPid, URL, Headers, [])), | |
Internals = worker_internals(WPid), | |
?assert(string:str(Internals, B64Auth) =:= 0), | |
?assert(string:str(Internals, Pass) =:= 0), | |
couch_replicator_connection:release(WPid), | |
unlink(ServerPid), | |
exit(ServerPid, kill) | |
end). | |
worker_discards_proxy_creds_after_request({Host, _}) -> | |
?_test(begin | |
{User, Pass, B64Auth} = user_pass(), | |
{Port, ServerPid} = server(), | |
PortStr = integer_to_list(Port), | |
URL = "https://" ++ Host ++ ":" ++ PortStr, | |
{ok, WPid} = couch_replicator_connection:acquire(URL), | |
Opts = [ | |
{proxy_host, Host}, | |
{proxy_port, Port}, | |
{proxy_user, User}, | |
{proxy_pass, Pass} | |
], | |
?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], Opts)), | |
Internals = worker_internals(WPid), | |
?assert(string:str(Internals, B64Auth) =:= 0), | |
?assert(string:str(Internals, Pass) =:= 0), | |
couch_replicator_connection:release(WPid), | |
unlink(ServerPid), | |
exit(ServerPid, kill) | |
end). |
We check 4 cases:
- After worker creation
- After request if credentials were embedded in URL
- After request if credentials passed in an
Authorization
header - After request if credentials were set as proxy parameters
The check is performed on the worker process's dictionary and its state. (The assumption here it is a gen_server, which holds for current ibrowse. If it stops being so tests will fail and we'll know).
ibrowse
internally translates URL embedded user and password to basic authorization header, so we always check both the plaintext password and the base64-encoded version of "User:Pass" string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is another vector where credentials could leak. The message queue of a worker process.
Here ibrowse
sends full url (with credentials) to the worker process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yap, definitely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple things that jumped out at me as possible bugs when reading the PR. Not sure on the question about age, the others seem like legit bugs though.
src/couch/src/couch_doc.erl
Outdated
@@ -270,6 +270,9 @@ transfer_fields([{<<"_replication_state">>, _} = Field | Rest], | |||
transfer_fields([{<<"_replication_state_time">>, _} = Field | Rest], | |||
#doc{body=Fields} = Doc) -> | |||
transfer_fields(Rest, Doc#doc{body=[Field|Fields]}); | |||
transfer_fields([{<<"_replication_start_time">>, _} = Field | Rest], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we add this or was it just always missing? Adding _ fields to docs will lead to oddness with the replicator when replicating to versions of couch that don't allow this field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh interesting. It's the case of replicator the _replicator db itself right?
Anyone have any thoughts on this one @sagelywizard @rnewson ?
Add a note that replicating the _replicator from CouchDB 3.0 to <3.0 is not supported if it has completed replications in the source db?
Use a non-underscored version? replication_start_time
. But that's inconsistent a bit with other ones.
Remove this field? But then _scheduler/docs docs output will be inconsistent since for completed replications we read that from the view on disk. Dashboard code will have to be modified as well perhaps.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its the backwards replication that hurts yeah. I wanted to add a new _ field for quorum info once but it got shot down cause it'd break replication with every CouchDB instance older than when it was added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Paul had a good idea. Remove the _replication_start_time and for failure case just use state time as start time. And for completed add start time (or rather duration) to _replication_stats
.
@garrensmith Will this this affect Fauxton code ^ ?
Running0 = running_jobs(), | ||
ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end, | ||
Running1 = lists:filter(ContinuousPred, Running0), | ||
Running2 = lists:sort(fun oldest_job_first/2, Running1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is just confusion, but it seems odd that we're prefering to insert old jobs in pending_maybe_replace/2, but preferring to remove the oldest job when we stop a subset. I'm going to run on the assumption that the use of "oldest" means two different things here. In pending_maybe_replace I'm going to assume its time since last activity, and when here its "been active the longest". Assuming that's the case we may want to rename things.
Reading it as is, it sounds like we're stoping and starting the same set of jobs repeatedly and never actually progressing through the queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oldest_job_first/2
orders from oldest started to latest. In case of stop_jobs
we look only at running jobs, then pick the ones that have been running the longest and stop those.
In pending_maybe_replace
we are looking for stopped jobs and want to find which ones are candidates to start. So we need to find jobs which have been waiting the longest, to find those we look at job's last started
timestamp. But we don't want to load them all into memory there could be a huge number of pending jobs. So we do an an ets fold.
In the fold function we pick only healthy pending jobs (pending_fold/2
) and as we iterate through we try to keep only up to Count
oldest jobs. As soon as we find a job that is older than the youngest one so far, we remove the youngest and insert the newly found one.
For example, let's say the gb_sets accumulator has [5, 7, 11]
in there. 11
is the youngest one. Next iteration we find 6
. So we bump 11
out and insert 6
in the set and end up with [5, 6, 7]
and so on. In the end we might end up with [1,3,5]
for example.
The confusion probably comes from calling the jobs with the largest timestamp the youngest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understood the rest of the logic just fine yeah. My concern was that either there's a bug or we're calling two different age related things the same name (ie, oldest for longest running and oldest for least recently ran). Since it was saying oldest for both it sounded like we weren't actually cycling through jobs.
At the very least we may want to change a couple names to make that more clear rather than let every person that ever reads this code have to figure it out again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the problem with is with using old
. It's overridden to mean two things. When looking at running jobs and finding which ones to stop. We can use longest_running
instead of oldest_first
.
And maybe explain in the comments in pending_maybe_replace what we mean by old and young.
ok = update_state_started(Job, Child, Ref, State), | ||
couch_log:notice("~p: Job ~p started as ~p", | ||
[?MODULE, Job#job.id, Child]); | ||
{error, {already_started, OtherPid}} when node(OtherPid) =:= node -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Certainly you mean node()
and not the atom node
here and in the next case statement right?
Also, am I missing where we're checking to see if a job exists on a different node somehow? I'm not seeing that logic anywhere (granted this is a fairly large PR I'm still processing).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug! 🐛 You're right, should be node()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those should be examples for all of the bigger style issues I saw. As mentioned I'm not gonna go through and nit pick every instance or it would get a bit overwhelming with such a large PR.
-export([start_link/4]). | ||
|
||
-export([init/1, handle_call/3, handle_info/2, handle_cast/2]). | ||
-export([code_change/3, terminate/2]). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let the style review commence!
I'm only picking on this module because its the first new module in the PR's diff view though this applies to all new modules and even old ones where we're changing things in the front matter and moving functions around to some degree.
A module should have the following basic outline:
We should work on organizing our export lists so that diffs are easier to read in the future. In general, we should have three export attributes for: public api, behavior callbacks, private module callbacks that need to be exported. Where behavior exports will have one attribute per behavior.
Within an export we should have a single function per line. This way when these change the diff will be easier to read and it will be apparent on what's changing rather than our current status quo of sticking random functions into a module with randomly placed additions to the export attributes.
For this module these should look something like:
-export([
start_link/4
]).
-export([
init/1,
terminate/2,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3
]).
-export([
changes_reader/3,
changes_reader_cb/3
]).
Also, the functions in the module should follow that order and any module private functions should follow after those in some logical order (I tend to prefer the order they were referenced by the exported functions so that its easy to know about where they are when searching for their definition, but that can be trumped if some other logical ordering makes sense, i.e., for trivial functions that aren't referenced often because their intent/implementation is obvious can usually just go as the very last functions in the module before any tests).
Also a note on whitespace as there's quite a bit of variation throughout this PR. In general, there's a preference for two empty lines between sections and functions and a single empty line in places where it helps with readability. So, for instance, we're missing empty lines between the behavior attribute and the exports, and then between exports and includes, etc since these are different "sections". For the most part this PR uses two empty lines between functions but in some cases it only uses one or will even use three or four in some places. We'll want to normalize all of those.
Places where single empty lines are helpful are when a handle_call/handle_cast function for a gen_server has a number of large clauses. A single empty line between clauses can be helpful to read each individually. Also in export lists it can be useful to group different related functions into sections delimited by empty lines so that related functions are obvious in their relation to each other. Above all though, lets try and be consistent.
Also considering that a lot of these modules are new I'm being extra persnickety since these are greenfield and we're not doing things like requiring a style cleanup and then a change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I'm not going to go through and note every occurrence of these style issues as I care about everyone's inboxes. One of these days I'm gonna find time to write a tool or find one that works for us to use so that we can just have accepted truth for style rather than a doc hidden somewhere on a wiki.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did some cleanups in this area. Restructured exports, fixed some > 80 line chars. I am sure didn't get all of them.
I tried using https://github.com/inaka/elvis with a custom style config. But the number of rules there are not enough and some are not useful. 80 char lies was probably the most helpful one, others I had to ignore to avoid generating noise.
I think we need a customized emacs-based reformatter thing like this: https://github.com/fenollp/erlang-formatter.
Also, some of the code was copied from old modules, for example couch_scheduler_scheduler_job is mostly the old couch_replicator gen_server I skipped some of those to not make the diff even larger.
We'd probably do a reformatting pass-only as a separate set of PRs and such. But it would be good to have an automated formatter-thing first.
since = Since, | ||
feed = "normal", | ||
timeout = infinity | ||
}, {json_req, null}, Db), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor Nit: I'd pull the #changes_args{}
out and assign it to a variable and then call handle_db_changes/3 on a single line. Generally we have three patterns for formatting a function call:
- all on one line
- all args and closing paren and comma on new line indented twice more
- each arg on a separate line, indented twice, closing parent/comma on separate line indented once
The idea here being that we want to communicate the sections and make things easier on eyeballs.
DbName = couch_util:get_value(<<"id">>, Change), | ||
case DbName of <<"_design/", _/binary>> -> ok; _Else -> | ||
case couch_replicator_utils:is_deleted(Change) of | ||
true -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Case statement patterns should be in one indent.
% Convenience function for gen_servers to subscribe to {cluster, stable} and | ||
% {cluster, unstable} events from couch_replicator clustering module. | ||
-spec link_cluster_event_listener(pid()) -> pid(). | ||
link_cluster_event_listener(GenServer) when is_pid(GenServer) -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The specificity of this to gen_server is odd. Normally I would go for something like:
link_clustr_event_listener(Mod, Fun, Args) -> ...
Which would then end up invoking something like erlang:apply(Mod, Fun, Args ++ [Event])
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]). | ||
-export([code_change/3, terminate/2]). | ||
|
||
-export([acquire/1, relinquish/1]). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
relinquish is a funny verb here. Usually its acquire/release for resource handling.
@@ -31,8 +31,7 @@ | |||
-record(state, { | |||
url, | |||
limit, % max # of workers allowed | |||
free = [], % free workers (connections) | |||
busy = [], % busy workers (connections) | |||
conns = [], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its called conns
but stores variables named Worker
which was hard to keep straight in my head.
-include("couch_replicator_api_wrap.hrl"). | ||
-include("couch_replicator.hrl"). | ||
|
||
-import(couch_util, [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Imports are the devil and we should take every opportunity to remove them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👿 agree, not sure what I was thinking
Job = #job{ | ||
id = Rep#rep.id, | ||
rep = Rep, | ||
history = [{added, os:timestamp()}]}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The closing bracket of the record should be on its own newline.
|
||
|
||
format_status(_Opt, [_PDict, State]) -> | ||
[{max_jobs, State#state.max_jobs}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just Say No to Visual Indents!
stop_excess_jobs(State, Running) -> | ||
#state{max_jobs=MaxJobs} = State, | ||
StopCount = Running - MaxJobs, | ||
if StopCount > 0 -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A space saver pattern is to flip your logic and move the true branch up:
if StopCont =< 0 -> ok; true ->
stuff...
end,
|
||
relinquish(Worker) -> | ||
unlink(Worker), | ||
gen_server:cast(?MODULE, {relinquish, Worker}). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unlink cast pattern seems suspicious to me. Though I guess the gen_server links to everything and monitors clients so theoretically there's no hole in the dance here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yap. We are unlinking from owner here. If worker dies after unlinking we'll monitor its death and find out.
{Pid, _Ref} = From, | ||
case ibrowse_lib:parse_url(URL) of | ||
#url{host=Host, port=Port} -> | ||
case ets:match_object(?MODULE, #connection{host=Host, port=Port, mref=undefined, _='_'}, 1) of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also I missed this in the style nits:
Just Say No to Lines Longer Than 80 Characters!
|
||
-spec interval(#state{}) -> non_neg_integer(). | ||
interval(#state{period = Period, start_period = Period0, start_time = T0}) -> | ||
case now_diff_sec(T0) > Period of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another bug I spotted and forgot about, I'm pretty sure Period and Period0 are backwards here. Also, I really think we should probably rename the name bindings because obviously either me or the code is wrong and its hard to know which (though I'm pretty sure its the code :).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming start_period Period0
was silly. The logic also assumes that start period is a shorter than normal period.
So on startup we want to wait just a bit for nodes to connect and cluster to stabilize, but not for a whole minute. But if cluster has been up for a while, and we notice membership changes, chances are there are probably restarts happening so the wait is longer.
The logic there says, if time since startup is greater than a full period (minutes) then use normal period to wait. But if time since startup is short (seconds) assume we are in the start phase so just wait a few seconds before rescan.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohhhhhh. Yeah, that makes sense finally. We should add a comment then cause that's super not obvious to me at least. Something simple like:
% For the first Period seconds after node boot we check cluster stability every
% StartPeriod seconds. Once the initial Period seconds have passed we continue
% to monitor once every Period seconds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For reference, my first read on that function, I thought the intent was to return StartPeriod during the first StartPeriod seconds and then Period once we'd been alive for more than StartPeriod. Ie, during StartPeriod we returned one interval, and after we return the other. As opposed to, for the first Period return StartPeriod, in second until infinity Period, return Period.
|
||
-spec is_stable(#state{}) -> boolean(). | ||
is_stable(#state{last_change = TS} = State) -> | ||
now_diff_sec(TS) > interval(State). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that this is in seconds should we go with >=
? Or not? I can't convince myself either way if equal to period is correct or not or harmless that its interval+1 in reality.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think > shows intent better, and below you noticed it's a float (with / operation)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good.
USec when USec < 0 -> | ||
0; | ||
USec when USec >= 0 -> | ||
USec / 1000000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, though with my last comment, given that we use /
instead of div
it probably doesn't make a difference since it'll be interval+1usec rather than interval+1sec.
-spec now_msec() -> msec(). | ||
now_msec() -> | ||
{Mega, Sec, Micro} = os:timestamp(), | ||
((Mega * 1000000) + Sec) * 1000 + round(Micro / 1000). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think div
would be more appropriate here? Or do we really specifically want rounding?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Div will work. Good idea!
|
||
% Definitions | ||
|
||
-define(SHARDS_N, 16). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I also wanted to ask why we're jumping straight to sharded ets tables? Is this something that we found to be a bottleneck or something that we're worried will be a bottleneck.
Also, regardless, we should probably pull this out into its own module somewhere so that the logic is separate. Also, there was a neat generational ets cache I saw somewhere if we're worried about deleting things.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could handle all the replication connections in a larger cluster so seemed bottleneck-y to have all connection search through a single ets table for each request.
Pulling it to a separate module would be reasonable
delete_worker(Worker) -> | ||
ets:delete(?MODULE, Worker#connection.worker), | ||
unlink(Worker#connection.worker), | ||
spawn(fun() -> ibrowse_http_client:stop(Worker#connection.worker) end), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, every Interval seconds, we go through a sweep through the ETS table and find worker processes which don't have a monitor reference for deletion. Is there ever a situation where lingering worker processes/connections aren't closed? I do see that that stop/1 does have a timeout, so if it's issued a stop, at some point the timeout will trigger and we'll kill it. If this couch_replicator_connection
process crashes/restarts, would those worker processes stick around until some other time out? Just brainstorming here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good observation. Yeah they are not closed immediately because then there is no chance fo reuse them and the connection pool won't serve its purpose. So connections stay around after they are used for some time ready to be picked up by some other owner. However we don't want them to stick around forever either, otherwise the pool will just keep growing. So we sweep through and close the idle connection.
Now ibrowse clients actually have a close on idle timeout as well (I think 90 sec or so). But I think this is a case of us not trusting that mechanism 100% and implementing our own cleanup.
pp_rep_id/1 | ||
]). | ||
|
||
-define(WORKER_TIMEOUT_MSEC, 61000). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
haha any reason this is 61000?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because 30 seconds was a timeout for requests (if I remember correctly) and so after worker is spawned, we'd want the worker to get a chance to make a few requests (maybe one failing one and a retry) and then fail with its own error (timeout, network error), which would be more specific and informative, before it simply gets killed because of the timeout here. That is, if all fails and the worker is actually blocked then 61 sec is a safety net to brutally kill the worker so doesn't end up hung forever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add your description as comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few minor nits. Tests look pretty good. Also I'm gonna review each module's tests independently cause there's quite a few of them.
meck:expect(couch_db, close, 1, ok), | ||
mock_changes_reader(), | ||
% create process to stand in for couch_ever_server | ||
% mocking erlang:monitor doesn't, so give it real process to monitor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
couch_event_server? Also doesn't (work) I assume?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops. Yap, you're right. Will fix it
exit({Server, DbName}) | ||
end. | ||
|
||
kill_mock_change_reader_and_get_its_args(Pid) -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, but you've got mock_changes_reader_loop and kill_mock_change_reader.*, would be nicer to keep the plurality constant for changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point
t_handle_info_other_event() -> | ||
?_test(begin | ||
State = mock_state(), | ||
handle_info_check({'$couch_event', ?DBNAME, somethingelse}, State) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like assertions for not calling db_created|deleted|found would be useful here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. Will do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor nit suggestion.
Url1 = <<"https://adm:pass@host/db">>, | ||
?assertEqual(<<"https://adm:*****@host/db/">>, strip_url_creds(Url1)), | ||
Url2 = <<"https://adm:pass@host/db">>, | ||
?assertEqual(<<"https://adm:*****@host/db/">>, strip_url_creds(Url2)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd add tests for port and query strings as well.
gen_server:cast(Server, Event). | ||
|
||
|
||
% Private helpers for multidb changes API, these updates into the doc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"these updates into" seems like its missing a word.
% exception which would indicate this document is malformed. This exception | ||
% should propagate to db_change function and will be recorded as permanent | ||
% failure in the document. User will have to delete and re-create the | ||
% document to fix the problem. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete and recreate? Would an update not work just as well for some reason?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Comments is outdated. Update now works. Before had to do delete and recreate dance.
Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)}; | ||
#rdoc{rid = nil} -> | ||
% Calculated new replication id for non-filtered replication. | ||
% Remove replication doc body, after this we won't needed any |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"we won't needed" seems like its missing a word. And assuming its "we won't be needed" or something its not entirely clear what's not needed or what we're doing about it.
Wait = get_worker_wait(Doc), | ||
Ref = make_ref(), | ||
true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}), | ||
couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might want to use ?MODULE here. Reviewing this I had a "wait, isn't that the module I'm reading?" moment and had to scroll up to double check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah. Different module couch_replicator_doc_processor_worker
is the worker module that does the work of turning replication doc -> replication task running in scheduler bit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah. Hooray for long module names I guess.
-spec error_backoff(non_neg_integer()) -> seconds(). | ||
error_backoff(ErrCnt) -> | ||
Exp = min(ErrCnt, ?ERROR_MAX_BACKOFF_EXPONENT), | ||
random:uniform(64 bsl Exp). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The math here could use a comment.
filter_backoff() -> | ||
Total = ets:info(?MODULE, size), | ||
Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC), | ||
60 + random:uniform(round(Range)). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The math here could use a comment.
|
||
% Change handled when cluster is unstable (nodes are added or removed), so | ||
% job is not added. A rescan will be triggered soon and change will be | ||
% evaluated again. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that process_change drops anything when the cluster is unstable, but I'm not following how it schedules a rescan to trigger this again and its not tested here. I'm assuming this is basically handled elsewhere but a note in the comment on how that rescan works would be useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a notify_cluster_event
callback. That gets a {cluster, stable|unstable}
message.
After cluster is unstable we ignore the changes. When it become stable again we get the {cluster, stable}
message we do two things:
- In couch_replicator_doc_processor on each node we stop all jobs which don't belong there anymore:
ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE),
- And the full rescan is triggered in the couch_replicator_db_changes module. There when cluster is stable we restart the multidb_changes process and that rescans everything which adds jobs which should be running on this node.
restart_mdb_changes(stop_mdb_changes(State)).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotchya.
meck:expect(couch_replicator_clustering, owner, 2, different_node), | ||
?assert(ets:member(?MODULE, {?DB, ?DOC1})), | ||
gen_server:cast(?MODULE, {cluster, stable}), | ||
timer:sleep(100), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than sleeping you could use meck:wait on ets:delete/2
or couch_replicator_scheduler:remove_job/1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like couch_replicator_rate_limiter could use some tests and if we could pass our own now_msecs() value it'd be pretty simple to calculate some test data to assert we're generating the correct intervals.
Background: https://issues.apache.org/jira/browse/COUCHDB-3391 Summary: I tested the max_jobs config and it doesn't appear to be working as I would expect. Sorry, if I missed a detail. Steps to reproduce:
=> this results in |
Thanks for taking a look!
I gave your setup a try and got it to work with these settings: (I didn't try to minimize them, you can probably tweak them to find minimal values) FD limits: Config settings:
I put q=1, n=1 based on your use case in https://issues.apache.org/jira/browse/COUCHDB-3391 With all that set up. Started couch (I build a local scheduling replicator in Vagrant)
Created
Ran your demo:
without any errors. And finally showing off the states-based filtering:
Without any crashes
|
@@ -115,7 +115,9 @@ else | |||
endif | |||
# This might help with emfile errors during `make javascript`: ulimit -n 10240 | |||
@rm -rf dev/lib | |||
@dev/run -n 1 -q --with-admin-party-please test/javascript/run $(suites) | |||
@dev/run -n 1 -q --with-admin-party-please \ | |||
-c 'startup_jitter=0' \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a critical change for this patch? Why?
If so you also need to make the change in Makefile.win
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to make the javascript test pass in Travis. Without this change the integration test suite takes too long and times out. There are enough replications starting that 5 second average wait starts to add up quickly.
And it might be a useful feature in general for development (to override settings while running ./dev/run)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm +1 on the entire concept, and -0 on the implementation. For instance, if this grows beyond 1-2 settings we should probably implement it differently.
Recently I added the local.d
and default.d
directories to the overlay folders. Perhaps instead you should just create a file local.d/startup-jitter.ini
and put your setting in there. In fact, a lot of the machinations dev/run
goes through are obsolete if we just move to creating local.d/*.ini
files instead of in-place substitutions.
Don't let this block getting your PRs landed, but if you don't tackle it, would you file an issue please? and I might look at it sooner rather than later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took another look, and my only concerns are the things that @wohali commented about. So, once those are addressed, I'm 👍. Awesome, awesome changes!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. All of my issues have been covered. Awesome work everyone!
@redgeoff As far as I'm aware, CouchDB doesn't make any attempt to check system/process ulimit on max fds. If I recall correctly, you'll start getting |
Scheduling replicator can run a large number of replication jobs by scheduling them. It will periodically stop some jobs and start new ones. Jobs that fail will be penalized with an exponential backoff. Jira: COUCHDB-3324
This module maintains cluster membership information for replication and provides functions to check ownership of replication jobs. A cluster membership change is registered only after a configurable `cluster_quiet_period` interval has passed since the last node addition or removal. This is useful in cases of rolling node reboots in a cluster in order to avoid rescanning for membership changes after every node up and down event, and instead doing only on rescan at the very end. Jira: COUCHDB-3324
Monitor shards which match a suffix for creation, deletion, and doc updates. To use implement `couch_multidb_changes` behavior. Call `start_link` with DbSuffix, with an option to skip design docs (`skip_ddocs`). Behavior callback functions will be called when shards are created, deleted, found and updated. Jira: COUCHDB-3324
This commit adds functionality to share connections between replications. This is to solve two problems: - Prior to this commit, each replication would create a pool of connections and hold onto those connections as long as the replication existed. This was wasteful and cause CouchDB to use many unnecessary connections. - When the pool was being terminated, the pool would block while the socket was closed. This would cause the entire replication scheduler to block. By reusing connections, connections are never closed by clients. They are only ever relinquished. This operation is always fast. This commit adds an intermediary process which tracks which connection processes are being used by which client. It monitors clients and connections. If a client or connection crashes, the paired client/connection will be terminated. A client can gracefully relinquish ownership of a connection. If that happens, the connection will be shared with another client. If the connection remains idle for too long, it will be closed. Jira: COUCHDB-3324
AIMD: additive increase / multiplicative decrease feedback control algorithm. https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease This is an algorithm which converges on the available channel capacity. Each participant doesn't a priori know the capacity and participants don't communicate or know about each other (so they don't coordinate to divide the capacity among themselves). A variation of this is used in TCP congestion control algorithm. This is proven to converge, while for example, additive increase / additive decrease or multiplicative increase / multiplicative decrease won't. A few tweaks were applied to the base control logic: * Estimated value is an interval (period) instead of a rate. This is for convenience, as users will probably want to know how much to sleep. But, rate is just 1000 / interval, so it is easy to transform. * There is a hard max limit for estimated period. Mainly as a practical concern as connections sleeping too long will timeout and / or jobs will waste time sleeping and consume scheduler slots, while others could be running. * There is a time decay component used to handle large pauses between updates. In case of large update interval, assume (optimistically) some successful requests have been made. Intuitively, the more time passes, the less accurate the estimated period probably is. * The rate of updates applied to the algorithm is limited. This effectively acts as a low pass filter and make the algorithm handle better spikes and short bursts of failures. This is not a novel idea, some alternative TCP control algorithms like Westwood+ do something similar. * There is a large downward pressure applied to the increasing interval as it approaches the max limit. This is done by tweaking the additive factor via a step function. In practice this has effect of trying to make it a bit harder for jobs to cross the maximum backoff threshold, as they would be killed and potentially lose intermediate work. Main API functions are: success(Key) -> IntervalInMilliseconds failure(Key) -> IntervalInMilliseconds interval(Key) -> IntervalInMilliseconds Key is any (hashable by phash2) term. Typically would be something like {Method, Url}. The result from the function is the current period value. Caller would then presumably choose to sleep for that amount of time before or after making requests. The current interval can be read with interval(Key) function. Implementation is sharded ETS tables based on the key and there is a periodic timer which cleans unused items. Jira: COUCHDB-3324
Over the years utils accumulated a lot of functionality. Clean up a bit by separating it into specific modules according to semantics: - couch_replicator_docs : Handle read and writing to replicator dbs. It includes updating state fields, parsing options from documents, and making sure replication VDU design document is in sync. - couch_replicator_filters : Fetch and manipulate replication filters. - couch_replicator_ids : Calculate replication IDs. Handles versioning and Pretty formatting of IDs. Filtered replications using user filter functions incorporate a filter code hash into the calculation, in that case call couch_replicator_filters module to fetch the filter from the source. Jira: COUCHDB-3324
Document processor listens for `_replicator` db document updates, parses those changes then tries to add replication jobs to the scheduler. Listening for changes happens in `couch_multidb_changes module`. That module is generic and is set up to listen to shards with `_replicator` suffix by `couch_replicator_db_changes`. Updates are then passed to the document processor's `process_change/2` function. Document replication ID calculation, which can involve fetching filter code from the source DB, and addition to the scheduler, is done in a separate worker process: `couch_replicator_doc_processor_worker`. Before couch replicator manager did most of this work. There are a few improvement over previous implementation: * Invalid (malformed) replication documents are immediately failed and will not be continuously retried. * Replication manager message queue backups is unfortunately a common issue in production. This is because processing document updates is a serial (blocking) operation. Most of that blocking code was moved to separate worker processes. * Failing filter fetches have an exponential backoff. * Replication documents don't have to be deleted first then re-added in order update the replication. Document processor on update will compare new and previous replication related document fields and update the replication job if those changed. Users can freely update unlrelated (custom) fields in their replication docs. * In case of filtered replications using custom functions, document processor will periodically check if filter code on the source has changed. Filter code contents is factored into replication ID calculation. If filter code changes replication ID will change as well. Jira: COUCHDB-3324
Glue together all the scheduling replicator pieces. Scheduler is the main component. It can run a large number of replication jobs by switching between them, stopping and starting some periodically. Jobs which fail are backed off exponentially. Normal (non-continuous) jobs will be allowed to run to completion to preserve their current semantics. Scheduler behavior can configured by these configuration options in `[replicator]` sections: * `max_jobs` : Number of actively running replications. Making this too high could cause performance issues. Making it too low could mean replications jobs might not have enough time to make progress before getting unscheduled again. This parameter can be adjusted at runtime and will take effect during next reschudling cycle. * `interval` : Scheduling interval in milliseconds. During each reschedule cycle scheduler might start or stop up to "max_churn" number of jobs. * `max_churn` : Maximum number of replications to start and stop during rescheduling. This parameter along with "interval" defines the rate of job replacement. During startup, however a much larger number of jobs could be started (up to max_jobs) in short period of time. Replication jobs are added to the scheduler by the document processor or from the `couch_replicator:replicate/2` function when called from `_replicate` HTTP endpoint handler. Document processor listens for updates via couch_mutlidb_changes module then tries to add replication jobs to the scheduler. Sometimes translating a document update to a replication job could fail, either permantly (if document is malformed and missing some expected fields for example) or temporarily if it is a filtered replication and filter cannot be fetched. A failed filter fetch will be retried with an exponential backoff. couch_replicator_clustering is in charge of monitoring cluster membership changes. When membership changes, after a configurable quiet period, a rescan will be initiated. Rescan will shufle replication jobs to make sure a replication job is running on only one node. A new set of stats were added to introspect scheduler and doc processor internals. The top replication supervisor structure is `rest_for_one`. This means if a child crashes, all children to the "right" of it will be restarted (if visualized supervisor hierarchy as an upside-down tree). Clustering, connection pool and rate limiter are towards the "left" as they are more fundamental, if clustering child crashes, most other components will be restart. Doc process or and multi-db changes children are towards the "right". If they crash, they can be safely restarted without affecting already running replication or components like clustering or connection pool. Jira: COUCHDB-3324
The `_scheduler/jobs` endpoint provides a view of all replications managed by the scheduler. This endpoint includes more information on the replication than the `_scheduler/docs` endpoint, including the history of state transitions of the replication. This part was implemented by Benjamin Bastian. The `_scheduler/docs` endpoint provides a view of all replicator docs which have been seen by the scheduler. This endpoint includes useful information such as the state of the replication and the coordinator node. The implemention of `_scheduler/docs` mimics closely `_all_docs` behavior: similar pagination, HTTP request processing and fabric / rexi setup. The algorithm is roughly as follows: * http endpoint: - parses query args like it does for any view query - parses states to filter by, states are kept in the `extra` query arg * Call is made to couch_replicator_fabric. This is equivalent to fabric:all_docs. Here the typical fabric / rexi setup is happening. * Fabric worker is in `couch_replicator_fabric_rpc:docs/3`. This worker is similar to fabric_rpc's all_docs handler. However it is a bit more intricate to handle both replication document in terminal state as well as those which are active. - Before emitting it queries the state of the document to see if it is in a terminal state. If it is, it filters it and decides if it should be emitted or not. - If the document state cannot be found from the document. It tries to fetch active state from local node's doc processor via key based lookup. If it finds, it can also filter it based on state and emit it or skip. - If the document cannot be found in the node's local doc processor ETS table, the row is emitted with a doc value of `undecided`. This will let the coordinator fetch the state by possibly querying other nodes's doc processors. * Coordinator then starts handling messages. This also mostly mimics all_docs. At this point the most interesting thing is handling `undecided` docs. If one is found, then `replicator:active_doc/2` is queried. There, all nodes where document shards live are queries. This is better than a previous implementation where all nodes were queries all the time. * The final work happens in `couch_replicator_httpd` where the emitting callback is. There we only the doc is emitted (not keys, rows, values). Another thing that happens is the `Total` value is decremented to account for the always-present _design doc. Because of this a bunch of stuff was removed. Including an extra view which was build and managed by the previous implementation. As a bonus, other view-related parameters such as skip and limit seems to work out of the box and don't have to be implemented ad-hoc. Also, most importantly many thanks to Paul Davis for suggesting this approach. Jira: COUCHDB-3324
\o/ congrats! so exciting. |
Sorry, I haven't had time to follow up. I'll try to test this soon, but
please don't hold this up. In general though, I was hoping that these
changes would make it so that you could more efficiently implement
replication with less connections/handles so that you could support more
databases/users (with replications) on a single box. I'm not sure if these
enhancements target this detail.
…On Fri, Apr 28, 2017, 15:50 Joan Touzet ***@***.***> wrote:
\o/ congrats! so exciting.
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#470 (comment)>, or mute
the thread
<https://github.com/notifications/unsubscribe-auth/ADOPKn3BpC-XvvbHv5OG-FvtNziBAhpgks5r0m0rgaJpZM4Mzi0E>
.
|
The good news is that yes, this commit DOES more efficiently handle replication with less connections, but only for multiple replications between the same hosts. So it's great for clusters and data center backups and so on, but doesn't really help the database-and-replication-per-user model very much. |
Was this actually finished and released? Struggling to understand the git history, seems to have been reverted. Also struggling to find in release notes or related documentation regarding settings. Possible in just not doing a good job of looking. |
@badbod99 Yes, the scheduling replicator was merged and released in 2.1.0. https://docs.couchdb.org/en/latest/whatsnew/2.1.html#version-2-1-0 |
Great, thanks. Not sure how I missed that! |
Introduce Scheduling CouchDB Replicator
Jira: https://issues.apache.org/jira/browse/COUCHDB-3324
The core of the new replicator is a scheduler. It which allows running a large number of replication jobs by switching between them, stopping some and starting others periodically. Jobs which fail are backed off exponentially. There is also an improved inspection and querying API:
_scheduler/jobs
and_scheduler/docs
.Replication protocol hasn't change so it is possible to replicate between CouchDB 1.x, 2.x, PouchDB, and other implementations of CouchDB replication protocol.
Scheduler
Scheduler allows running a large number of replication jobs. Tested with up to 100k replication jobs in 3 node cluster. Replication jobs are run in a fair, round-robin fashion. Scheduler behavior can be configured by these configuration options in
[replicator]
sections:max_jobs
: Number of actively running replications. Making this too highcould cause performance issues. Making it too low could mean replications jobs
might not have enough time to make progress before getting unscheduled again.
This parameter can be adjusted at runtime and will take effect during next
rescheduling cycle.
interval
: Scheduling interval in milliseconds. During each reschedulecycle scheduler might start or stop up to "max_churn" number of jobs.
max_churn
: Maximum number of replications to start and stop duringrescheduling. This parameter along with "interval" defines the rate of job
replacement. During startup, however a much larger number of jobs could be
started (up to max_jobs) in a short period of time.
_scheduler/{jobs,docs} API
There is an improved replication state querying API, with a focus on ease of use and performance. The new API avoids having to update the replication document with transient state updates. In production that can lead to conflicts and performance issues. The two new APIs are:
_scheduler/jobs
: This endpoint shows active replication jobs. These are jobs managed by the scheduler. Some of them might be running, some might be waiting to run, or backed off (penalized) because they crashed too many times. Semantically this is somewhat equivalent to_active_tasks
but focuses only on replications. Jobs which have completed or which were never created because of malformed replication document will not be shown here as they are not managed by the scheduler._scheduler/docs
: This endpoint is an improvement on having to go back and re-read replication document to query their state. It represents the state of all the replications started from documents in_replicator
dbs. Unlike_scheduler/jobs
it will also show jobs which have failed or completed (that is, which are not managed by the scheduler anymore).Compatibility Mode
Understandably some customers are using the document-based API to query replication states (
triggered
,error
,completed
etc). To ease the upgrade path, there is a compatibility configuration setting:It defaults to
false
but when set totrue
it will continue updating replication document with the state of the replication jobs.Other Improvements
Network resource usage and performance was improved by implementing a common connection pool. This should help in cases of a large number of connections to the same sources or target. Previously connection pools were shared only withing a single replication job.
Improved rate limiting handling. Replicator requests will auto-discover rate limit capacity on targets and sources based on a proven Additive Increase / Multiplicative Decrease feedback control algorithm.
Improved performance by avoiding repeatedly retrying failing replication jobs. Instead use exponential backoff. In a large multi-user cluster, quite a few replication jobs are invalid, are crashing or failing (for various reasons such as inability to checkpoint to source, mismatched credentials, missing databases). Penalizing failing replication will free up system resources for more useful work.
Improved recovery from long but temporary network failure. Currently if replications jobs fail to start 10 times in a row, they will not be retried anymore. This is sometimes desirable, but in some scenarios (for example, after a sustained DNS failure which eventually recovers), replications reach their retry limit and cease to work. Previously it required operator intervention to continue. Scheduling replicator will never give up retrying a valid scheduled replication job. So it should recover automatically.
Better handling of filtered replications: Failing to fetch filters could block couch replicator manager, lead to message queue backups and memory exhaustion. Also, when replication filter code changes update replication accordingly (replication job ID should change in that case). This PR fixes both of those issues. Filters and replication ID calculation are managed in separate worker threads (in couch replicator doc processor). Also, when filter code changes on the source, replication jobs will restart and re-calculate their new ID automatically.
Related PR:
Documentation PR: apache/couchdb-documentation#123
Tests
EUnit test coverage. Some modules such as multidb_changes, have close to 100% coverage. Some have less. All previous replication tests have been updated to work with the new scheduling replicator.
Except for one modification, existing Javascript tests pass.
Additional integration tests. To validate, test and benchmark some edge cases, an additional toolkit was created: https://github.com/cloudant-labs/couchdyno/blob/master/README_tests.md which allows testing scenarios where nodes fail, creation of large number of replication jobs, manipulation of cluster configurations, and setting up long running (soak) tests.