% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_api_wrap).

% This module wraps the native Erlang API and allows performing operations on
% remote and local databases via the same API.
%
% Notes:
% Many options and APIs aren't yet supported here; they are added as needed.

-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include("couch_replicator_api_wrap.hrl").

-export([
    db_open/1,
    db_open/3,
    db_close/1,
    get_db_info/1,
    get_pending_count/2,
    get_view_info/3,
    update_doc/3,
    update_doc/4,
    update_docs/3,
    update_docs/4,
    ensure_full_commit/1,
    get_missing_revs/2,
    bulk_get/3,
    open_doc/3,
    open_doc_revs/6,
    changes_since/5,
    db_uri/1,
    normalize_db/1
]).

-import(couch_replicator_httpc, [
    send_req/3
]).

-import(couch_util, [
    encode_doc_id/1,
    get_value/2,
    get_value/3
]).

-define(MAX_WAIT, 5 * 60 * 1000).

-define(MAX_URL_LEN, 7000).
-define(MIN_URL_LEN, 200).

db_uri(#httpdb{url = Url}) ->
    couch_util:url_strip_password(Url).

db_open(Db) ->
    db_open(Db, false, []).

db_open(#httpdb{} = Db1, Create, CreateParams) ->
    {ok, Db} = couch_replicator_httpc:setup(Db1),
    try
        case Create of
            false ->
                ok;
            true ->
                Db2 = maybe_append_create_query_params(Db, CreateParams),
                send_req(
                    Db2,
                    [{method, put}],
                    fun
                        (401, _, _) ->
                            throw({unauthorized, ?l2b(db_uri(Db2))});
                        (403, _, _) ->
                            throw({forbidden, ?l2b(db_uri(Db2))});
                        (_, _, _) ->
                            ok
                    end
                )
        end,
        send_req(
            Db,
            [{method, get}],
            fun
                (200, _, {Props}) ->
                    UpdateSeq = get_value(<<"update_seq">>, Props),
                    InstanceStart = get_value(<<"instance_start_time">>, Props),
                    case {UpdateSeq, InstanceStart} of
                        {undefined, _} ->
                            throw({db_not_found, ?l2b(db_uri(Db))});
                        {_, undefined} ->
                            throw({db_not_found, ?l2b(db_uri(Db))});
                        _ ->
                            {ok, Db}
                    end;
                (200, _, _Body) ->
                    throw({db_not_found, ?l2b(db_uri(Db))});
                (401, _, _) ->
                    throw({unauthorized, ?l2b(db_uri(Db))});
                (403, _, _) ->
                    throw({forbidden, ?l2b(db_uri(Db))});
                (_, _, _) ->
                    throw({db_not_found, ?l2b(db_uri(Db))})
            end
        )
    catch
        throw:Error ->
            db_close(Db),
            throw(Error);
        error:Error ->
            db_close(Db),
            erlang:error(Error);
        exit:Error ->
            db_close(Db),
            erlang:exit(Error)
    end.

db_close(#httpdb{httpc_pool = Pool} = HttpDb) ->
    couch_replicator_auth:cleanup(HttpDb),
    unlink(Pool),
    ok = couch_replicator_httpc_pool:stop(Pool).

get_db_info(#httpdb{} = Db) ->
    send_req(
        Db,
        [],
        fun(200, _, {Props}) ->
            {ok, Props}
        end
    ).

get_pending_count(#httpdb{} = Db, Seq) when is_number(Seq) ->
    % Source looks like Apache CouchDB and not Cloudant, so we fall
    % back to using update sequence differences.
    send_req(Db, [], fun(200, _, {Props}) ->
        case get_value(<<"update_seq">>, Props) of
            UpdateSeq when is_number(UpdateSeq) ->
                {ok, UpdateSeq - Seq};
            _ ->
                {ok, null}
        end
    end);
get_pending_count(#httpdb{} = Db, Seq) ->
    EncodedSeq = couch_replicator_utils:seq_encode(Seq),
    Options = [{path, "_changes"}, {qs, [{"since", EncodedSeq}, {"limit", "0"}]}],
    send_req(Db, Options, fun(200, _, {Props}) ->
        {ok, couch_util:get_value(<<"pending">>, Props, null)}
    end).

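% Illustrative usage sketch (comment only, not part of the module): opening a
% remote endpoint and querying its pending count. The URL and sequence below
% are hypothetical; #httpdb{} is defined in couch_replicator_api_wrap.hrl and
% unspecified fields are assumed to have usable record defaults.
%
%     Db0 = #httpdb{url = "http://host:5984/db/"},
%     {ok, Db} = couch_replicator_api_wrap:db_open(Db0),
%     % With a numeric Seq the count is derived from update_seq differences;
%     % otherwise a limit=0 _changes request reports the "pending" field.
%     {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, 0),
%     couch_replicator_api_wrap:db_close(Db).
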
get_view_info(#httpdb{} = Db, DDocId, ViewName) ->
    Path = io_lib:format("~s/_view/~s/_info", [DDocId, ViewName]),
    send_req(
        Db,
        [{path, Path}],
        fun(200, _, {Props}) ->
            {VInfo} = couch_util:get_value(<<"view_index">>, Props, {[]}),
            {ok, VInfo}
        end
    ).

ensure_full_commit(#httpdb{} = Db) ->
    send_req(
        Db,
        [
            {method, post},
            {path, "_ensure_full_commit"},
            {headers, [{"Content-Type", "application/json"}]}
        ],
        fun
            (201, _, {Props}) ->
                {ok, get_value(<<"instance_start_time">>, Props)};
            (_, _, {Props}) ->
                {error, get_value(<<"error">>, Props)}
        end
    ).

get_missing_revs(#httpdb{} = Db, IdRevs) ->
    JsonBody = {[{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs]},
    send_req(
        Db,
        [
            {method, post},
            {path, "_revs_diff"},
            {body, ?JSON_ENCODE(JsonBody)},
            {headers, [{"Content-Type", "application/json"}]}
        ],
        fun
            (200, _, {Props}) ->
                ConvertToNativeFun = fun({Id, {Result}}) ->
                    MissingRevs = couch_doc:parse_revs(
                        get_value(<<"missing">>, Result)
                    ),
                    PossibleAncestors = couch_doc:parse_revs(
                        get_value(<<"possible_ancestors">>, Result, [])
                    ),
                    {Id, MissingRevs, PossibleAncestors}
                end,
                {ok, lists:map(ConvertToNativeFun, Props)};
            (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
                {error, {revs_diff_failed, ErrCode, ErrMsg}}
        end
    ).

bulk_get(#httpdb{} = Db, #{} = IdRevs, Options) ->
    FoldFun = fun({Id, Rev}, PAs, Acc) -> [{Id, Rev, PAs} | Acc] end,
    ReqDocsList = lists:sort(maps:fold(FoldFun, [], IdRevs)),
    MapFun = fun({Id, Rev, PAs}) ->
        #{
            <<"id">> => Id,
            <<"rev">> => couch_doc:rev_to_str(Rev),
            <<"atts_since">> => couch_doc:revs_to_strs(PAs)
        }
    end,
    ReqDocsMaps = lists:map(MapFun, ReqDocsList),
    % We are also sending request parameters in the doc body in the hope that
    % at some point in the future we can make that the default, instead of
    % having to send query parameters with a POST request as we do today
    Body = options_to_json_map(Options, #{<<"docs">> => ReqDocsMaps}),
    Req = [
        {method, post},
        {path, "_bulk_get"},
        {qs, options_to_query_args(Options, [])},
        {body, ?JSON_ENCODE(Body)},
        {headers, [
            {"Content-Type", "application/json"},
            {"Accept", "application/json"}
        ]}
    ],
    try
        send_req(Db, Req, fun
            (200, _, {[{<<"results">>, Res}]}) when is_list(Res) ->
                Zip = lists:zipwith(fun bulk_get_zip/2, ReqDocsList, Res),
                {ok, maps:from_list(Zip)};
            (200, _, _) ->
                {error, {bulk_get_failed, invalid_results}};
            (ErrCode, _, _) when is_integer(ErrCode) ->
                % On older Apache CouchDB instances where _bulk_get is not
                % implemented we would hit the POST db/doc form uploader
                % handler. When that fails the request body is not consumed
                % and we'd end up recycling a worker with an unsent body in
                % the connection stream. Instead of waiting for it to blow up
                % eventually and consuming an extra retry attempt, proactively
                % advise the httpc logic to stop this worker and not return it
                % to the pool.
                couch_replicator_httpc:stop_http_worker(),
                {error, {bulk_get_failed, ErrCode}}
        end)
    catch
        exit:{http_request_failed, _, _, {error, {code, ErrCode}}} ->
            % We are being a bit more tolerant of _bulk_get errors as we can
            % always fall back to individual fetches
            {error, {bulk_get_failed, ErrCode}}
    end.

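% Illustrative sketch (comment only, not part of the module): the shape of the
% bulk_get/3 input map. Keys are {Id, Rev} tuples and values are lists of
% possible-ancestor revisions; all ids and revisions below are hypothetical.
%
%     IdRevs = #{
%         {<<"doc1">>, {1, <<"abc">>}} => [],
%         {<<"doc2">>, {2, <<"def">>}} => [{1, <<"aaa">>}]
%     },
%     {ok, Results} = couch_replicator_api_wrap:bulk_get(Db, IdRevs, [latest]),
%     % Results maps each {Id, Rev} key to either a #doc{} record or an
%     % {error, Reason} tuple, so callers can fall back to individual fetches.
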
bulk_get_zip({Id, Rev, _}, {[_ | _] = Props}) ->
    Docs = couch_util:get_value(<<"docs">>, Props),
    ResId = couch_util:get_value(<<"id">>, Props),
    % "docs" is a one item list, either [{"ok": Doc}] or [{"error": Error}]
    case Docs of
        [{[{<<"ok">>, {[_ | _]} = Doc}]}] when ResId =:= Id ->
            {{Id, Rev}, couch_doc:from_json_obj(Doc)};
        [{[{<<"error">>, {[_ | _] = Err}}]}] when ResId =:= Id ->
            Tag = couch_util:get_value(<<"error">>, Err),
            Reason = couch_util:get_value(<<"reason">>, Err),
            couch_log:debug("~p bulk_get zip error ~p:~p", [?MODULE, Tag, Reason]),
            {{Id, Rev}, {error, {Tag, Reason}}};
        Other ->
            couch_log:debug("~p bulk_get zip other error:~p", [?MODULE, Other]),
            {{Id, Rev}, {error, {unexpected_bulk_get_response, Other}}}
    end.

-spec open_doc_revs(#httpdb{}, binary(), list(), list(), function(), any()) -> no_return().
open_doc_revs(#httpdb{retries = 0} = HttpDb, Id, Revs, Options, _Fun, _Acc) ->
    Path = encode_doc_id(Id),
    QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
    Url = couch_util:url_strip_password(
        couch_replicator_httpc:full_url(HttpDb, [{path, Path}, {qs, QS}])
    ),
    couch_log:error("Replication crashing because GET ~s failed", [Url]),
    exit(kaboom);
open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
    Path = encode_doc_id(Id),
    QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
    {Pid, Ref} = spawn_monitor(fun() ->
        Self = self(),
        Callback = fun
            (200, Headers, StreamDataFun) ->
                remote_open_doc_revs_streamer_start(Self),
                {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
                    header_value("Content-Type", Headers),
                    StreamDataFun,
                    fun mp_parse_mixed/1
                );
            (414, _, _) ->
                exit(request_uri_too_long)
        end,
        Streamer = spawn_link(fun() ->
            Params = [
                {path, Path},
                {qs, QS},
                {ibrowse_options, [{stream_to, {self(), once}}]},
                {headers, [{"Accept", "multipart/mixed"}]}
            ],
            % We're setting retries to 0 here to avoid the case where the
            % Streamer retries the request and ends up jumbling together two
            % different response bodies. Retries are handled explicitly by
            % open_doc_revs itself.
            send_req(HttpDb#httpdb{retries = 0}, Params, Callback)
        end),
        % If this process dies normally we can leave
        % the Streamer process hanging around keeping an
        % HTTP connection open. This is a bit of a
        % hammer approach to making sure it releases
        % that connection back to the pool.
        spawn(fun() ->
            Ref = erlang:monitor(process, Self),
            receive
                {'DOWN', Ref, process, Self, normal} ->
                    exit(Streamer, {streamer_parent_died, Self});
                {'DOWN', Ref, process, Self, _} ->
                    ok
            end
        end),
        receive
            {started_open_doc_revs, Ref} ->
                Ret = receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc),
                exit({exit_ok, Ret})
        end
    end),
    receive
        {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
            Ret;
        {'DOWN', Ref, process, Pid, {{nocatch, missing_doc}, _}} ->
            throw(missing_doc);
        {'DOWN', Ref, process, Pid, {{nocatch, {missing_stub, _} = Stub}, _}} ->
            throw(Stub);
        {'DOWN', Ref, process, Pid, {http_request_failed, _, _, max_backoff}} ->
            exit(max_backoff);
        {'DOWN', Ref, process, Pid, request_uri_too_long} ->
            NewMaxLen = get_value(max_url_len, Options, ?MAX_URL_LEN) div 2,
            case NewMaxLen < ?MIN_URL_LEN of
                true ->
                    throw(request_uri_too_long);
                false ->
                    couch_log:info(
                        "Reducing url length to ~B because of"
                        " 414 response",
                        [NewMaxLen]
                    ),
                    Options1 = lists:keystore(
                        max_url_len,
                        1,
                        Options,
                        {max_url_len, NewMaxLen}
                    ),
                    open_doc_revs(HttpDb, Id, Revs, Options1, Fun, Acc)
            end;
        {'DOWN', Ref, process, Pid, Else} ->
            Url = couch_util:url_strip_password(
                couch_replicator_httpc:full_url(HttpDb, [{path, Path}, {qs, QS}])
            ),
            #httpdb{retries = Retries, wait = Wait0} = HttpDb,
            Wait = 2 * erlang:min(Wait0 * 2, ?MAX_WAIT),
            couch_log:notice(
                "Retrying GET to ~s in ~p seconds due to error ~w",
                [Url, Wait / 1000, error_reason(Else)]
            ),
            ok = timer:sleep(Wait),
            RetryDb = HttpDb#httpdb{
                retries = Retries - 1,
                wait = Wait
            },
            open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc)
    end.

error_reason({http_request_failed, "GET", _Url, {error, timeout}}) ->
    timeout;
error_reason({http_request_failed, "GET", _Url, {error, {_, req_timedout}}}) ->
    req_timedout;
error_reason({http_request_failed, "GET", _Url, Error}) ->
    Error;
error_reason(Else) ->
    Else.

open_doc(#httpdb{} = Db, Id, Options) ->
    send_req(
        Db,
        [{path, encode_doc_id(Id)}, {qs, options_to_query_args(Options, [])}],
        fun
            (200, _, Body) ->
                {ok, couch_doc:from_json_obj(Body)};
            (_, _, {Props}) ->
                {error, get_value(<<"error">>, Props)}
        end
    ).

update_doc(Db, Doc, Options) ->
    update_doc(Db, Doc, Options, ?INTERACTIVE_EDIT).

update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
    QArgs =
        case Type of
            ?REPLICATED_CHANGES ->
                [{"new_edits", "false"}];
            _ ->
                []
        end ++ options_to_query_args(Options, []),
    Boundary = couch_uuids:random(),
    JsonBytes = ?JSON_ENCODE(
        couch_doc:to_json_obj(
            Doc,
            [revs, attachments, follows, att_encoding_info | Options]
        )
    ),
    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
        Boundary,
        JsonBytes,
        Doc#doc.atts,
        true
    ),
    Headers =
        case lists:member(delay_commit, Options) of
            true ->
                [{"X-Couch-Full-Commit", "false"}];
            false ->
                []
        end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
    Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}},
    send_req(
        % A crash here bubbles all the way back up to run_user_fun inside
        % open_doc_revs, which will retry the whole thing. That's the
        % appropriate course of action, since we've already started streaming
        % the response body from the GET request.
        HttpDb#httpdb{retries = 0},
        [
            {method, put},
            {path, encode_doc_id(DocId)},
            {qs, QArgs},
            {headers, Headers},
            {body, Body}
        ],
        fun
            (Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 orelse Code =:= 202 ->
                {ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))};
            (409, _, _) ->
                throw(conflict);
            (Code, _, {Props}) ->
                case {Code, get_value(<<"error">>, Props)} of
                    {401, <<"unauthorized">>} ->
                        throw({unauthorized, get_value(<<"reason">>, Props)});
                    {403, <<"forbidden">>} ->
                        throw({forbidden, get_value(<<"reason">>, Props)});
                    {412, <<"missing_stub">>} ->
                        throw({missing_stub, get_value(<<"reason">>, Props)});
                    {413, _} ->
                        {error, request_body_too_large};
                    {_, Error} ->
                        {error, Error}
                end
        end
    ).

update_docs(Db, DocList, Options) ->
    update_docs(Db, DocList, Options, ?INTERACTIVE_EDIT).

update_docs(_Db, [], _Options, _UpdateType) ->
    {ok, []};
update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
    FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
    Prefix =
        case UpdateType of
            ?REPLICATED_CHANGES ->
                <<"{\"new_edits\":false,\"docs\":[">>;
            ?INTERACTIVE_EDIT ->
                <<"{\"docs\":[">>
        end,
    Suffix = <<"]}">>,
    % Note: nginx and other servers don't like PUT/POST requests without
    % a Content-Length header, so we can't use chunked transfer encoding
    % and JSON-encode each doc only just before sending it through the socket.
    {Docs, Len} = lists:mapfoldl(
        fun
            (#doc{} = Doc, Acc) ->
                Json = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
                {Json, Acc + iolist_size(Json)};
            (Doc, Acc) ->
                {Doc, Acc + iolist_size(Doc)}
        end,
        byte_size(Prefix) + byte_size(Suffix) + length(DocList) - 1,
        DocList
    ),
    BodyFun = fun
        (eof) ->
            eof;
        ([]) ->
            {ok, Suffix, eof};
        ([prefix | Rest]) ->
            {ok, Prefix, Rest};
        ([Doc]) ->
            {ok, Doc, []};
        ([Doc | RestDocs]) ->
            {ok, [Doc, ","], RestDocs}
    end,
    Headers = [
        {"Content-Length", Len},
        {"Content-Type", "application/json"},
        {"X-Couch-Full-Commit", FullCommit}
    ],
    send_req(
        HttpDb,
        [
            {method, post},
            {path, "_bulk_docs"},
            {body, {BodyFun, [prefix | Docs]}},
            {headers, Headers}
        ],
        fun
            (201, _, Results) when is_list(Results) ->
                {ok, bulk_results_to_errors(Results)};
            (413, _, _) ->
                {error, request_body_too_large};
            (417, _, Results) when is_list(Results) ->
                {ok, bulk_results_to_errors(Results)};
            (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
                {error, {bulk_docs_failed, ErrCode, ErrMsg}}
        end
    ).

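% Illustrative sketch (comment only, not part of the module): pushing a batch
% through _bulk_docs as replicated changes (new_edits: false). The docs are
% hypothetical; DocList may also contain pre-encoded JSON binaries, which is
% why the size fold above handles both shapes.
%
%     Docs = [#doc{id = <<"d1">>}, #doc{id = <<"d2">>}],
%     {ok, Errors} = couch_replicator_api_wrap:update_docs(
%         Db, Docs, [delay_commit], ?REPLICATED_CHANGES
%     ),
%     % Errors is a list of {[{id, _}, {rev, _}, {error, _}, {reason, _}]}
%     % tuples produced by bulk_results_to_errors/1.
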
changes_since(
    #httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
    Style,
    StartSeq,
    UserFun,
    Options
) ->
    Timeout = erlang:max(1000, InactiveTimeout div 3),
    EncodedSeq = couch_replicator_utils:seq_encode(StartSeq),
    BaseQArgs =
        case get_value(continuous, Options, false) of
            false ->
                [{"feed", "normal"}];
            true ->
                [{"feed", "continuous"}]
        end ++
            [
                {"style", atom_to_list(Style)},
                {"since", EncodedSeq},
                {"timeout", integer_to_list(Timeout)}
            ],
    DocIds = get_value(doc_ids, Options),
    Selector = get_value(selector, Options),
    {QArgs, Method, Body, Headers} =
        case {DocIds, Selector} of
            {undefined, undefined} ->
                QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options),
                {QArgs1, get, [], Headers1};
            {undefined, _} when is_tuple(Selector) ->
                Headers2 = [{"Content-Type", "application/json"} | Headers1],
                JsonSelector = ?JSON_ENCODE({[{<<"selector">>, Selector}]}),
                {[{"filter", "_selector"} | BaseQArgs], post, JsonSelector, Headers2};
            {_, undefined} when is_list(DocIds) ->
                Headers2 = [{"Content-Type", "application/json"} | Headers1],
                JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
                {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
        end,
    try
        send_req(
            HttpDb,
            [
                {method, Method},
                {path, "_changes"},
                {qs, QArgs},
                {headers, Headers},
                {body, Body},
                {ibrowse_options, [{stream_to, {self(), once}}]}
            ],
            fun
                (200, _, DataStreamFun) ->
                    parse_changes_feed(Options, UserFun, DataStreamFun);
                (405, _, _) when is_list(DocIds) ->
                    % CouchDB versions < 1.1.0 have neither the builtin
                    % _changes feed filter "_doc_ids" nor support for POST
                    send_req(
                        HttpDb,
                        [
                            {method, get},
                            {path, "_changes"},
                            {qs, BaseQArgs},
                            {headers, Headers1},
                            {ibrowse_options, [{stream_to, {self(), once}}]}
                        ],
                        fun(200, _, DataStreamFun2) ->
                            UserFun2 = fun
                                (#doc_info{id = Id} = DocInfo) ->
                                    case lists:member(Id, DocIds) of
                                        true ->
                                            UserFun(DocInfo);
                                        false ->
                                            ok
                                    end;
                                (LastSeq) ->
                                    UserFun(LastSeq)
                            end,
                            parse_changes_feed(
                                Options,
                                UserFun2,
                                DataStreamFun2
                            )
                        end
                    );
                (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
                    throw({retry_limit, {changes_req_failed, ErrCode, ErrMsg}})
            end
        )
    catch
        exit:{http_request_failed, _, _, max_backoff} ->
            exit(max_backoff);
        exit:{http_request_failed, _, _, {error, {connection_closed, mid_stream}}} ->
            throw(retry_no_limit);
        exit:{http_request_failed, _, _, _} = Error ->
            throw({retry_limit, Error})
    end.

% internal functions

maybe_add_changes_filter_q_args(BaseQS, Options) ->
    case get_value(filter, Options) of
        undefined ->
            BaseQS;
        FilterName ->
            %% get list of view attributes
            ViewFields0 = [atom_to_list(F) || F <- record_info(fields, mrargs)],
            ViewFields = ["key" | ViewFields0],

            {Params} = get_value(query_params, Options, {[]}),
            [
                {"filter", ?b2l(FilterName)}
                | lists:foldl(
                    fun({K, V}, QSAcc) ->
                        Ks = couch_util:to_list(K),
                        case lists:keymember(Ks, 1, QSAcc) of
                            true ->
                                QSAcc;
                            false when FilterName =:= <<"_view">> ->
                                V1 =
                                    case lists:member(Ks, ViewFields) of
                                        true -> ?JSON_ENCODE(V);
                                        false -> couch_util:to_list(V)
                                    end,
                                [{Ks, V1} | QSAcc];
                            false ->
                                [{Ks, couch_util:to_list(V)} | QSAcc]
                        end
                    end,
                    BaseQS,
                    Params
                )
            ]
    end.

parse_changes_feed(Options, UserFun, DataStreamFun) ->
    case get_value(continuous, Options, false) of
        true ->
            continuous_changes(DataStreamFun, UserFun);
        false ->
            EventFun = fun(Ev) ->
                changes_ev1(Ev, fun(DocInfo, _) -> UserFun(DocInfo) end, [])
            end,
            json_stream_parse:events(DataStreamFun, EventFun)
    end.

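% Illustrative sketch (comment only, not part of the module): consuming a
% normal (one-shot) changes feed. The callback body and options below are
% hypothetical; with {continuous, true} the same callback would be driven by
% continuous_changes/2 instead, and may also receive {last_seq, Seq}.
%
%     UserFun = fun
%         (#doc_info{id = Id}) -> couch_log:debug("change: ~s", [Id]);
%         ({last_seq, Seq}) -> couch_log:debug("last_seq: ~p", [Seq])
%     end,
%     couch_replicator_api_wrap:changes_since(Db, all_docs, 0, UserFun, []).
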
options_to_query_args(HttpDb, Path, Options0) ->
    case lists:keytake(max_url_len, 1, Options0) of
        false ->
            MaxLen = ?MAX_URL_LEN,
            Options = Options0;
        {value, {max_url_len, MaxLen}, Options} ->
            ok
    end,
    case lists:keytake(atts_since, 1, Options) of
        false ->
            options_to_query_args(Options, []);
        {value, {atts_since, []}, Options2} ->
            options_to_query_args(Options2, []);
        {value, {atts_since, PAs}, Options2} ->
            QueryArgs1 = options_to_query_args(Options2, []),
            FullUrl = couch_replicator_httpc:full_url(
                HttpDb,
                [{path, Path}, {qs, QueryArgs1}]
            ),
            RevList = atts_since_arg(
                length("GET " ++ FullUrl ++ " HTTP/1.1\r\n") +
                    % +6 = % encoded [ and ]
                    length("&atts_since=") + 6,
                PAs,
                MaxLen,
                []
            ),
            [{"atts_since", ?b2l(iolist_to_binary(?JSON_ENCODE(RevList)))} | QueryArgs1]
    end.

options_to_query_args([], Acc) ->
    lists:reverse(Acc);
options_to_query_args([ejson_body | Rest], Acc) ->
    options_to_query_args(Rest, Acc);
options_to_query_args([delay_commit | Rest], Acc) ->
    options_to_query_args(Rest, Acc);
options_to_query_args([revs | Rest], Acc) ->
    options_to_query_args(Rest, [{"revs", "true"} | Acc]);
options_to_query_args([{open_revs, all} | Rest], Acc) ->
    options_to_query_args(Rest, [{"open_revs", "all"} | Acc]);
options_to_query_args([latest | Rest], Acc) ->
    options_to_query_args(Rest, [{"latest", "true"} | Acc]);
options_to_query_args([{open_revs, Revs} | Rest], Acc) ->
    JsonRevs = ?b2l(iolist_to_binary(?JSON_ENCODE(couch_doc:revs_to_strs(Revs)))),
    options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]);
options_to_query_args([{attachments, Bool} | Rest], Acc) when is_atom(Bool) ->
    BoolStr = atom_to_list(Bool),
    options_to_query_args(Rest, [{"attachments", BoolStr} | Acc]).

options_to_json_map([], #{} = Acc) ->
    Acc;
options_to_json_map([latest | Rest], #{} = Acc) ->
    options_to_json_map(Rest, Acc#{<<"latest">> => true});
options_to_json_map([revs | Rest], #{} = Acc) ->
    options_to_json_map(Rest, Acc#{<<"revs">> => true});
options_to_json_map([{attachments, Bool} | Rest], #{} = Acc) when is_atom(Bool) ->
    options_to_json_map(Rest, Acc#{<<"attachments">> => Bool}).

atts_since_arg(_UrlLen, [], _MaxLen, Acc) ->
    lists:reverse(Acc);
atts_since_arg(UrlLen, [PA | Rest], MaxLen, Acc) ->
    RevStr = couch_doc:rev_to_str(PA),
    NewUrlLen =
        case Rest of
            [] ->
                % plus 2 double quotes (% encoded)
                UrlLen + size(RevStr) + 6;
            _ ->
                % plus 2 double quotes and a comma (% encoded)
                UrlLen + size(RevStr) + 9
        end,
    case NewUrlLen >= MaxLen of
        true ->
            lists:reverse(Acc);
        false ->
            atts_since_arg(NewUrlLen, Rest, MaxLen, [RevStr | Acc])
    end.

% TODO: A less verbose, more elegant and automatic restart strategy for
% the exported open_doc_revs/6 function. The restart should be
% transparent to the caller like any other Couch API function exported
% by this module.
receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc) ->
    try
        % Left only for debugging purposes via an interactive or remote shell
        erlang:put(open_doc_revs, {Id, Revs, Ref, Streamer}),
        receive_docs(Streamer, Fun, Ref, Acc)
    catch
        error:{restart_open_doc_revs, NewRef} ->
            receive_docs_loop(Streamer, Fun, Id, Revs, NewRef, Acc)
    end.

receive_docs(Streamer, UserFun, Ref, UserAcc) ->
    Streamer !
        {get_headers, Ref, self()},
    receive
        {started_open_doc_revs, NewRef} ->
            restart_remote_open_doc_revs(Ref, NewRef);
        {headers, Ref, Headers} ->
            case header_value("content-type", Headers) of
                {"multipart/related", _} = ContentType ->
                    % Skip document body and attachment size limit validation
                    % here since these should be validated by the replication
                    % target
                    case
                        couch_doc:doc_from_multi_part_stream(
                            ContentType,
                            fun() -> receive_doc_data(Streamer, Ref) end,
                            Ref,
                            _ValidateDocLimits = false
                        )
                    of
                        {ok, Doc, WaitFun, Parser} ->
                            case run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref) of
                                {ok, UserAcc2} ->
                                    ok;
                                {skip, UserAcc2} ->
                                    couch_httpd_multipart:abort_multipart_stream(Parser)
                            end,
                            WaitFun(),
                            receive_docs(Streamer, UserFun, Ref, UserAcc2)
                    end;
                {"application/json", []} ->
                    Doc = couch_doc:from_json_obj(
                        ?JSON_DECODE(receive_all(Streamer, Ref, []))
                    ),
                    {_, UserAcc2} = run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref),
                    receive_docs(Streamer, UserFun, Ref, UserAcc2);
                {"application/json", [{"error", "true"}]} ->
                    {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])),
                    Rev = get_value(<<"missing">>, ErrorProps),
                    Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
                    {_, UserAcc2} = run_user_fun(UserFun, Result, UserAcc, Ref),
                    receive_docs(Streamer, UserFun, Ref, UserAcc2)
            end;
        {done, Ref} ->
            {ok, UserAcc}
    end.

run_user_fun(UserFun, Arg, UserAcc, OldRef) ->
    {Pid, Ref} = spawn_monitor(fun() ->
        try UserFun(Arg, UserAcc) of
            Resp ->
                exit({exit_ok, Resp})
        catch
            throw:Reason ->
                exit({exit_throw, Reason});
            error:Reason ->
                exit({exit_error, Reason});
            exit:Reason ->
                exit({exit_exit, Reason})
        end
    end),
    receive
        {started_open_doc_revs, NewRef} ->
            erlang:demonitor(Ref, [flush]),
            exit(Pid, kill),
            restart_remote_open_doc_revs(OldRef, NewRef);
        {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
            Ret;
        {'DOWN', Ref, process, Pid, {exit_throw, Reason}} ->
            throw(Reason);
        {'DOWN', Ref, process, Pid, {exit_error, Reason}} ->
            erlang:error(Reason);
        {'DOWN', Ref, process, Pid, {exit_exit, Reason}} ->
            erlang:exit(Reason)
    end.

restart_remote_open_doc_revs(Ref, NewRef) ->
    receive
        {body_bytes, Ref, _} ->
            restart_remote_open_doc_revs(Ref, NewRef);
        {body_done, Ref} ->
            restart_remote_open_doc_revs(Ref, NewRef);
        {done, Ref} ->
            restart_remote_open_doc_revs(Ref, NewRef);
        {headers, Ref, _} ->
            restart_remote_open_doc_revs(Ref, NewRef)
    after 0 ->
        erlang:error({restart_open_doc_revs, NewRef})
    end.

remote_open_doc_revs_streamer_start(Parent) ->
    receive
        {get_headers, _Ref, Parent} ->
            remote_open_doc_revs_streamer_start(Parent);
        {next_bytes, _Ref, Parent} ->
            remote_open_doc_revs_streamer_start(Parent)
    after 0 ->
        Parent ! {started_open_doc_revs, make_ref()}
    end.

receive_all(Streamer, Ref, Acc) ->
    Streamer ! {next_bytes, Ref, self()},
    receive
        {started_open_doc_revs, NewRef} ->
            restart_remote_open_doc_revs(Ref, NewRef);
        {body_bytes, Ref, Bytes} ->
            receive_all(Streamer, Ref, [Bytes | Acc]);
        {body_done, Ref} ->
            lists:reverse(Acc)
    end.

mp_parse_mixed(eof) ->
    receive
        {get_headers, Ref, From} ->
            From ! {done, Ref}
    end;
mp_parse_mixed({headers, H}) ->
    receive
        {get_headers, Ref, From} ->
            From ! {headers, Ref, H}
    end,
    fun mp_parse_mixed/1;
mp_parse_mixed({body, Bytes}) ->
    receive
        {next_bytes, Ref, From} ->
            From ! {body_bytes, Ref, Bytes}
    end,
    fun mp_parse_mixed/1;
mp_parse_mixed(body_end) ->
    receive
        {next_bytes, Ref, From} ->
            From ! {body_done, Ref};
        {get_headers, Ref, From} ->
            self() ! {get_headers, Ref, From}
    end,
    fun mp_parse_mixed/1.

receive_doc_data(Streamer, Ref) ->
    Streamer !
        {next_bytes, Ref, self()},
    receive
        {body_bytes, Ref, Bytes} ->
            {Bytes, fun() -> receive_doc_data(Streamer, Ref) end};
        {body_done, Ref} ->
            {<<>>, fun() -> receive_doc_data(Streamer, Ref) end}
    end.

changes_ev1(object_start, UserFun, UserAcc) ->
    fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.

changes_ev2({key, <<"results">>}, UserFun, UserAcc) ->
    fun(Ev) -> changes_ev3(Ev, UserFun, UserAcc) end;
changes_ev2(_, UserFun, UserAcc) ->
    fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.

changes_ev3(array_start, UserFun, UserAcc) ->
    fun(Ev) -> changes_ev_loop(Ev, UserFun, UserAcc) end.

changes_ev_loop(object_start, UserFun, UserAcc) ->
    fun(Ev) ->
        json_stream_parse:collect_object(
            Ev,
            fun(Obj) ->
                UserAcc2 = UserFun(json_to_doc_info(Obj), UserAcc),
                fun(Ev2) -> changes_ev_loop(Ev2, UserFun, UserAcc2) end
            end
        )
    end;
changes_ev_loop(array_end, _UserFun, _UserAcc) ->
    fun(_Ev) -> changes_ev_done() end.

changes_ev_done() ->
    fun(_Ev) -> changes_ev_done() end.

continuous_changes(DataFun, UserFun) ->
    {DataFun2, _, Rest} =
        json_stream_parse:events(
            DataFun,
            fun(Ev) -> parse_changes_line(Ev, UserFun) end
        ),
    continuous_changes(fun() -> {Rest, DataFun2} end, UserFun).

parse_changes_line(object_start, UserFun) ->
    fun(Ev) ->
        json_stream_parse:collect_object(
            Ev,
            fun(Obj) -> UserFun(json_to_doc_info(Obj)) end
        )
    end.

json_to_doc_info({Props}) ->
    case get_value(<<"changes">>, Props) of
        undefined ->
            {last_seq, get_value(<<"last_seq">>, Props)};
        Changes ->
            RevsInfo0 = lists:map(
                fun({Change}) ->
                    Rev = couch_doc:parse_rev(get_value(<<"rev">>, Change)),
                    Del = couch_replicator_utils:is_deleted(Change),
                    #rev_info{rev = Rev, deleted = Del}
                end,
                Changes
            ),
            RevsInfo =
                case get_value(<<"removed">>, Props) of
                    true ->
                        [_ | RevsInfo1] = RevsInfo0,
                        RevsInfo1;
                    _ ->
                        RevsInfo0
                end,
            #doc_info{
                id = get_value(<<"id">>, Props),
                high_seq = get_value(<<"seq">>, Props),
                revs = RevsInfo
            }
    end.

bulk_results_to_errors(Results) ->
    lists:reverse(
        lists:foldl(
            fun({Props}, Acc) ->
                case get_value(<<"error">>, Props, get_value(error, Props)) of
                    undefined ->
                        Acc;
                    Error ->
                        Id = get_value(<<"id">>, Props, get_value(id, Props)),
                        Rev = get_value(<<"rev">>, Props, get_value(rev, Props)),
                        Reason = get_value(<<"reason">>, Props, get_value(reason, Props)),
                        [
                            {[
                                {id, Id},
                                {rev, rev_to_str(Rev)},
                                {error, Error},
                                {reason, Reason}
                            ]}
                            | Acc
                        ]
                end
            end,
            [],
            Results
        )
    ).

rev_to_str({_Pos, _Id} = Rev) ->
    couch_doc:rev_to_str(Rev);
rev_to_str(Rev) ->
    Rev.

write_fun() ->
    fun(Data) ->
        receive
            {get_data, Ref, From} ->
                From ! {data, Ref, Data}
        end
    end.

stream_doc({JsonBytes, Atts, Boundary, Len}) ->
    case erlang:erase({doc_streamer, Boundary}) of
        Pid when is_pid(Pid) ->
            unlink(Pid),
            exit(Pid, kill);
        _ ->
            ok
    end,
    DocStreamer = spawn_link(
        couch_doc,
        doc_to_multi_part_stream,
        [Boundary, JsonBytes, Atts, write_fun(), true]
    ),
    erlang:put({doc_streamer, Boundary}, DocStreamer),
    {ok, <<>>, {Len, Boundary}};
stream_doc({0, Id}) ->
    erlang:erase({doc_streamer, Id}),
    eof;
stream_doc({LenLeft, Id}) when LenLeft > 0 ->
    Ref = make_ref(),
    erlang:get({doc_streamer, Id}) ! {get_data, Ref, self()},
    receive
        {data, Ref, Data} ->
            {ok, Data, {LenLeft - iolist_size(Data), Id}}
    end.

header_value(Key, Headers) ->
    header_value(Key, Headers, undefined).

header_value(Key, Headers, Default) ->
    Headers1 = [{string:to_lower(K), V} || {K, V} <- Headers],
    case lists:keyfind(string:to_lower(Key), 1, Headers1) of
        {_, Value} ->
            Value;
        _ ->
            Default
    end.

% Normalize an #httpdb{} or #db{} record such that it can be used for
% comparisons. This means removing things like pids and sorting options /
% props.
normalize_db(#httpdb{} = HttpDb) ->
    #httpdb{
        url = HttpDb#httpdb.url,
        auth_props = lists:sort(HttpDb#httpdb.auth_props),
        headers = lists:keysort(1, HttpDb#httpdb.headers),
        timeout = HttpDb#httpdb.timeout,
        ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options),
        retries = HttpDb#httpdb.retries,
        http_connections = HttpDb#httpdb.http_connections
    };
normalize_db(<<DbName/binary>>) ->
    DbName.

maybe_append_create_query_params(Db, []) ->
    Db;
maybe_append_create_query_params(Db, CreateParams) ->
    NewUrl = Db#httpdb.url ++ "?" ++ mochiweb_util:urlencode(CreateParams),
    Db#httpdb{url = NewUrl}.

-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").

normalize_http_db_test() ->
    HttpDb = #httpdb{
        url = "http://host/db",
        auth_props = [{"key", "val"}],
        headers = [{"k2", "v2"}, {"k1", "v1"}],
        timeout = 30000,
        ibrowse_options = [{k2, v2}, {k1, v1}],
        retries = 10,
        http_connections = 20
    },
    Expected = HttpDb#httpdb{
        headers = [{"k1", "v1"}, {"k2", "v2"}],
        ibrowse_options = [{k1, v1}, {k2, v2}]
    },
    ?assertEqual(Expected, normalize_db(HttpDb)),
    ?assertEqual(<<"local">>, normalize_db(<<"local">>)).

-endif.
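
% Illustrative sketch (comment only, not part of the module): normalize_db/1
% exists so that two endpoint records can be compared while ignoring transient
% state such as pids and the ordering of headers and options. Both records
% below are hypothetical.
%
%     Same = normalize_db(HttpDbA) =:= normalize_db(HttpDbB).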