From 9b997226bd608200cb583f0dd410f513a2f6fce7 Mon Sep 17 00:00:00 2001 From: "Paul J. Davis" Date: Thu, 1 Mar 2018 13:43:33 -0600 Subject: [PATCH 1/3] Remove unused code for starting compactions This was left over from an earlier attempt at being a bit more strict on removing access to the #db record. Its not used as is obvious by the fact that the 2-arity version isn't even exported from the module. --- src/couch/src/couch_db.erl | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 2ea94b9742a..93ea07e6596 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -224,18 +224,7 @@ monitor(#db{main_pid=MainPid}) -> erlang:monitor(process, MainPid). start_compact(#db{} = Db) -> - start_compact(Db, []). - -start_compact(#db{} = Db, Opts) -> - case lists:keyfind(notify, 1, Opts) of - {notify, Pid, Term} -> - % We fake a gen_server call here which sends the - % response back to the specified pid. - Db#db.main_pid ! {'$gen_call', {Pid, Term}, start_compact}, - ok; - _ -> - gen_server:call(Db#db.main_pid, start_compact) - end. + gen_server:call(Db#db.main_pid, start_compact). cancel_compact(#db{main_pid=Pid}) -> gen_server:call(Pid, cancel_compact). 
From 4a73d035dc77c4074f5e3c8d8bcf900138c91eb6 Mon Sep 17 00:00:00 2001 From: Jan Lehnardt Date: Fri, 16 Feb 2018 16:12:36 +0100 Subject: [PATCH 2/3] re-enable "flaky" test in quest to nail down #745 --- ...plicator_small_max_request_size_target.erl | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/couch_replicator/test/couch_replicator_small_max_request_size_target.erl b/src/couch_replicator/test/couch_replicator_small_max_request_size_target.erl index 6f3308c3919..af3a285f5c0 100644 --- a/src/couch_replicator/test/couch_replicator_small_max_request_size_target.erl +++ b/src/couch_replicator/test/couch_replicator_small_max_request_size_target.erl @@ -61,8 +61,8 @@ reduce_max_request_size_test_() -> % attachment which exceed maximum request size are simply % closed instead of returning a 413 request. That makes these % tests flaky. - % ++ [{Pair, fun should_replicate_one_with_attachment/2} - % || Pair <- Pairs] + ++ [{Pair, fun should_replicate_one_with_attachment/2} + || Pair <- Pairs] } }. @@ -90,12 +90,12 @@ should_replicate_one({From, To}, {_Ctx, {Source, Target}}) -> % POST-ing individual documents directly and skip bulk_docs. Test that case % separately % See note in main test function why this was disabled. -% should_replicate_one_with_attachment({From, To}, {_Ctx, {Source, Target}}) -> -% {lists:flatten(io_lib:format("~p -> ~p", [From, To])), -% {inorder, [should_populate_source_one_large_attachment(Source), -% should_populate_source(Source), -% should_replicate(Source, Target), -% should_compare_databases(Source, Target, [<<"doc0">>])]}}. +should_replicate_one_with_attachment({From, To}, {_Ctx, {Source, Target}}) -> + {lists:flatten(io_lib:format("~p -> ~p", [From, To])), + {inorder, [should_populate_source_one_large_attachment(Source), + should_populate_source(Source), + should_replicate(Source, Target), + should_compare_databases(Source, Target, [<<"doc0">>])]}}. 
should_populate_source({remote, Source}) -> @@ -112,11 +112,11 @@ should_populate_source_one_large_one_small(Source) -> {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_one_small(Source, 12000, 3000))}. -% should_populate_source_one_large_attachment({remote, Source}) -> -% should_populate_source_one_large_attachment(Source); +should_populate_source_one_large_attachment({remote, Source}) -> + should_populate_source_one_large_attachment(Source); -% should_populate_source_one_large_attachment(Source) -> -% {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_attachment(Source, 70000, 70000))}. +should_populate_source_one_large_attachment(Source) -> + {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_attachment(Source, 70000, 70000))}. should_replicate({remote, Source}, Target) -> @@ -156,8 +156,8 @@ one_large_one_small(DbName, Large, Small) -> add_doc(DbName, <<"doc1">>, Small, 0). -% one_large_attachment(DbName, Size, AttSize) -> -% add_doc(DbName, <<"doc0">>, Size, AttSize). +one_large_attachment(DbName, Size, AttSize) -> + add_doc(DbName, <<"doc0">>, Size, AttSize). add_doc(DbName, DocId, Size, AttSize) when is_binary(DocId) -> From eedb540854d2425720e74d39503b7cf27f1a4b30 Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Tue, 13 Feb 2018 11:54:52 -0500 Subject: [PATCH 3/3] Implement pluggable authentication and session support for replicator Previously replicator used only basic authentication. It was simple and straightforward. However, with PBKDF2 hashing becoming the default it would be nice not to do all the password verification work with every single request, and instead take advantage of session (cookie) based authentication. This commit implements session based authentication via a plugin mechanism. The list of available replicator auth modules is configurable. For example: ``` [replicator] auth_plugins = couch_replicator_auth_session,couch_replicator_auth_basic ``` The plugins will be tried in order. 
The first one to successfully initialize will end up being used for that endpoint (source or target). During the initialization callback, a plugin could decide it cannot be used in the current context. In that case it signals to be "ignored". The plugin framework will then skip over it and try to initialize the next one in the list. `couch_replicator_auth_basic` effectively implements the old behavior. This plugin should normally be used as a default catch-all at the end of the plugin list. In some cases, it might be useful to enforce exclusive use of session-based auth and fail replication jobs if it is not available. `couch_replicator_auth_session` does most of the work of handling session based authentication. On initialization, it strips away basic auth credentials from headers and url to avoid basic auth being used on the server. Then it is in charge of periodically issuing POST requests to `_session`, updating the headers of each request with the latest cookie value, and possibly picking up a new session cookie if the server issues one along with regular responses. Currently the session based auth plugin is not enabled by default and is an opt-in feature. That is, users would have to explicitly add the session module to the list of auth_plugins. In the future, session auth might be used by default. As discussed in #1153 this work also removes OAuth 1.0 support. After server-side support was removed, it had stopped working anyway since the main oauth app was removed. However, with the plugin framework in place it would be possible for someone to implement it as a separate module not entangled with the rest of the replicator code. 
Fixes #1153 --- rel/overlay/etc/default.ini | 15 + .../couch_replicator_api_wrap.hrl | 13 +- src/couch_replicator/src/couch_replicator.erl | 2 +- .../src/couch_replicator_api_wrap.erl | 7 +- .../src/couch_replicator_auth.erl | 99 +++ .../src/couch_replicator_auth_noop.erl | 52 ++ .../src/couch_replicator_auth_session.erl | 692 ++++++++++++++++++ .../src/couch_replicator_changes_reader.erl | 2 +- .../src/couch_replicator_docs.erl | 33 +- .../src/couch_replicator_httpc.erl | 59 +- .../src/couch_replicator_ids.erl | 51 +- .../src/couch_replicator_scheduler.erl | 2 +- .../src/couch_replicator_scheduler_job.erl | 2 +- .../src/couch_replicator_utils.erl | 90 ++- .../src/couch_replicator_worker.erl | 2 +- .../test/couch_replicator_proxy_tests.erl | 2 +- 16 files changed, 999 insertions(+), 124 deletions(-) rename src/couch_replicator/{src => include}/couch_replicator_api_wrap.hrl (86%) create mode 100644 src/couch_replicator/src/couch_replicator_auth.erl create mode 100644 src/couch_replicator/src/couch_replicator_auth_noop.erl create mode 100644 src/couch_replicator/src/couch_replicator_auth_session.erl diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 4017a0c228f..03f4d14e060 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -430,6 +430,21 @@ ssl_certificate_max_depth = 3 ; Re-check cluster state at least every cluster_quiet_period seconds ; cluster_quiet_period = 60 +; List of replicator client authentication plugins to try. Plugins will be +; tried in order. The first to initialize successfully will be used for that +; particular endpoint (source or target). Normally couch_replicator_auth_noop +; would be used at the end of the list as a "catch-all". It doesn't do anything +; and effectively implements the previous behavior of using basic auth. 
+; There are currently two plugins available: +; couch_replicator_auth_session - use _session cookie authentication +; couch_replicator_auth_noop - use basic authentication (previous default) +; Currently previous default behavior is still the default. To start using +; session auth, use this as the list of plugins: +; `couch_replicator_auth_session,couch_replicator_auth_noop`. +; In a future release the session plugin might be used by default. +;auth_plugins = couch_replicator_auth_noop + + [compaction_daemon] ; The delay, in seconds, between each check for which database and view indexes ; need to be compacted. diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.hrl b/src/couch_replicator/include/couch_replicator_api_wrap.hrl similarity index 86% rename from src/couch_replicator/src/couch_replicator_api_wrap.hrl rename to src/couch_replicator/include/couch_replicator_api_wrap.hrl index d2e0fdff5cd..0f8213c5155 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.hrl +++ b/src/couch_replicator/include/couch_replicator_api_wrap.hrl @@ -14,7 +14,7 @@ -record(httpdb, { url, - oauth = nil, + auth_props = [], headers = [ {"Accept", "application/json"}, {"User-Agent", "CouchDB-Replicator/" ++ couch_server:get_version()} @@ -26,13 +26,6 @@ httpc_pool = nil, http_connections, first_error_timestamp = nil, - proxy_url -}). - --record(oauth, { - consumer_key, - token, - token_secret, - consumer_secret, - signature_method + proxy_url, + auth_context = nil }). diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl index 8b7cd5cb197..39141c30178 100644 --- a/src/couch_replicator/src/couch_replicator.erl +++ b/src/couch_replicator/src/couch_replicator.erl @@ -25,7 +25,7 @@ -include_lib("couch/include/couch_db.hrl"). -include("couch_replicator.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). 
-include_lib("couch_mrview/include/couch_mrview.hrl"). -include_lib("mem3/include/mem3.hrl"). diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index b5ea57c3c9b..44c290d33b6 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -142,7 +142,8 @@ db_open(DbName, Options, Create, _CreateParams) -> throw({unauthorized, DbName}) end. -db_close(#httpdb{httpc_pool = Pool}) -> +db_close(#httpdb{httpc_pool = Pool} = HttpDb) -> + couch_replicator_auth:cleanup(HttpDb), unlink(Pool), ok = couch_replicator_httpc_pool:stop(Pool); db_close(DbName) -> @@ -1009,7 +1010,7 @@ header_value(Key, Headers, Default) -> normalize_db(#httpdb{} = HttpDb) -> #httpdb{ url = HttpDb#httpdb.url, - oauth = HttpDb#httpdb.oauth, + auth_props = lists:sort(HttpDb#httpdb.auth_props), headers = lists:keysort(1, HttpDb#httpdb.headers), timeout = HttpDb#httpdb.timeout, ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options), @@ -1037,7 +1038,7 @@ maybe_append_create_query_params(Db, CreateParams) -> normalize_http_db_test() -> HttpDb = #httpdb{ url = "http://host/db", - oauth = #oauth{}, + auth_props = [{"key", "val"}], headers = [{"k2","v2"}, {"k1","v1"}], timeout = 30000, ibrowse_options = [{k2, v2}, {k1, v1}], diff --git a/src/couch_replicator/src/couch_replicator_auth.erl b/src/couch_replicator/src/couch_replicator_auth.erl new file mode 100644 index 00000000000..1c9a4972320 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_auth.erl @@ -0,0 +1,99 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_auth). + + +-export([ + initialize/1, + update_headers/2, + handle_response/3, + cleanup/1 +]). + + +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). + + +-type headers() :: [{string(), string()}]. +-type code() :: non_neg_integer(). + + +-define(DEFAULT_PLUGINS, "couch_replicator_auth_noop"). + + +% Behavior API + +-callback initialize(#httpdb{}) -> {ok, #httpdb{}, term()} | ignore. + +-callback update_headers(term(), headers()) -> {headers(), term()}. + +-callback handle_response(term(), code(), headers()) -> + {continue | retry, term()}. + +-callback cleanup(term()) -> ok. + + +% Main API + +-spec initialize(#httpdb{}) -> {ok, #httpdb{}} | {error, term()}. +initialize(#httpdb{auth_context = nil} = HttpDb) -> + case try_initialize(get_plugin_modules(), HttpDb) of + {ok, Mod, HttpDb1, Context} -> + {ok, HttpDb1#httpdb{auth_context = {Mod, Context}}}; + {error, Error} -> + {error, Error} + end. + + +-spec update_headers(#httpdb{}, headers()) -> {headers(), #httpdb{}}. +update_headers(#httpdb{auth_context = {Mod, Context}} = HttpDb, Headers) -> + {Headers1, Context1} = Mod:update_headers(Context, Headers), + {Headers1, HttpDb#httpdb{auth_context = {Mod, Context1}}}. + + +-spec handle_response(#httpdb{}, code(), headers()) -> + {continue | retry, term()}. +handle_response(#httpdb{} = HttpDb, Code, Headers) -> + {Mod, Context} = HttpDb#httpdb.auth_context, + {Res, Context1} = Mod:handle_response(Context, Code, Headers), + {Res, HttpDb#httpdb{auth_context = {Mod, Context1}}}. 
+ + +-spec cleanup(#httpdb{}) -> #httpdb{}. +cleanup(#httpdb{auth_context = {Module, Context}} = HttpDb) -> + ok = Module:cleanup(Context), + HttpDb#httpdb{auth_context = nil}. + + +% Private helper functions + +-spec get_plugin_modules() -> [atom()]. +get_plugin_modules() -> + Plugins1 = config:get("replicator", "auth_plugins", ?DEFAULT_PLUGINS), + [list_to_atom(Plugin) || Plugin <- string:tokens(Plugins1, ",")]. + + +try_initialize([], _HttpDb) -> + {error, no_more_auth_plugins_left_to_try}; +try_initialize([Mod | Modules], HttpDb) -> + try Mod:initialize(HttpDb) of + {ok, HttpDb1, Context} -> + {ok, Mod, HttpDb1, Context}; + ignore -> + try_initialize(Modules, HttpDb); + {error, Error} -> + {error, Error} + catch + error:undef -> + {error, {could_not_load_plugin_module, Mod}} + end. diff --git a/src/couch_replicator/src/couch_replicator_auth_noop.erl b/src/couch_replicator/src/couch_replicator_auth_noop.erl new file mode 100644 index 00000000000..5dbf13335c9 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_auth_noop.erl @@ -0,0 +1,52 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_auth_noop). + + +-behavior(couch_replicator_auth). + + +-export([ + initialize/1, + update_headers/2, + handle_response/3, + cleanup/1 +]). + + +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). + + +-type headers() :: [{string(), string()}]. +-type code() :: non_neg_integer(). 
+ + +-spec initialize(#httpdb{}) -> {ok, #httpdb{}, term()} | ignore. +initialize(#httpdb{} = HttpDb) -> + {ok, HttpDb, nil}. + + +-spec update_headers(term(), headers()) -> {headers(), term()}. +update_headers(Context, Headers) -> + {Headers, Context}. + + +-spec handle_response(term(), code(), headers()) -> + {continue | retry, term()}. +handle_response(Context, _Code, _Headers) -> + {continue, Context}. + + +-spec cleanup(term()) -> ok. +cleanup(_Context) -> + ok. diff --git a/src/couch_replicator/src/couch_replicator_auth_session.erl b/src/couch_replicator/src/couch_replicator_auth_session.erl new file mode 100644 index 00000000000..3fff295725a --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_auth_session.erl @@ -0,0 +1,692 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +% This is the replicator session auth plugin. It implements session based +% authentication for the replicator. The only public API are the functions from +% the couch_replicator_auth behaviour. Most of the logic and state is in the +% gen_server. An instance of a gen_server could be spawned for the source and +% target endpoints of each replication job. +% +% The workflow is roughly this: +% +% * On initialization, try to get a cookie in `refresh/1`. If an error occurs, +% then crash. If `_session` endpoint fails with a 404 (not found), return +% `ignore` assuming session authentication is not supported or we simply hit a +% non-CouchDb server. 
+% +% * Before each request, auth framework calls `update_headers` API function. +% Before updating the headers and returning, check if need to refresh again. +% The check looks `next_refresh` time. If that time is set (not `infinity`) +% and just expired, then obtain a new cookie, then update headers and +% return. +% +% * After each request, auth framework calls `handle_response` function. If +% request was successful check if a new cookie was sent by the server in the +% `Set-Cookie` header. If it was, then that becomes the current cookie. +% +% * If last request has an auth failure, check if request used a stale cookie. +% In this case nothing is done, and the client is told to retry. Next time +% it updates its headers before the request it should pick up the latest +% cookie. +% +% * If last request failed and cookie was the latest known cookie, schedule a +% refresh and tell client to retry. However, if the cookie was just updated, +% tell the client to continue such that it will handle the auth failure on +% its own via a set of retries with exponential backoffs. This is to +% ensure if something goes wrong and one of the endpoints issues invalid +% cookies, replicator won't be stuck in a busy loop refreshing them. + + +-module(couch_replicator_auth_session). + + +-behaviour(couch_replicator_auth). +-behaviour(gen_server). + + +-export([ + initialize/1, + update_headers/2, + handle_response/3, + cleanup/1 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + format_status/2 +]). + + +-include_lib("ibrowse/include/ibrowse.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). + + +-type headers() :: [{string(), string()}]. +-type code() :: non_neg_integer(). +-type creds() :: {string() | undefined, string() | undefined}. + + +-define(MIN_UPDATE_INTERVAL, 5). 
+ + +-record(state, { + epoch = 0 :: non_neg_integer(), + cookie :: string() | undefined, + user :: string() | undefined, + pass :: string() | undefined, + httpdb_timeout :: integer(), + httpdb_pool :: pid(), + httpdb_ibrowse_options = [] :: list(), + session_url :: string(), + next_refresh = infinity :: infinity | non_neg_integer(), + refresh_tstamp = 0 :: non_neg_integer() +}). + + +% Behavior API callbacks + +-spec initialize(#httpdb{}) -> {ok, #httpdb{}, term()} | ignore. +initialize(#httpdb{} = HttpDb) -> + case init_state(HttpDb) of + {ok, HttpDb1, State} -> + {ok, Pid} = gen_server:start_link(?MODULE, [State], []), + Epoch = State#state.epoch, + Timeout = State#state.httpdb_timeout, + {ok, HttpDb1, {Pid, Epoch, Timeout}}; + {error, Error} -> + {error, Error}; + ignore -> + ignore + end. + + +-spec update_headers(term(), headers()) -> {headers(), term()}. +update_headers({Pid, Epoch, Timeout}, Headers) -> + Args = {update_headers, Headers, Epoch}, + {Headers1, Epoch1} = gen_server:call(Pid, Args, Timeout * 10), + {Headers1, {Pid, Epoch1, Timeout}}. + + +-spec handle_response(term(), code(), headers()) -> + {continue | retry, term()}. +handle_response({Pid, Epoch, Timeout}, Code, Headers) -> + Args = {handle_response, Code, Headers, Epoch}, + {Retry, Epoch1} = gen_server:call(Pid, Args, Timeout * 10), + {Retry, {Pid, Epoch1, Timeout}}. + + +-spec cleanup(term()) -> ok. +cleanup({Pid, _Epoch, Timeout}) -> + gen_server:call(Pid, stop, Timeout * 10). + + +%% gen_server functions + +init([#state{} = State]) -> + {ok, State}. + + +terminate(_Reason, _State) -> + ok. 
+ + +handle_call({update_headers, Headers, _Epoch}, _From, State) -> + case maybe_refresh(State) of + {ok, State1} -> + Cookie = "AuthSession=" ++ State1#state.cookie, + Headers1 = [{"Cookie", Cookie} | Headers], + {reply, {Headers1, State1#state.epoch}, State1}; + {error, Error} -> + LogMsg = "~p: Stopping session auth plugin because of error ~p", + couch_log:error(LogMsg, [?MODULE, Error]), + {stop, Error, State} + end; + +handle_call({handle_response, Code, Headers, Epoch}, _From, State) -> + {Retry, State1} = process_response(Code, Headers, Epoch, State), + {reply, {Retry, State1#state.epoch}, State1}; + +handle_call(stop, _From, State) -> + {stop, normal, ok, State}. + + +handle_cast(Msg, State) -> + couch_log:error("~p: Received un-expected cast ~p", [?MODULE, Msg]), + {noreply, State}. + + +handle_info(Msg, State) -> + couch_log:error("~p : Received un-expected message ~p", [?MODULE, Msg]), + {noreply, State}. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +format_status(_Opt, [_PDict, State]) -> + [ + {epoch, State#state.epoch}, + {user, State#state.user}, + {session_url, State#state.session_url}, + {refresh_tstamp, State#state.refresh_tstamp} + ]. + + +%% Private helper functions + + +-spec init_state(#httpdb{}) -> + {ok, #httpdb{}, #state{}} | {error, term()} | ignore. +init_state(#httpdb{} = HttpDb) -> + case extract_creds(HttpDb) of + {ok, User, Pass, HttpDb1} -> + State = #state{ + user = User, + pass = Pass, + session_url = get_session_url(HttpDb1#httpdb.url), + httpdb_pool = HttpDb1#httpdb.httpc_pool, + httpdb_timeout = HttpDb1#httpdb.timeout, + httpdb_ibrowse_options = HttpDb1#httpdb.ibrowse_options + }, + case refresh(State) of + {ok, State1} -> + {ok, HttpDb1, State1}; + {error, {session_not_supported, _, _}} -> + ignore; + {error, Error} -> + {error, Error} + end; + {error, missing_credentials} -> + ignore; + {error, Error} -> + {error, Error} + end. 
+ + +-spec extract_creds(#httpdb{}) -> + {ok, string(), string(), #httpdb{}} | {error, term()}. +extract_creds(#httpdb{url = Url, headers = Headers} = HttpDb) -> + {{HeadersUser, HeadersPass}, HeadersNoCreds} = + couch_replicator_utils:remove_basic_auth_from_headers(Headers), + case extract_creds_from_url(Url) of + {ok, UrlUser, UrlPass, UrlNoCreds} -> + case pick_creds({UrlUser, UrlPass}, {HeadersUser, HeadersPass}) of + {ok, User, Pass} -> + HttpDb1 = HttpDb#httpdb{ + url = UrlNoCreds, + headers = HeadersNoCreds + }, + {ok, User, Pass, HttpDb1}; + {error, Error} -> + {error, Error} + end; + {error, Error} -> + {error, Error} + end. + + +% Credentials could be specified in the url and/or in the headers. +% * If no credentials specified return error. +% * If specified in url but not in headers, pick url creds. +% * Otherwise pick headers creds. +% +-spec pick_creds(creds(), creds()) -> + {ok, string(), string()} | {error, missing_credentials}. +pick_creds({undefined, _}, {undefined, _}) -> + {error, missing_credentials}; +pick_creds({UrlUser, UrlPass}, {undefined, _}) -> + {ok, UrlUser, UrlPass}; +pick_creds({_, _}, {HeadersUser, HeadersPass}) -> + {ok, HeadersUser, HeadersPass}. + + +-spec extract_creds_from_url(string()) -> + {ok, string() | undefined, string() | undefined, string()} | + {error, term()}. +extract_creds_from_url(Url) -> + case ibrowse_lib:parse_url(Url) of + {error, Error} -> + {error, Error}; + #url{username = undefined, password = undefined} -> + {ok, undefined, undefined, Url}; + #url{protocol = Proto, username = User, password = Pass} -> + % Excise user and pass parts from the url. Try to keep the host, + % port and path as they were in the original. + Prefix = lists:concat([Proto, "://", User, ":", Pass, "@"]), + Suffix = lists:sublist(Url, length(Prefix) + 1, length(Url) + 1), + NoCreds = lists:concat([Proto, "://", Suffix]), + {ok, User, Pass, NoCreds} + end. 
+ + +-spec process_response(non_neg_integer(), headers(), + non_neg_integer(), #state{}) -> {retry | continue, #state{}}. +process_response(403, _Headers, Epoch, State) -> + process_auth_failure(Epoch, State); +process_response(401, _Headers, Epoch, State) -> + process_auth_failure(Epoch, State); +process_response(Code, Headers, _Epoch, State) when Code >= 200, Code < 300 -> + % If server noticed cookie is about to time out it can send a new cookie in + % the response headers. Take advantage of that and refresh the cookie. + State1 = case maybe_update_cookie(Headers, State) of + {ok, UpdatedState} -> + UpdatedState; + {error, cookie_not_found} -> + State; + {error, Other} -> + LogMsg = "~p : Could not parse cookie from response headers ~p", + couch_log:error(LogMsg, [?MODULE, Other]), + State + end, + {continue, State1}; +process_response(_Code, _Headers, _Epoch, State) -> + {continue, State}. + + +-spec process_auth_failure(non_neg_integer(), #state{}) -> + {retry | continue, #state{}}. +process_auth_failure(Epoch, #state{epoch = StateEpoch} = State) + when StateEpoch > Epoch -> + % This request used an outdated cookie, tell it to immediately retry + % and it will pick up the current cookie when its headers are updated + {retry, State}; +process_auth_failure(Epoch, #state{epoch = Epoch} = State) -> + MinInterval = min_update_interval(), + case cookie_age_sec(State, now_sec()) of + AgeSec when AgeSec < MinInterval -> + % A recently acquired cookie failed. Schedule a refresh and + % return `continue` to let httpc's retry apply a backoff + {continue, schedule_refresh(now_sec() + MinInterval, State)}; + _AgeSec -> + % Current cookie failed auth. Schedule refresh and ask + % httpc to retry the request. + {retry, schedule_refresh(now_sec(), State)} + end. + + +-spec get_session_url(string()) -> string(). 
+get_session_url(Url) -> + #url{ + protocol = Proto, + host = Host, + port = Port + } = ibrowse_lib:parse_url(Url), + WithPort = lists:concat([Proto, "://", Host, ":", Port]), + case lists:prefix(WithPort, Url) of + true -> + % Explicit port specified in the original url + WithPort ++ "/_session"; + false -> + % Implicit proto default port was used + lists:concat([Proto, "://", Host, "/_session"]) + end. + + +-spec schedule_refresh(non_neg_integer(), #state{}) -> #state{}. +schedule_refresh(T, #state{next_refresh = Tc} = State) when T < Tc -> + State#state{next_refresh = T}; +schedule_refresh(_, #state{} = State) -> + State. + + +-spec maybe_refresh(#state{}) -> {ok, #state{}} | {error, term()}. +maybe_refresh(#state{next_refresh = T} = State) -> + case now_sec() >= T of + true -> + refresh(State#state{next_refresh = infinity}); + false -> + {ok, State} + end. + + +-spec refresh(#state{}) -> {ok, #state{}} | {error, term()}. +refresh(#state{session_url = Url, user = User, pass = Pass} = State) -> + Body = mochiweb_util:urlencode([{name, User}, {password, Pass}]), + Headers = [{"Content-Type", "application/x-www-form-urlencoded"}], + Result = http_request(State, Url, Headers, post, Body), + http_response(Result, State). + + +-spec http_request(#state{}, string(), headers(), atom(), iolist()) -> + {ok, string(), headers(), binary()} | {error, term()}. +http_request(#state{httpdb_pool = Pool} = State, Url, Headers, Method, Body) -> + Timeout = State#state.httpdb_timeout, + Opts = [ + {response_format, binary}, + {inactivity_timeout, Timeout} + | State#state.httpdb_ibrowse_options + ], + {ok, Wrk} = couch_replicator_httpc_pool:get_worker(Pool), + try + ibrowse:send_req_direct(Wrk, Url, Headers, Method, Body, Opts, Timeout) + after + ok = couch_replicator_httpc_pool:release_worker(Pool, Wrk) + end. + + +-spec http_response({ok, string(), headers(), binary()} | {error, term()}, + #state{}) -> {ok, #state{}} | {error, term()}. 
+http_response({ok, "200", Headers, _}, State) -> + maybe_update_cookie(Headers, State); +http_response({ok, "401", _, _}, #state{session_url = Url, user = User}) -> + {error, {session_request_unauthorized, Url, User}}; +http_response({ok, "403", _, _}, #state{session_url = Url, user = User}) -> + {error, {session_request_forbidden, Url, User}}; +http_response({ok, "404", _, _}, #state{session_url = Url, user = User}) -> + {error, {session_not_supported, Url, User}}; +http_response({ok, Code, _, _}, #state{session_url = Url, user = User}) -> + {error, {session_unexpected_result, Code, Url, User}}; +http_response({error, Error}, #state{session_url = Url, user = User}) -> + {error, {session_request_failed, Url, User, Error}}. + + +-spec parse_cookie(list()) -> {ok, string()} | {error, term()}. +parse_cookie(Headers0) -> + Headers = mochiweb_headers:make(Headers0), + case mochiweb_headers:get_value("Set-Cookie", Headers) of + undefined -> + {error, cookie_not_found}; + CookieHeader -> + CookieKVs = mochiweb_cookies:parse_cookie(CookieHeader), + CaseInsKVs = mochiweb_headers:make(CookieKVs), + case mochiweb_headers:get_value("AuthSession", CaseInsKVs) of + undefined -> + {error, cookie_format_invalid}; + Cookie -> + {ok, Cookie} + end + end. + + +-spec maybe_update_cookie(headers(), #state{}) -> + {ok, #state{}} | {error, term()}. +maybe_update_cookie(ResponseHeaders, State) -> + case parse_cookie(ResponseHeaders) of + {ok, Cookie} -> + {ok, update_cookie(State, Cookie, now_sec())}; + {error, Error} -> + {error, Error} + end. + + +-spec update_cookie(#state{}, string(), non_neg_integer()) -> #state{}. +update_cookie(#state{cookie = Cookie} = State, Cookie, _) -> + State; +update_cookie(#state{epoch = Epoch} = State, Cookie, NowSec) -> + State#state{ + epoch = Epoch + 1, + cookie = Cookie, + refresh_tstamp = NowSec + }. + + +-spec cookie_age_sec(#state{}, non_neg_integer()) -> non_neg_integer(). 
+cookie_age_sec(#state{refresh_tstamp = RefreshTs}, Now) -> + max(0, Now - RefreshTs). + + +-spec now_sec() -> non_neg_integer(). +now_sec() -> + {Mega, Sec, _Micro} = os:timestamp(), + Mega * 1000000 + Sec. + + +-spec min_update_interval() -> non_neg_integer(). +min_update_interval() -> + config:get_integer("replicator", "session_min_update_interval", + ?MIN_UPDATE_INTERVAL). + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + + +get_session_url_test_() -> + [?_assertEqual(SessionUrl, get_session_url(Url)) || {Url, SessionUrl} <- [ + {"http://host/db", "http://host/_session"}, + {"http://127.0.0.1/db", "http://127.0.0.1/_session"}, + {"http://host/x/y/z", "http://host/_session"}, + {"http://host:5984/db", "http://host:5984/_session"}, + {"https://host/db?q=1", "https://host/_session"} + ]]. + + +extract_creds_success_test_() -> + DefaultHeaders = (#httpdb{})#httpdb.headers, + [?_assertEqual({ok, User, Pass, HttpDb2}, extract_creds(HttpDb1)) || + {HttpDb1, {User, Pass, HttpDb2}} <- [ + { + #httpdb{url = "http://u:p@x.y/db"}, + {"u", "p", #httpdb{url = "http://x.y/db"}} + }, + { + #httpdb{url = "http://u:p@h:80/db"}, + {"u", "p", #httpdb{url = "http://h:80/db"}} + }, + { + #httpdb{url = "https://u:p@h/db"}, + {"u", "p", #httpdb{url = "https://h/db"}} + }, + { + #httpdb{url = "http://u:p@127.0.0.1:5984/db"}, + {"u", "p", #httpdb{url = "http://127.0.0.1:5984/db"}} + }, + { + #httpdb{url = "http://u:p@[2001:db8:a1b:12f9::1]/db"}, + {"u", "p", #httpdb{url = "http://[2001:db8:a1b:12f9::1]/db"}} + }, + { + #httpdb{url = "http://u:p@[2001:db8:a1b:12f9::1]:81/db"}, + {"u", "p", #httpdb{url = "http://[2001:db8:a1b:12f9::1]:81/db"}} + }, + { + #httpdb{url = "http://u:p@x.y/db/other?query=Z&query=w"}, + {"u", "p", #httpdb{url = "http://x.y/db/other?query=Z&query=w"}} + }, + { + #httpdb{ + url = "http://h/db", + headers = DefaultHeaders ++ [ + {"Authorization", "Basic " ++ b64creds("u", "p")} + ] + }, + {"u", "p", #httpdb{url = "http://h/db"}} + }, + { + #httpdb{ + 
url = "http://h/db", + headers = DefaultHeaders ++ [ + {"aUthoriZation", "bASIC " ++ b64creds("U", "p")} + ] + }, + {"U", "p", #httpdb{url = "http://h/db"}} + }, + { + #httpdb{ + url = "http://u1:p1@h/db", + headers = DefaultHeaders ++ [ + {"Authorization", "Basic " ++ b64creds("u2", "p2")} + ] + }, + {"u2", "p2", #httpdb{url = "http://h/db"}} + } + ]]. + + +cookie_update_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + t_do_refresh(), + t_dont_refresh(), + t_process_auth_failure(), + t_process_auth_failure_stale_epoch(), + t_process_auth_failure_too_frequent(), + t_process_ok_update_cookie(), + t_process_ok_no_cookie(), + t_init_state_fails_on_401(), + t_init_state_404(), + t_init_state_no_creds(), + t_init_state_http_error() + ] + }. + + +t_do_refresh() -> + ?_test(begin + State = #state{next_refresh = 0}, + {ok, State1} = maybe_refresh(State), + ?assertMatch(#state{ + next_refresh = infinity, + epoch = 1, + cookie = "Abc" + }, State1) + end). + + +t_dont_refresh() -> + ?_test(begin + State = #state{next_refresh = now_sec() + 100}, + {ok, State1} = maybe_refresh(State), + ?assertMatch(State, State1), + State2 = #state{next_refresh = infinity}, + {ok, State3} = maybe_refresh(State2), + ?assertMatch(State2, State3) + end). + + +t_process_auth_failure() -> + ?_test(begin + State = #state{epoch = 1, refresh_tstamp = 0}, + {retry, State1} = process_auth_failure(1, State), + NextRefresh = State1#state.next_refresh, + ?assert(NextRefresh =< now_sec()) + end). + + +t_process_auth_failure_stale_epoch() -> + ?_test(begin + State = #state{epoch = 3}, + ?assertMatch({retry, State}, process_auth_failure(2, State)) + end). + + +t_process_auth_failure_too_frequent() -> + ?_test(begin + State = #state{epoch = 4, refresh_tstamp = now_sec()}, + ?assertMatch({continue, _}, process_auth_failure(4, State)) + end). 
+
+
+t_process_ok_update_cookie() ->
+    ?_test(begin
+        Headers = [{"set-CookiE", "AuthSession=xyz; Path=/;"}, {"X", "y"}],
+        Res = process_response(200, Headers, 1, #state{}),
+        ?assertMatch({continue, #state{cookie = "xyz", epoch = 1}}, Res),
+        State = #state{cookie = "xyz", refresh_tstamp = 42, epoch = 2},
+        Res2 = process_response(200, Headers, 1, State),
+        ?assertMatch({continue, #state{cookie = "xyz", epoch = 2}}, Res2)
+    end).
+
+
+t_process_ok_no_cookie() ->
+    ?_test(begin
+        Headers = [{"X", "y"}],
+        State = #state{cookie = "old", epoch = 3, refresh_tstamp = 42},
+        Res = process_response(200, Headers, 1, State),
+        ?assertMatch({continue, State}, Res)
+    end).
+
+
+t_init_state_fails_on_401() ->
+    ?_test(begin
+        mock_http_401_response(),
+        {error, Error} = init_state(#httpdb{url = "http://u:p@h"}),
+        SessionUrl = "http://h/_session",
+        ?assertEqual({session_request_unauthorized, SessionUrl, "u"}, Error)
+    end).
+
+
+t_init_state_404() ->
+    ?_test(begin
+        mock_http_404_response(),
+        ?assertEqual(ignore, init_state(#httpdb{url = "http://u:p@h"}))
+    end).
+
+
+t_init_state_no_creds() ->
+    ?_test(begin
+        ?assertEqual(ignore, init_state(#httpdb{url = "http://h"})) % not ?_assertEqual: must run here
+    end).
+
+
+t_init_state_http_error() ->
+    ?_test(begin
+        mock_http_error_response(),
+        {error, Error} = init_state(#httpdb{url = "http://u:p@h"}),
+        SessionUrl = "http://h/_session",
+        ?assertEqual({session_request_failed, SessionUrl, "u", x}, Error)
+    end).
+
+
+setup() ->
+    meck:expect(couch_replicator_httpc_pool, get_worker, 1, {ok, worker}),
+    meck:expect(couch_replicator_httpc_pool, release_worker, 2, ok),
+    meck:expect(config, get, fun(_, _, Default) -> Default end),
+    mock_http_cookie_response("Abc"),
+    ok.
+
+
+teardown(_) ->
+    meck:unload().
+
+
+mock_http_cookie_response(Cookie) ->
+    Resp = {ok, "200", [{"Set-Cookie", "AuthSession=" ++ Cookie}], []},
+    meck:expect(ibrowse, send_req_direct, 7, Resp).
+ + +mock_http_401_response() -> + meck:expect(ibrowse, send_req_direct, 7, {ok, "401", [], []}). + + +mock_http_404_response() -> + meck:expect(ibrowse, send_req_direct, 7, {ok, "404", [], []}). + + +mock_http_error_response() -> + meck:expect(ibrowse, send_req_direct, 7, {error, x}). + + +extract_creds_error_test_() -> + [?_assertMatch({error, Error}, extract_creds(HttpDb)) || + {HttpDb, Error} <- [ + {#httpdb{url = "some_junk"}, invalid_uri}, + {#httpdb{url = "http://h/db"}, missing_credentials} + ]]. + + +b64creds(User, Pass) -> + base64:encode_to_string(User ++ ":" ++ Pass). + + +-endif. diff --git a/src/couch_replicator/src/couch_replicator_changes_reader.erl b/src/couch_replicator/src/couch_replicator_changes_reader.erl index 3659d95925c..2e4df5365d2 100644 --- a/src/couch_replicator/src/couch_replicator_changes_reader.erl +++ b/src/couch_replicator/src/couch_replicator_changes_reader.erl @@ -19,7 +19,7 @@ -export([read_changes/5]). -include_lib("couch/include/couch_db.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). -import(couch_util, [ diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl index 1fe91eca4cd..62d21fe126f 100644 --- a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_docs.erl @@ -35,7 +35,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("ibrowse/include/ibrowse.hrl"). -include_lib("mem3/include/mem3.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). -include("couch_replicator_js_functions.hrl"). 
@@ -396,28 +396,9 @@ parse_rep_db({Props}, Proxy, Options) -> {BinHeaders} = get_value(<<"headers">>, Props, {[]}), Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]), DefaultHeaders = (#httpdb{})#httpdb.headers, - OAuth = case get_value(<<"oauth">>, AuthProps) of - undefined -> - nil; - {OauthProps} -> - #oauth{ - consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)), - token = ?b2l(get_value(<<"token">>, OauthProps)), - token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)), - consumer_secret = ?b2l(get_value(<<"consumer_secret">>, - OauthProps)), - signature_method = - case get_value(<<"signature_method">>, OauthProps) of - undefined -> hmac_sha1; - <<"PLAINTEXT">> -> plaintext; - <<"HMAC-SHA1">> -> hmac_sha1; - <<"RSA-SHA1">> -> rsa_sha1 - end - } - end, #httpdb{ url = Url, - oauth = OAuth, + auth_props = AuthProps, headers = lists:ukeymerge(1, Headers, DefaultHeaders), ibrowse_options = lists:keysort(1, [{socket_options, get_value(socket_options, Options)} | @@ -695,8 +676,7 @@ strip_credentials(Url) when is_binary(Url) -> "http\\1://\\2", [{return, binary}]); strip_credentials({Props}) -> - Props1 = lists:keydelete(<<"oauth">>, 1, Props), - {lists:keydelete(<<"headers">>, 1, Props1)}. + {lists:keydelete(<<"headers">>, 1, Props)}. error_reason({shutdown, Error}) -> @@ -772,10 +752,6 @@ check_strip_credentials_test() -> <<"https://remote_server/database">>, <<"https://foo:bar@remote_server/database">> }, - { - {[{<<"_id">>, <<"foo">>}]}, - {[{<<"_id">>, <<"foo">>}, {<<"oauth">>, <<"bar">>}]} - }, { {[{<<"_id">>, <<"foo">>}]}, {[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"bar">>}]} @@ -786,8 +762,7 @@ check_strip_credentials_test() -> }, { {[{<<"_id">>, <<"foo">>}]}, - {[{<<"_id">>, <<"foo">>}, {<<"oauth">>, <<"bar">>}, - {<<"headers">>, <<"baz">>}]} + {[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"baz">>}]} } ]]. 
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl index 45472f43104..6e787514bdb 100644 --- a/src/couch_replicator/src/couch_replicator_httpc.erl +++ b/src/couch_replicator/src/couch_replicator_httpc.erl @@ -14,7 +14,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("ibrowse/include/ibrowse.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -export([setup/1]). -export([send_req/3]). @@ -51,8 +51,17 @@ setup(Db) -> undefined -> Url; _ when is_list(ProxyURL) -> ProxyURL end, - {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL, [{max_connections, MaxConns}]), - {ok, Db#httpdb{httpc_pool = Pid}}. + {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL, + [{max_connections, MaxConns}]), + case couch_replicator_auth:initialize(Db#httpdb{httpc_pool = Pid}) of + {ok, Db1} -> + {ok, Db1}; + {error, Error} -> + LogMsg = "~p: auth plugin initialization failed ~p ~p", + LogUrl = couch_util:url_strip_password(Url), + couch_log:error(LogMsg, [?MODULE, LogUrl, Error]), + throw({replication_auth_error, Error}) + end. send_req(HttpDb, Params1, Callback) -> @@ -86,11 +95,11 @@ send_req(HttpDb, Params1, Callback) -> end. 
-send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) -> +send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb0, Params) -> Method = get_value(method, Params, get), UserHeaders = lists:keysort(1, get_value(headers, Params, [])), Headers1 = lists:ukeymerge(1, UserHeaders, BaseHeaders), - Headers2 = oauth_header(HttpDb, Params) ++ Headers1, + {Headers2, HttpDb} = couch_replicator_auth:update_headers(HttpDb0, Headers1), Url = full_url(HttpDb, Params), Body = get_value(body, Params, []), case get_value(path, Params) == "_changes" of @@ -157,6 +166,7 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) -> Json -> ?JSON_DECODE(Json) end, + process_auth_response(HttpDb, Ok, Headers, Params), Callback(Ok, Headers, EJson); R when R =:= 301 ; R =:= 302 ; R =:= 303 -> backoff_success(HttpDb, Params), @@ -179,8 +189,9 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) -> backoff(HttpDb#httpdb{timeout = Timeout}, Params); Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) -> backoff_success(HttpDb, Params), + HttpDb1 = process_auth_response(HttpDb, Ok, Headers, Params), StreamDataFun = fun() -> - stream_data_self(HttpDb, Params, Worker, ReqId, Callback) + stream_data_self(HttpDb1, Params, Worker, ReqId, Callback) end, put(?STREAM_STATUS, {streaming, Worker}), ibrowse:stream_next(ReqId), @@ -190,9 +201,9 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) -> catch throw:{maybe_retry_req, connection_closed} -> maybe_retry({connection_closed, mid_stream}, - Worker, HttpDb, Params); + Worker, HttpDb1, Params); throw:{maybe_retry_req, Err} -> - maybe_retry(Err, Worker, HttpDb, Params) + maybe_retry(Err, Worker, HttpDb1, Params) end; R when R =:= 301 ; R =:= 302 ; R =:= 303 -> backoff_success(HttpDb, Params), @@ -216,6 +227,16 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) -> end. 
+process_auth_response(HttpDb, Code, Headers, Params) -> + case couch_replicator_auth:handle_response(HttpDb, Code, Headers) of + {continue, HttpDb1} -> + HttpDb1; + {retry, HttpDb1} -> + log_retry_error(Params, HttpDb1, 0, Code), + throw({retry, HttpDb1, Params}) + end. + + % Only streaming HTTP requests send messages back from % the ibrowse worker process. We can detect that based % on the ibrowse_req_id format. This just drops all @@ -397,28 +418,6 @@ query_args_to_string([{K, V} | Rest], Acc) -> query_args_to_string(Rest, [K ++ "=" ++ couch_httpd:quote(V) | Acc]). -oauth_header(#httpdb{oauth = nil}, _ConnParams) -> - []; -oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) -> - Consumer = { - OAuth#oauth.consumer_key, - OAuth#oauth.consumer_secret, - OAuth#oauth.signature_method - }, - Method = case get_value(method, ConnParams, get) of - get -> "GET"; - post -> "POST"; - put -> "PUT"; - head -> "HEAD" - end, - QSL = get_value(qs, ConnParams, []), - OAuthParams = oauth:sign(Method, - BaseUrl ++ get_value(path, ConnParams, []), - QSL, Consumer, OAuth#oauth.token, OAuth#oauth.token_secret) -- QSL, - [{"Authorization", - "OAuth " ++ oauth:header_params_encode(OAuthParams)}]. - - do_redirect(_Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, _Cb) -> RedirectUrl = redirect_url(Headers, Url), {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params), diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl index e7067622bb5..e8faf8ea3f1 100644 --- a/src/couch_replicator/src/couch_replicator_ids.erl +++ b/src/couch_replicator/src/couch_replicator_ids.erl @@ -21,7 +21,7 @@ -include_lib("ibrowse/include/ibrowse.hrl"). -include_lib("couch/include/couch_db.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). 
% replication_id/1 and replication_id/2 will attempt to fetch @@ -127,62 +127,25 @@ maybe_append_options(Options, RepOptions) -> end, [], Options). -get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) -> +get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers}) -> DefaultHeaders = (#httpdb{})#httpdb.headers, - case OAuth of - nil -> - {remote, Url, Headers -- DefaultHeaders}; - #oauth{} -> - {remote, Url, Headers -- DefaultHeaders, OAuth} - end; + {remote, Url, Headers -- DefaultHeaders}; get_rep_endpoint(UserCtx, <>) -> {local, DbName, UserCtx}. get_v4_endpoint(UserCtx, #httpdb{} = HttpDb) -> - {Url, Headers, OAuth} = case get_rep_endpoint(UserCtx, HttpDb) of - {remote, U, Hds} -> - {U, Hds, undefined}; - {remote, U, Hds, OA} -> - {U, Hds, OA} - end, - {UserFromHeaders, HeadersWithoutBasicAuth} = remove_basic_auth(Headers), + {remote, Url, Headers} = get_rep_endpoint(UserCtx, HttpDb), + {{UserFromHeaders, _}, HeadersWithoutBasicAuth} = + couch_replicator_utils:remove_basic_auth_from_headers(Headers), {UserFromUrl, Host, NonDefaultPort, Path} = get_v4_url_info(Url), User = pick_defined_value([UserFromUrl, UserFromHeaders]), + OAuth = undefined, % Keep this to ensure checkpoints don't change {remote, User, Host, NonDefaultPort, Path, HeadersWithoutBasicAuth, OAuth}; get_v4_endpoint(UserCtx, <>) -> {local, DbName, UserCtx}. -remove_basic_auth(Headers) -> - case lists:partition(fun is_basic_auth/1, Headers) of - {[], HeadersWithoutBasicAuth} -> - {undefined, HeadersWithoutBasicAuth}; - {[{_, "Basic " ++ Base64} | _], HeadersWithoutBasicAuth} -> - User = get_basic_auth_user(Base64), - {User, HeadersWithoutBasicAuth} - end. - - -is_basic_auth({"Authorization", "Basic " ++ _Base64}) -> - true; -is_basic_auth(_) -> - false. 
- - -get_basic_auth_user(Base64) -> - try re:split(base64:decode(Base64), ":", [{return, list}, {parts, 2}]) of - [User, _Pass] -> - User; - _ -> - undefined - catch - % Tolerate invalid B64 values here to avoid crashing replicator - error:function_clause -> - undefined - end. - - pick_defined_value(Values) -> case [V || V <- Values, V /= undefined] of [] -> diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index be956b6a722..0b396346a78 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -56,7 +56,7 @@ -include("couch_replicator_scheduler.hrl"). -include("couch_replicator.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include_lib("couch/include/couch_db.hrl"). %% types diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 0438249be0e..1467d9f30dd 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -29,7 +29,7 @@ ]). -include_lib("couch/include/couch_db.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator_scheduler.hrl"). -include("couch_replicator.hrl"). diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl index 01881e4232e..218fcf501c4 100644 --- a/src/couch_replicator/src/couch_replicator_utils.erl +++ b/src/couch_replicator/src/couch_replicator_utils.erl @@ -27,7 +27,8 @@ get_json_value/3, pp_rep_id/1, iso8601/1, - filter_state/3 + filter_state/3, + remove_basic_auth_from_headers/1 ]). -export([ @@ -36,7 +37,7 @@ -include_lib("couch/include/couch_db.hrl"). -include("couch_replicator.hrl"). 
--include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -import(couch_util, [ get_value/2, @@ -174,3 +175,88 @@ filter_state(State, States, Info) -> false -> skip end. + + +remove_basic_auth_from_headers(Headers) -> + Headers1 = mochiweb_headers:make(Headers), + case mochiweb_headers:get_value("Authorization", Headers1) of + undefined -> + {{undefined, undefined}, Headers}; + Auth -> + {Basic, Base64} = lists:splitwith(fun(X) -> X =/= $\s end, Auth), + maybe_remove_basic_auth(string:to_lower(Basic), Base64, Headers1) + end. + + +maybe_remove_basic_auth("basic", " " ++ Base64, Headers) -> + Headers1 = mochiweb_headers:delete_any("Authorization", Headers), + {decode_basic_creds(Base64), mochiweb_headers:to_list(Headers1)}; +maybe_remove_basic_auth(_, _, Headers) -> + {{undefined, undefined}, mochiweb_headers:to_list(Headers)}. + + +decode_basic_creds(Base64) -> + try re:split(base64:decode(Base64), ":", [{return, list}, {parts, 2}]) of + [User, Pass] -> + {User, Pass}; + _ -> + {undefined, undefined} + catch + % Tolerate invalid B64 values here to avoid crashing replicator + error:function_clause -> + {undefined, undefined} + end. + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). 
+ +remove_basic_auth_from_headers_test_() -> + [?_assertMatch({{User, Pass}, NoAuthHeaders}, + remove_basic_auth_from_headers(Headers)) || + {{User, Pass, NoAuthHeaders}, Headers} <- [ + { + {undefined, undefined, []}, + [] + }, + { + {undefined, undefined, [{"h", "v"}]}, + [{"h", "v"}] + }, + { + {undefined, undefined, [{"Authorization", "junk"}]}, + [{"Authorization", "junk"}] + }, + { + {undefined, undefined, []}, + [{"Authorization", "basic X"}] + }, + { + {"user", "pass", []}, + [{"Authorization", "Basic " ++ b64creds("user", "pass")}] + }, + { + {"user", "pass", []}, + [{"AuThorization", "Basic " ++ b64creds("user", "pass")}] + }, + { + {"user", "pass", []}, + [{"Authorization", "bAsIc " ++ b64creds("user", "pass")}] + }, + { + {"user", "pass", [{"h", "v"}]}, + [ + {"Authorization", "Basic " ++ b64creds("user", "pass")}, + {"h", "v"} + ] + } + ] + ]. + + +b64creds(User, Pass) -> + base64:encode_to_string(User ++ ":" ++ Pass). + + +-endif. diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index db6b72b2e98..e51565866e6 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -22,7 +22,7 @@ -export([handle_call/3, handle_cast/2, handle_info/2]). -include_lib("couch/include/couch_db.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). % TODO: maybe make both buffer max sizes configurable diff --git a/src/couch_replicator/test/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/couch_replicator_proxy_tests.erl index a40e5b16668..4f545bcb5e4 100644 --- a/src/couch_replicator/test/couch_replicator_proxy_tests.erl +++ b/src/couch_replicator/test/couch_replicator_proxy_tests.erl @@ -14,7 +14,7 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch_replicator/src/couch_replicator.hrl"). 
--include_lib("couch_replicator/src/couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). setup() ->