Fabric stream coordinators should exit if idle for a long period #4729
Conversation
Force-pushed 1a372be to b2c707a
Force-pushed b2c707a to ed72935
src/rexi/src/rexi_server.erl
Outdated
@@ -36,6 +37,7 @@
 -record(st, {
     workers = ets:new(workers, [private, {keypos, #job.worker}]),
     clients = ets:new(clients, [private, {keypos, #job.client}]),
+    client_mon_refs = ets:new(foo, [private, {keypos, #job.client_mon_ref}]),
Is there a reason to use the atom "foo"? Maybe there is a better name?
heh, it doesn't really matter as it's not a named table but I'll change it
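(Aside, a minimal illustrative shell session, not from the PR: without the named_table option the atom passed to ets:new/2 is just a label. Lookups go through the returned table identifier, and the atom is never registered.)

1> T = ets:new(foo, [private]).
#Ref<0.1002.0>
2> ets:insert(T, {k, 1}), ets:lookup(T, k).
[{k,1}]
3> ets:lookup(foo, k).   % the atom was never registered as a table name
** exception error: bad argument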
Force-pushed cf809b6 to 0c12522
We do not always want workers to exit when the client process exits. It's a deliberate design choice, for instance in fabric_doc_update:
couchdb/src/fabric/src/fabric_doc_update.erl
Lines 40 to 44 in e60e275
try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
    {ok, {Health, Results}} when
        Health =:= ok; Health =:= accepted; Health =:= error
    ->
        ensure_all_responses(Health, AllDocs, Results);
Remote monitors may have a performance impact; they take extra work and maintenance on both sides of the dist channel, so it might be interesting to see if it's anything significant.
There is already an existing worker cleanup mechanism for streaming workers in fabric_streams that doesn't create extra remote monitors; can that be patched or extended instead of having a secondary mechanism to do largely the same task?
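(For context, a rough sketch of the shape of that cleanup mechanism, hedged; names and the worker list shape are illustrative, not the actual fabric_streams code: monitor the coordinator and reap the remote workers when it dies.)

spawn_cleanup(Coordinator, Workers) ->
    spawn(fun() ->
        Ref = erlang:monitor(process, Coordinator),
        receive
            {'DOWN', Ref, process, Coordinator, _Reason} ->
                %% coordinator died: stop every remote worker
                [rexi:kill(Node, WRef) || {Node, WRef} <- Workers]
        end
    end).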
The trigger for the cleanup is that the client process dies. If a client sends a request, waits for a response, and then just drops the connection, would the chttpd request process find out about it and exit?
Good point on … I confirmed directly and via haproxy that a client that disconnects (curl and Ctrl-C in the first case, and simply doing something that hits haproxy's server timeout, like …, in the second) …
Cleanup happens without this patch already, from what I've observed. Here is how I tested on main:

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index e2de301b2..6c2beba83 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -127,6 +127,8 @@ handle_changes_req1(#httpd{} = Req, Db) ->
db_open_options = [{user_ctx, couch_db:get_user_ctx(Db)}]
},
Max = chttpd:chunked_response_buffer_size(),
+ couch_log:error(" +++TRACING _changes++++ ~p:~p@~B REQPID:~p", [?MODULE, ?FUNCTION_NAME, ?LINE, self()]),
+ dbg:tracer(), dbg:p(self(), p),
case ChangesArgs#changes_args.feed of
"normal" ->
T0 = os:timestamp(),
diff --git a/src/fabric/src/fabric_db_update_listener.erl b/src/fabric/src/fabric_db_update_listener.erl
index 78ccf5a4d..fb508294a 100644
--- a/src/fabric/src/fabric_db_update_listener.erl
+++ b/src/fabric/src/fabric_db_update_listener.erl
@@ -37,6 +37,8 @@
}).
go(Parent, ParentRef, DbName, Timeout) ->
+ couch_log:error(" +++TRACING UPDATE NOTIFIER+++ ~p:~p@~B ~p Parent:~p", [?MODULE, ?FUNCTION_NAME, ?LINE, DbName, Parent]),
+ dbg:tracer(), dbg:p(self(), p),
Shards = mem3:shards(DbName),
Notifiers = start_update_notifiers(Shards),
MonRefs = lists:usort([rexi_utils:server_pid(N) || #worker{node = N} <- Notifiers]),
@@ -82,6 +84,8 @@ start_update_notifiers(Shards) ->
% rexi endpoint
start_update_notifier(DbNames) ->
+ couch_log:error(" +++TRACING UPDATE NOTIFIER WORKER+++ ~p:~p@~B~p", [?MODULE, ?FUNCTION_NAME, ?LINE, DbNames]),
+ dbg:tracer(), dbg:p(self(), p),
{Caller, Ref} = get(rexi_from),
Notify = config:get("couchdb", "maintenance_mode", "false") /= "true",
State = #cb_state{client_pid = Caller, client_ref = Ref, notify = Notify},
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index fa6ea5116..64fdbf4b5 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -69,6 +69,8 @@ changes(DbName, Args, StartSeq) ->
changes(DbName, #changes_args{} = Args, StartSeq, DbOptions) ->
changes(DbName, [Args], StartSeq, DbOptions);
changes(DbName, Options, StartVector, DbOptions) ->
+ couch_log:error(" ++TRACING CHANGES WORKER+++++ ~p:~p@~B~p", [?MODULE, ?FUNCTION_NAME, ?LINE, DbName]),
+ dbg:tracer(), dbg:p(self(), p),
set_io_priority(DbName, DbOptions),
Args0 = lists:keyfind(changes_args, 1, Options),
#changes_args{dir = Dir, filter_fun = Filter} = Args0,

That's setting up process event traces (spawn, exit, etc.) on the HTTP request process, worker process, db updater process, and db update worker process. The goal is that these should be cleaned up as soon as we attempt any write to the closed socket. (Since …)
@nickva 😱 interesting! What we've observed (a huge rise in interactive ioq requests after a series of …). I can confirm what you're seeing, and I have also done so for an unindexed … So I guess we're looking for a gap in the …
I've built a large local database, 6 million docs, no artificial sleeps, and with dbg:tracer calls in fabric_rpc:all_docs. I have haproxy with a 10s server timeout. I've observed: …
It took me too long to realise, but I think the difference is the attempt to send data (between _changes and _find). That's the only time we notice the socket closed, and thus the processes exit (and all the cleanup you pointed out then happens). The mochiweb HTTP process does stay alive after the sH-- event. If I kill it (from remsh) then my cleanup kicks in and kills the orphaned workers; without my change the fabric_streams cleanup kills them.
Yup, that's the key. We have to read, write, or find some other way to periodically check the socket state and then terminate the connection process. The stream cleanup mechanism as-is will only work when the connection process dies. We looked at this before and there was no simple way to detect it out-of-band in Erlang. We could try to emit empty-line heartbeats in … If there was a socket recv-with-options function in Erlang, we might have been able to periodically try to …
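(To illustrate the write-to-detect idea above, a minimal hedged sketch assuming a plain gen_tcp socket in passive mode; mochiweb may wrap its socket differently. Sending a heartbeat byte is the point where a closed connection finally surfaces as an error.)

heartbeat(Sock) ->
    case gen_tcp:send(Sock, <<"\n">>) of
        ok -> alive;                      % send succeeded, peer still there
        {error, _Reason} -> disconnected  % e.g. {error, closed}
    end.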
In principle, at least, the experimental socket module … But we can't use socket for non-socket connections, and it might be a while if/until mochiweb starts using that module. Especially since in some cases …
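(For what it's worth, a hedged sketch of what a non-consuming liveness probe could look like with the experimental socket module; it assumes an OTP release where socket:recv/4 accepts the peek flag, and mochiweb sockets are not socket handles, so this is illustrative only.)

still_open(Sock) ->
    %% MSG_PEEK read: looks at pending data without consuming it
    case socket:recv(Sock, 1, [peek], 10) of
        {error, timeout} -> true;   % nothing buffered; connection still open
        {ok, _Data}      -> true;   % data pending; connection still open
        {error, closed}  -> false;  % peer closed the connection
        {error, _}       -> false
    end.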
I've tried tweaking mochiweb to enable TCP keepalive …
Hm, not seeing a way to change count/idle/interval, so I bet the macOS defaults are quite high.
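(A hedged aside, not from the thread: on Linux the keepalive knobs can be set per-socket with raw options; the constants below are the Linux values (IPPROTO_TCP=6, TCP_KEEPIDLE=4, TCP_KEEPINTVL=5, TCP_KEEPCNT=6) and differ on other OSes.)

tune_keepalive(Sock) ->
    inet:setopts(Sock, [
        {keepalive, true},
        {raw, 6, 4, <<30:32/native>>},  % TCP_KEEPIDLE: start probing after 30s idle
        {raw, 6, 5, <<5:32/native>>},   % TCP_KEEPINTVL: probe every 5s
        {raw, 6, 6, <<3:32/native>>}    % TCP_KEEPCNT: give up after 3 failed probes
    ]).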
On localhost … So keepalive is useful as a step 0: to allow the socket at the kernel level to become closed. However, that is not enough for us; what is missing is that at the HTTP protocol stage in mochiweb we do not have a reason to do any socket operations (and we can't do any extra ones outside of mochiweb in a sneaky way, so far it seems, which is what I was getting at above). We're forced to wait for the … The only solutions I see so far are to either modify mango to periodically handle a timeout and push a …
I'm not entirely giving up on TCP keepalive (it ought to kill the process/port for the socket, which I think will kill the mochiweb process, but subject to testing). However, I'm beginning to think what we need is application-level. There's clearly a modest, finite number for how long we're prepared to wait for a response to start, which Cloudant, for example, already expresses in haproxy …
It doesn't hurt to try, but I don't see how that could plausibly work. There is no active socket monitoring or polling in mochiweb. If we were in active mode, or were in C and used a select/poll/epoll mechanism, we'd probably find out about it.
The only ways I see as viable are: …
Force-pushed 0c12522 to 8343beb
Changed approach, though I don't think this is the final answer either. This diff adds a deadline by which a delayed response must start, or it is terminated. This helps when using … It's not broad enough, but the previous effort with monitors was pointless (nick is correct that fabric_streams cleans up if the parent process exits).
I managed to find a way to seemingly detect the connection state change after the local client (curl) disconnects, by querying the equivalent of …
Ctrl+C...
Looks like Linux has a similar facility, so there is some hope, though it does look very ugly.
Force-pushed 8343beb to 8ea452b
Updated again. I've expanded fabric_streams' existing 'cleanup' process to be a watchdog (in the classic sense). Before starting a fabric stream you must call … By default this is not active, so all calls to fabric:all_docs etc. are unaffected. In mango_httpd I enable the watchdog, and only when adding a row to the response do I kick it. If we go a long time between rows, the watchdog fires. A similar enable/kick can be done on other endpoints.
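(A minimal sketch of the shape of that watchdog, with illustrative names rather than the actual fabric_streams API: it waits for kicks and kills the coordinator if none arrives within the idle timeout.)

-module(watchdog_sketch).
-export([start/2, kick/1]).

%% Spawn a watchdog for Coordinator with an idle Timeout in milliseconds.
start(Coordinator, Timeout) ->
    spawn(fun() -> loop(Coordinator, Timeout) end).

%% Any sign of useful progress (e.g. a row enqueued) resets the timer.
kick(Watchdog) ->
    Watchdog ! kick.

loop(Coordinator, Timeout) ->
    receive
        kick -> loop(Coordinator, Timeout)
    after Timeout ->
        %% no activity within Timeout: kill the idle coordinator;
        %% fabric_streams then reaps the remote workers
        exit(Coordinator, kill)
    end.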
Force-pushed d8c1acd to 7859ae6
This is nice. If we can cleanly encapsulate it in mochiweb then it might be a viable option. I am planning to look into the mochiweb code to see if it is doable.
For Linux it might be TCP_INFO.
I came to the same conclusion as Nick. It is quite a hack, especially given the fact that it would only work on Linux. I also looked into using the undocumented function unrecv, which can be called in quick succession after a recv:
still_connected(Socket) ->
    %% read one byte with a 10ms timeout
    case gen_tcp:recv(Socket, 1, 10) of
        {error, timeout} ->
            true;
        {ok, Data} ->
            %% push the byte back so the caller's stream is undisturbed
            gen_tcp:unrecv(Socket, Data),
            true;
        {error, closed} ->
            false
    end.

We could use … The conclusion is that it is not easy.
Interesting find @iilyak! Yeah, one worry about the …
A bit more cleaned-up version of the connection script, which should work on macOS, Linux and FreeBSD:

#!/usr/bin/env escript
is_closed(Sock) ->
OsType = os:type(),
case tcp_info_opt(OsType) of
{raw, _, _, _} = TcpInfoOpt ->
case inet:getopts(Sock, [TcpInfoOpt]) of
{ok, [{raw, _, _, <<State:8/native, _/binary>>}]} ->
tcp_fsm_state(State, OsType);
{ok, []} ->
false;
{error, einval} ->
% Already cleaned up
true;
{error, _} ->
false
end;
undefined ->
false
end.
%% All OSes have tcpi_state (uint8) as the first member of the tcp_info struct
tcp_info_opt({unix, linux}) ->
%% include/netinet/in.h
%% IPPROTO_TCP = 6
%%
%% include/netinet/tcp.h
%% #define TCP_INFO 11
%%
{raw, 6, 11, 1};
tcp_info_opt({unix, darwin}) ->
%% include/netinet/in.h
%% #define IPPROTO_TCP 6
%%
%% netinet/tcp.h
%% #define TCP_CONNECTION_INFO 0x106
%%
{raw, 6, 16#106, 1};
tcp_info_opt({unix, freebsd}) ->
%% sys/netinet/in.h
%% #define IPPROTO_TCP 6 /* tcp */
%%
%% sys/netinet/tcp.h
%% #define TCP_INFO 32 /* retrieve tcp_info structure */
%%
{raw, 6, 32, 1};
tcp_info_opt({_, _}) ->
undefined.
tcp_fsm_state(State, {unix, linux}) ->
%% netinet/tcp.h
%% enum
%% {
%% TCP_ESTABLISHED = 1,
%% TCP_SYN_SENT,
%% TCP_SYN_RECV,
%% TCP_FIN_WAIT1,
%% TCP_FIN_WAIT2,
%% TCP_TIME_WAIT,
%% TCP_CLOSE,
%% TCP_CLOSE_WAIT,
%% TCP_LAST_ACK,
%% TCP_LISTEN,
%% TCP_CLOSING
%% }
%%
lists:member(State, [4, 5, 6, 7, 8, 9, 10]);
tcp_fsm_state(State, {unix, darwin}) ->
%% netinet/tcp_fsm.h
%% #define TCPS_CLOSED 0 /* closed */
%% #define TCPS_LISTEN 1 /* listening for connection */
%% #define TCPS_SYN_SENT 2 /* active, have sent syn */
%% #define TCPS_SYN_RECEIVED 3 /* have send and received syn */
%% #define TCPS_ESTABLISHED 4 /* established */
%% #define TCPS_CLOSE_WAIT 5 /* rcvd fin, waiting for close */
%% #define TCPS_FIN_WAIT_1 6 /* have closed, sent fin */
%% #define TCPS_CLOSING 7 /* closed xchd FIN; await FIN ACK */
%% #define TCPS_LAST_ACK 8 /* had fin and close; await FIN ACK */
%% #define TCPS_FIN_WAIT_2 9 /* have closed, fin is acked */
%% #define TCPS_TIME_WAIT 10 /* in 2*msl quiet wait after close */
%%
lists:member(State, [0, 5, 6, 7, 8, 9, 10]);
tcp_fsm_state(State, {unix, freebsd}) ->
%% netinet/tcp_fsm.h
%% #define TCPS_CLOSED 0 /* closed */
%% #define TCPS_LISTEN 1 /* listening for connection */
%% #define TCPS_SYN_SENT 2 /* active, have sent syn */
%% #define TCPS_SYN_RECEIVED 3 /* have sent and received syn */
%% #define TCPS_ESTABLISHED 4 /* established */
%% #define TCPS_CLOSE_WAIT 5 /* rcvd fin, waiting for close */
%% #define TCPS_FIN_WAIT_1 6 /* have closed, sent fin */
%% #define TCPS_CLOSING 7 /* closed xchd FIN; await FIN ACK */
%% #define TCPS_LAST_ACK 8 /* had fin and close; await FIN ACK */
%% #define TCPS_FIN_WAIT_2 9 /* have closed, fin is acked */
%% #define TCPS_TIME_WAIT 10 /* in 2*msl quiet wait after close */
%%
lists:member(State, [0, 5, 6, 7, 8, 9, 10]).
monitor_loop(Sock) ->
timer:sleep(1000),
io:format("[mon] is_closed: ~p~n", [is_closed(Sock)]),
monitor_loop(Sock).
client(Port) ->
timer:sleep(1000),
io:format("[client] connecting on ~p~n", [Port]),
{ok, ClientSock} = gen_tcp:connect("127.0.0.1", Port, [{active, false}, binary]),
io:format("[client] connected ~p sleeping for 5 seconds and exiting~n", [ClientSock]),
timer:sleep(5000),
io:format("[client] exiting ~n", []),
ok.
main(_) ->
{ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]),
{ok, Port} = inet:port(ListenSocket),
io:format("[srv] spawning client to connect to port ~p~n", [Port]),
spawn_link(fun() -> client(Port) end),
{ok, ClientSocket} = gen_tcp:accept(ListenSocket),
io:format("[srv] accepted a connection ~p, sleeping for 10 seconds then exiting ~n", [ClientSocket]),
spawn_link(fun() -> monitor_loop(ClientSocket) end),
timer:sleep(10000).
src/fabric/src/fabric_streams.erl
Outdated
"watchdog ~p detected idle interval, killing ~p", | ||
[self(), St#watchdog_state.coordinator] | ||
), | ||
exit(St#watchdog_state.coordinator, kill), |
minor nit: wonder if there is a useful difference between kill and other exits (say shutdown), as we do some handler try...catch and logging/cleanup in chttpd and others.
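(To illustrate the difference raised in the nit, a hedged aside not from the thread: kill is untrappable, while shutdown can be trapped, giving the watched process a chance to log or clean up first.)

demo() ->
    Pid = spawn(fun() ->
        process_flag(trap_exit, true),
        receive
            {'EXIT', From, shutdown} ->
                io:format("trapped shutdown from ~p, cleaning up~n", [From])
        end
    end),
    exit(Pid, shutdown).  % trappable; exit(Pid, kill) would bypass the trap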
That's a decent solution. It would detect inactivity on the application side (workers are slow, or no index is defined, or anything else gets stuck and we don't emit rows). The timeout might be a bit low, as there might be existing (even if perhaps pathological) queries which could emit rows every 61 seconds; and if we raise it to 5 minutes, there might still be some cases we'd break. It's sort of a balancing act: if we make it too large, it might reduce the effect of this watchdog (requests might pile up).

I still think it might be better to be able to detect if the client socket has closed, so I kept pressing with the tcp_info options check to see how many OSes it can cover and to make it a bit less ugly. So far I couldn't figure out how to get Windows to do it, but I got all the others, I think. Here we can be more bold with a polling interval, even at 5 or 10 seconds if needed, as we can confidently clean up if the client disconnects.

Another direction to take it is to emit newline heartbeats between JSON rows. I couldn't quite see where to do that without disturbing the views and all_docs callback mechanism too much. Maybe have a last timestamp on the cursor in

couchdb/src/mango/src/mango_cursor_view.erl
Lines 487 to 491 in e60e275

… on timeout, and then in mango_httpd we'd emit a \n? That would keep the connection alive, and quit it as soon as a send failure happens? I am not too familiar with mango, so that may be completely off, too.
I would also prefer to detect the client disconnect directly, but I am less keen to see the raw packet inspection stuff in couchdb in general, unless we can be confident that it causes no harm on any platform we haven't handled. That it wouldn't work on Windows isn't a deal breaker for me; we'd need to mention that in the release notes, and perhaps someone more familiar with Windows could add support. As for heartbeats in the …
Maybe this is also off, but let me add that:

couchdb/src/mango/src/mango_cursor_view.erl
Lines 455 to 467 in e60e275
Doesn't it cause problems in this case? Or perhaps this could be utilized to hook up a countermeasure action.
I think that's the opposite direction: the coordinator making sure the workers are still alive and exiting if not. The problem we're trying to address is when the coordinator and the workers are all working but the client, who would receive the HTTP response, is gone, and thus they should all exit.
Force-pushed 501b438 to 2b0feb8
… the selector matches no index) and this continues even if the client disconnects. We want to stop the fabric work when there is no client to receive the result. fabric_streams already kills the workers if the coordinating process dies, but in this circumstance it does not. This commit enhances (and renames) the existing cleanup process to be a watchdog. If enabled, the watchdog needs to be kicked regularly (by whatever activity we think indicates it's worth continuing) or it will terminate the process it is watching, and kill the worker processes as well. Currently only mango_httpd:handle_find_req enables the watchdog, and it only kicks the watchdog when it enqueues a row to be returned (i.e., only on selector matches).
Force-pushed 2b0feb8 to e7f30bf
Yeah, it would definitely be nicer if …

One thing this approach has going for it is that it has some precedent in the _changes feed. I think it's fairly likely that a client library would use the same streaming JSON parser code for both endpoints. But of course there could be cases of users rolling their own and only ever parsing _find, and maybe _all_docs, and never _changes feeds with heartbeats...
To see how it would look, I tried a PR with the tcp_info getopts: #4736. I plugged it into the fabric_stream cleaners (and the db updater one); that part looks small enough. The ugliest thing is the socket options part, but it's mostly confined to the util functions.
Closing in favour of the approach in #4736.
Overview
A _find request can run for a very long time (on large databases when the selector matches no index), and this continues even if the client disconnects. We want to stop the fabric work when there is no client to receive the result.

fabric_streams already kills the workers if the coordinating process dies, but in this circumstance it does not. This PR enhances (and renames) the existing cleanup process to be a watchdog. If enabled, the watchdog needs to be kicked regularly (by whatever activity we think indicates it's worth continuing) or it will terminate the process it is watching, and kill the worker processes as well.
Currently only mango_httpd:handle_find_req enables the watchdog, and it only kicks the watchdog when it enqueues a row to be returned (i.e., only on selector matches).

Further work is being considered to directly detect a client disconnect, and that might replace this approach. The reason detecting a client disconnect is difficult is that the socket is in passive mode (and thus monitor(port, MochiSocket) and other ideas do not work).

Testing recommendations
Set [fabric] idle_stream_timeout to some low number of seconds (see the sketch below), then run:

curl -i 'foo:bar@localhost:15984/db1/_find' -H content-type:application/json -d '{"selector":{"foo":"bar"}}'
I intend to add automated tests for this but wanted to get the diff in front of the team first.
Related Issues or Pull Requests
N/A
Checklist

- Code is written and works correctly
- Changes are covered by tests
- Any new configurable parameters are documented in rel/overlay/etc/default.ini
- Documentation changes were made in the src/docs folder