Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Couchdb 3288 mixed cluster upgrade #495

Merged
merged 9 commits into from
Sep 27, 2017
Prev Previous commit
Next Next commit
Update couch_server to not use the db record
This removes introspection of the #db record by couch_server. While it's
required for the pluggable storage engine upgrade, it's also nice to
remove the hacky overloading of #db record fields for couch_server
logic.

COUCHDB-3288
  • Loading branch information
davisp committed Sep 27, 2017
commit 81705fbbfe506ae900983d30ee5ce545259f3e0a
17 changes: 17 additions & 0 deletions src/couch/src/couch_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
create/2,
open/2,
open_int/2,
incref/1,
reopen/1,
close/1,

Expand All @@ -34,7 +35,9 @@
get_db_info/1,
get_doc_count/1,
get_epochs/1,
get_instance_start_time/1,
get_last_purged/1,
get_pid/1,
get_revs_limit/1,
get_security/1,
get_update_seq/1,
Expand All @@ -46,6 +49,7 @@
increment_update_seq/1,
set_revs_limit/2,
set_security/2,
set_user_ctx/2,

ensure_full_commit/1,
ensure_full_commit/2,
Expand Down Expand Up @@ -181,6 +185,10 @@ reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
{ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}}
end.

%% Starts a new monitor on the database's fd process and returns
%% {ok, Db} with the fd_monitor field set to that monitor reference.
incref(#db{} = Db) ->
    MonitorRef = erlang:monitor(process, Db#db.fd),
    {ok, Db#db{fd_monitor = MonitorRef}}.

%% Returns true when the atom sys_db is present in the database's
%% options list, false otherwise.
is_system_db(#db{} = Db) ->
    lists:member(sys_db, Db#db.options).

Expand Down Expand Up @@ -381,6 +389,9 @@ get_last_purged(#db{}=Db) ->
couch_file:pread_term(Db#db.fd, Pointer)
end.

%% Accessor for the main_pid field of a #db{} record.
get_pid(#db{} = Db) ->
    Db#db.main_pid.

%% Runs a full reduce over the database's id_tree and returns
%% {ok, Count}, where Count is the first element of the reduction.
get_doc_count(Db) ->
    IdTree = Db#db.id_tree,
    {ok, {DocCount, _, _}} = couch_btree:full_reduce(IdTree),
    {ok, DocCount}.
Expand All @@ -393,6 +404,9 @@ get_epochs(#db{}=Db) ->
validate_epochs(Epochs),
Epochs.

%% Accessor for the instance_start_time field of a #db{} record.
get_instance_start_time(#db{} = Db) ->
    Db#db.instance_start_time.

%% Returns the compacted sequence as reported by
%% couch_db_header:compacted_seq/1 for the database's header.
get_compacted_seq(#db{header = Header}) ->
    couch_db_header:compacted_seq(Header).

Expand Down Expand Up @@ -585,6 +599,9 @@ set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
set_security(_, _) ->
throw(bad_request).

%% Returns {ok, Db} with the user_ctx field replaced by UserCtx.
%% Only the returned record copy is changed.
set_user_ctx(#db{} = Db, UserCtx) ->
    WithCtx = Db#db{user_ctx = UserCtx},
    {ok, WithCtx}.

validate_security_object(SecProps) ->
Admins = couch_util:get_value(<<"admins">>, SecProps, {[]}),
% we fallback to readers here for backwards compatibility
Expand Down
9 changes: 5 additions & 4 deletions src/couch/src/couch_lru.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-module(couch_lru).
-export([new/0, insert/2, update/2, close/1]).

-include_lib("couch/include/couch_db.hrl").
-include("couch_server_int.hrl").

%% Creates an empty cache: a pair of an empty gb_tree and an empty dict.
new() ->
    Tree = gb_trees:empty(),
    Dict = dict:new(),
    {Tree, Dict}.
Expand Down Expand Up @@ -43,16 +43,17 @@ close({Tree, _} = Cache) ->
close_int(none, _) ->
false;
close_int({Lru, DbName, Iter}, {Tree, Dict} = Cache) ->
case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of
case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of
true ->
[#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName),
[#entry{db = Db, pid = Pid}] = ets:lookup(couch_dbs, DbName),
case couch_db:is_idle(Db) of true ->
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, Pid),
exit(Pid, kill),
{true, {gb_trees:delete(Lru, Tree), dict:erase(DbName, Dict)}};
false ->
true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}),
ElemSpec = {#entry.lock, unlocked},
true = ets:update_element(couch_dbs, DbName, ElemSpec),
couch_stats:increment_counter([couchdb, couch_server, lru_skip]),
close_int(gb_trees:next(Iter), update(DbName, Cache))
end;
Expand Down
135 changes: 70 additions & 65 deletions src/couch/src/couch_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
-export([handle_config_change/5, handle_config_terminate/3]).

-include_lib("couch/include/couch_db.hrl").
-include("couch_server_int.hrl").

-define(MAX_DBS_OPEN, 500).
-define(RELISTEN_DELAY, 5000).
Expand Down Expand Up @@ -74,16 +75,18 @@ sup_start_link() ->
open(DbName, Options0) ->
Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}),
case ets:lookup(couch_dbs, DbName) of
[#db{fd=Fd, fd_monitor=Lock, options=Options} = Db] when Lock =/= locked ->
update_lru(DbName, Options),
{ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}};
[#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked ->
update_lru(DbName, Entry#entry.db_options),
{ok, Db1} = couch_db:incref(Db0),
couch_db:set_user_ctx(Db1, Ctx);
_ ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
Timeout = couch_util:get_value(timeout, Options, infinity),
Create = couch_util:get_value(create_if_missing, Options, false),
case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of
{ok, #db{fd=Fd} = Db} ->
{ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}};
{ok, Db0} ->
{ok, Db1} = couch_db:incref(Db0),
couch_db:set_user_ctx(Db1, Ctx);
{not_found, no_db_file} when Create ->
couch_log:warning("creating missing database: ~s", [DbName]),
couch_server:create(DbName, Options);
Expand All @@ -104,9 +107,10 @@ close_lru() ->
create(DbName, Options0) ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
{ok, #db{fd=Fd} = Db} ->
{ok, Db0} ->
Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
{ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}};
{ok, Db1} = couch_db:incref(Db0),
couch_db:set_user_ctx(Db1, Ctx);
Error ->
Error
end.
Expand Down Expand Up @@ -176,9 +180,9 @@ hash_admin_passwords(Persist) ->

close_db_if_idle(DbName) ->
case ets:lookup(couch_dbs, DbName) of
[#db{}] ->
[#entry{}] ->
gen_server:cast(couch_server, {close_db_if_idle, DbName});
_ ->
[] ->
ok
end.

Expand All @@ -197,7 +201,7 @@ init([]) ->
ok = config:listen_for_changes(?MODULE, nil),
ok = couch_file:init_delete_dir(RootDir),
hash_admin_passwords(),
ets:new(couch_dbs, [set, protected, named_table, {keypos, #db.name}]),
ets:new(couch_dbs, [set, protected, named_table, {keypos, #entry.name}]),
ets:new(couch_dbs_pid_to_name, [set, protected, named_table]),
process_flag(trap_exit, true),
{ok, #server{root_dir=RootDir,
Expand All @@ -209,8 +213,9 @@ terminate(Reason, Srv) ->
couch_log:error("couch_server terminating with ~p, state ~2048p",
[Reason,
Srv#server{lru = redacted}]),
ets:foldl(fun(#db{main_pid=Pid}, _) -> couch_util:shutdown_sync(Pid) end,
nil, couch_dbs),
ets:foldl(fun(#entry{db = Db}, _) ->
couch_util:shutdown_sync(couch_db:get_pid(Db))
end, nil, couch_dbs),
ok.

handle_config_change("couchdb", "database_dir", _, _, _) ->
Expand Down Expand Up @@ -316,15 +321,13 @@ open_async(Server, From, DbName, Filepath, Options) ->
true -> create;
false -> open
end,
% icky hack of field values - compactor_pid used to store clients
% and fd used for opening request info
true = ets:insert(couch_dbs, #db{
true = ets:insert(couch_dbs, #entry{
name = DbName,
fd = ReqType,
main_pid = Opener,
compactor_pid = [From],
fd_monitor = locked,
options = Options
pid = Opener,
lock = locked,
waiters = [From],
req_type = ReqType,
db_options = Options
}),
true = ets:insert(couch_dbs_pid_to_name, {Opener, DbName}),
db_opened(Server, Options).
Expand All @@ -348,25 +351,31 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
true = ets:delete(couch_dbs_pid_to_name, FromPid),
OpenTime = timer:now_diff(os:timestamp(), T0) / 1000,
couch_stats:update_histogram([couchdb, db_open_time], OpenTime),
% icky hack of field values - compactor_pid used to store clients
% and fd used to possibly store a creation request
DbPid = couch_db:get_pid(Db),
case ets:lookup(couch_dbs, DbName) of
[] ->
% db was deleted during async open
exit(Db#db.main_pid, kill),
exit(DbPid, kill),
{reply, ok, Server};
[#db{fd=ReqType, compactor_pid=Froms}] ->
link(Db#db.main_pid),
[gen_server:reply(From, {ok, Db}) || From <- Froms],
[#entry{req_type = ReqType, waiters = Waiters} = Entry] ->
link(DbPid),
[gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters],
% Cancel the creation request if it exists.
case ReqType of
{create, DbName, _Filepath, _Options, CrFrom} ->
gen_server:reply(CrFrom, file_exists);
_ ->
ok
end,
true = ets:insert(couch_dbs, Db),
true = ets:insert(couch_dbs_pid_to_name, {Db#db.main_pid, DbName}),
true = ets:insert(couch_dbs, #entry{
name = DbName,
db = Db,
pid = DbPid,
lock = unlocked,
db_options = Entry#entry.db_options,
start_time = couch_db:get_instance_start_time(Db)
}),
true = ets:insert(couch_dbs_pid_to_name, {DbPid, DbName}),
Lru = case couch_db:is_system_db(Db) of
false ->
couch_lru:insert(DbName, Server#server.lru);
Expand All @@ -378,13 +387,12 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
handle_call({open_result, T0, DbName, {error, eexist}}, From, Server) ->
handle_call({open_result, T0, DbName, file_exists}, From, Server);
handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) ->
% icky hack of field values - compactor_pid used to store clients
case ets:lookup(couch_dbs, DbName) of
[] ->
% db was deleted during async open
{reply, ok, Server};
[#db{fd=ReqType, compactor_pid=Froms}=Db] ->
[gen_server:reply(From, Error) || From <- Froms],
[#entry{req_type = ReqType, waiters = Waiters} = Entry] ->
[gen_server:reply(Waiter, Error) || Waiter <- Waiters],
couch_log:info("open_result error ~p for ~s", [Error, DbName]),
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, FromPid),
Expand All @@ -394,7 +402,7 @@ handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) ->
_ ->
Server
end,
{reply, ok, db_closed(NewServer, Db#db.options)}
{reply, ok, db_closed(NewServer, Entry#entry.db_options)}
end;
handle_call({open, DbName, Options}, From, Server) ->
case ets:lookup(couch_dbs, DbName) of
Expand All @@ -412,15 +420,14 @@ handle_call({open, DbName, Options}, From, Server) ->
Error ->
{reply, Error, Server}
end;
[#db{compactor_pid = Froms} = Db] when is_list(Froms) ->
% icky hack of field values - compactor_pid used to store clients
true = ets:insert(couch_dbs, Db#db{compactor_pid = [From|Froms]}),
if length(Froms) =< 10 -> ok; true ->
[#entry{waiters = Waiters} = Entry] when is_list(Waiters) ->
true = ets:insert(couch_dbs, Entry#entry{waiters = [From | Waiters]}),
if length(Waiters) =< 10 -> ok; true ->
Fmt = "~b clients waiting to open db ~s",
couch_log:info(Fmt, [length(Froms), DbName])
couch_log:info(Fmt, [length(Waiters), DbName])
end,
{noreply, Server};
[#db{} = Db] ->
[#entry{db = Db}] ->
{reply, {ok, Db}, Server}
end;
handle_call({create, DbName, Options}, From, Server) ->
Expand All @@ -437,14 +444,13 @@ handle_call({create, DbName, Options}, From, Server) ->
CloseError ->
{reply, CloseError, Server}
end;
[#db{fd=open}=Db] ->
[#entry{req_type = open} = Entry] ->
% We're trying to create a database while someone is in
% the middle of trying to open it. We allow one creator
% to wait while we figure out if it'll succeed.
% icky hack of field values - fd used to store create request
CrOptions = [create | Options],
NewDb = Db#db{fd={create, DbName, Filepath, CrOptions, From}},
true = ets:insert(couch_dbs, NewDb),
Req = {create, DbName, Filepath, CrOptions, From},
true = ets:insert(couch_dbs, Entry#entry{req_type = Req}),
{noreply, Server};
[_AlreadyRunningDb] ->
{reply, file_exists, Server}
Expand All @@ -460,18 +466,17 @@ handle_call({delete, DbName, Options}, _From, Server) ->
Server2 =
case ets:lookup(couch_dbs, DbName) of
[] -> Server;
[#db{main_pid=Pid, compactor_pid=Froms} = Db] when is_list(Froms) ->
% icky hack of field values - compactor_pid used to store clients
[#entry{pid = Pid, waiters = Waiters} = Entry] when is_list(Waiters) ->
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, Pid),
exit(Pid, kill),
[gen_server:reply(F, not_found) || F <- Froms],
db_closed(Server, Db#db.options);
[#db{main_pid=Pid} = Db] ->
[gen_server:reply(Waiter, not_found) || Waiter <- Waiters],
db_closed(Server, Entry#entry.db_options);
[#entry{pid = Pid} = Entry] ->
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, Pid),
exit(Pid, kill),
db_closed(Server, Db#db.options)
db_closed(Server, Entry#entry.db_options)
end,

%% Delete any leftover compaction files. If we don't do this a
Expand All @@ -497,11 +502,12 @@ handle_call({delete, DbName, Options}, _From, Server) ->
Error ->
{reply, Error, Server}
end;
handle_call({db_updated, #db{}=Db}, _From, Server0) ->
#db{name = DbName, instance_start_time = StartTime} = Db,
Server = try ets:lookup_element(couch_dbs, DbName, #db.instance_start_time) of
handle_call({db_updated, Db}, _From, Server0) ->
DbName = couch_db:name(Db),
StartTime = couch_db:get_instance_start_time(Db),
Server = try ets:lookup_element(couch_dbs, DbName, #entry.start_time) of
StartTime ->
true = ets:insert(couch_dbs, Db),
true = ets:update_element(couch_dbs, DbName, {#entry.db, Db}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ets:update_element will return false if it could not find the entry and it would crash on a bad match, while previously ets:insert always returned true and inserted the entry in the table (even if it wasn't in the table already).

Is there any chance the entry could be deleted between the call to ets:lookup and ets:update_element?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. We just asserted it was in the table when we extracted StartTime and this is the only process that modifies the table so it should be there. Or, rather, if it's not there then we should crash hard because ets has lost its mind.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Makes sense

Lru = case couch_db:is_system_db(Db) of
false -> couch_lru:update(DbName, Server0#server.lru);
true -> Server0#server.lru
Expand All @@ -519,17 +525,19 @@ handle_cast({update_lru, DbName}, #server{lru = Lru, update_lru_on_read=true} =
handle_cast({update_lru, _DbName}, Server) ->
{noreply, Server};
handle_cast({close_db_if_idle, DbName}, Server) ->
case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of
case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of
true ->
[#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName),
[#entry{db = Db, db_options = DbOpts}] = ets:lookup(couch_dbs, DbName),
case couch_db:is_idle(Db) of
true ->
DbPid = couch_db:get_pid(Db),
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, Pid),
exit(Pid, kill),
{noreply, db_closed(Server, Db#db.options)};
true = ets:delete(couch_dbs_pid_to_name, DbPid),
exit(DbPid, kill),
{noreply, db_closed(Server, DbOpts)};
false ->
true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}),
true = ets:update_element(
couch_dbs, DbName, {#entry.lock, unlocked}),
{noreply, Server}
end;
false ->
Expand All @@ -547,22 +555,19 @@ handle_info({'EXIT', _Pid, config_change}, Server) ->
handle_info({'EXIT', Pid, Reason}, Server) ->
case ets:lookup(couch_dbs_pid_to_name, Pid) of
[{Pid, DbName}] ->
[#db{compactor_pid=Froms}=Db] = ets:lookup(couch_dbs, DbName),
[#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs, DbName),
if Reason /= snappy_nif_not_loaded -> ok; true ->
Msg = io_lib:format("To open the database `~s`, Apache CouchDB "
"must be built with Erlang OTP R13B04 or higher.", [DbName]),
couch_log:error(Msg, [])
end,
couch_log:info("db ~s died with reason ~p", [DbName, Reason]),
% icky hack of field values - compactor_pid used to store clients
if is_list(Froms) ->
[gen_server:reply(From, Reason) || From <- Froms];
true ->
ok
if not is_list(Waiters) -> ok; true ->
[gen_server:reply(Waiter, Reason) || Waiter <- Waiters]
end,
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, Pid),
{noreply, db_closed(Server, Db#db.options)};
{noreply, db_closed(Server, Entry#entry.db_options)};
[] ->
{noreply, Server}
end;
Expand Down
Loading