
Fabric stream coordinators should exit if idle for a long period #4729

Closed

Conversation

rnewson (Member) commented Aug 14, 2023

Overview

A _find request can run for a very long time (on large databases when the selector matches no index) and this continues even if the client disconnects.

We want to stop the fabric work when there is no client to receive the result. fabric_streams already kills the workers if the coordinating process dies but in this circumstance it does not.

This PR enhances (and renames) the existing cleanup process to be a watchdog. If enabled, the watchdog needs to be kicked regularly (by whatever activity we think indicates it's worth continuing) or it will terminate the process it is watching, and kill the worker processes as well.

Currently only mango_httpd:handle_find_req enables the watchdog, and it only kicks the watchdog when it enqueues a row to be returned (i.e., only on selector matches).

Further work is being considered to directly detect a client disconnect, and that might replace this approach. Detecting a client disconnect is difficult because the socket is in passive mode (and thus monitor(port, MochiSocket) and other ideas do not work).

Testing recommendations

  • start dev/run with log level debug
  • make a database with millions of empty documents
  • change [fabric] idle_stream_timeout to some low number of seconds
  • in a terminal run curl -i 'foo:bar@localhost:15984/db1/_find' -H content-type:application/json -d '{"selector":{"foo":"bar"}}'

I intend to add automated tests for this but wanted to get the diff in front of the team first.

Related Issues or Pull Requests

N/A

Checklist

  • Code is written and works correctly
  • Changes are covered by tests
  • Any new configurable parameters are documented in rel/overlay/etc/default.ini
  • Documentation changes were made in the src/docs folder
  • Documentation changes were backported (separated PR) to affected branches

@@ -36,6 +37,7 @@
-record(st, {
    workers = ets:new(workers, [private, {keypos, #job.worker}]),
    clients = ets:new(clients, [private, {keypos, #job.client}]),
    client_mon_refs = ets:new(foo, [private, {keypos, #job.client_mon_ref}]),
Contributor commented:

@rnewson

Is there a reason to use the atom "foo"? Maybe there is a better name?

rnewson (Member, Author) replied:

heh, it doesn't really matter as it's not a named table but I'll change it

@rnewson rnewson force-pushed the fabric_teardown branch 2 times, most recently from cf809b6 to 0c12522, on August 15, 2023 10:07
nickva (Contributor) left a comment

We do not always want workers to exit when the client process exits. It's a deliberate design choice for instance in fabric_doc_update:

try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
    {ok, {Health, Results}} when
        Health =:= ok; Health =:= accepted; Health =:= error
    ->
        ensure_all_responses(Health, AllDocs, Results);
That's to allow the doc updates to finish writing on all 3 copies, as they most likely have just about finished, so it would be somewhat wasteful to kill them, only to then have the internal replicator or read-repair redo all that work.

Remote monitors may have a performance impact: they take extra work and maintenance on both sides of the dist channel, so it might be interesting to see whether it's anything significant.

There is already an existing worker cleanup mechanism for streaming workers in fabric_streams that doesn't create extra remote monitors, can that be patched or extended instead of having a secondary mechanism to do largely the same task?

The trigger for the cleanup is that the client process dies. If a client sends a request, then waits for a response, and then just drops the connection, would the chttpd request process find out about it and exit?
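For context, a generic sketch of the cleanup pattern being referred to here (not the actual fabric_streams code; the function name, record fields and spawn site are illustrative assumptions): a helper process monitors the coordinator and, when it goes down, kills the remote workers via rexi.

spawn_worker_cleaner(Coordinator, Workers) ->
    spawn(fun() ->
        Ref = erlang:monitor(process, Coordinator),
        receive
            {'DOWN', Ref, process, Coordinator, _Reason} ->
                % The coordinator (the HTTP request process) died, so tell
                % every remote worker to stop doing now-useless work.
                lists:foreach(
                    fun(#shard{node = Node, ref = WorkerRef}) ->
                        rexi:kill(Node, WorkerRef)
                    end,
                    Workers
                )
        end
    end).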

rel/haproxy.cfg (outdated review thread, resolved)
rnewson (Member, Author) commented Aug 18, 2023

good point on fabric_doc_update and possibly others.

I confirmed, both directly and via haproxy, that when the client disconnects (curl with CTRL-C in the first case, and in the second something that hits haproxy's server timeout, like _changes?feed=continuous&timeout=10000000) the mochiweb process is killed, and this triggers the new code above to take down the workers.

nickva (Contributor) commented Aug 18, 2023

I confirmed, both directly and via haproxy, that when the client disconnects (curl with CTRL-C in the first case, and in the second something that hits haproxy's server timeout, like _changes?feed=continuous&timeout=10000000) the mochiweb process is killed, and this triggers the new code above to take down the workers.

From what I've observed, cleanup already happens without this patch. Here is how I tested on main:

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index e2de301b2..6c2beba83 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -127,6 +127,8 @@ handle_changes_req1(#httpd{} = Req, Db) ->
         db_open_options = [{user_ctx, couch_db:get_user_ctx(Db)}]
     },
     Max = chttpd:chunked_response_buffer_size(),
+    couch_log:error(" +++TRACING _changes++++ ~p:~p@~B REQPID:~p", [?MODULE, ?FUNCTION_NAME, ?LINE, self()]),
+    dbg:tracer(), dbg:p(self(), p),
     case ChangesArgs#changes_args.feed of
         "normal" ->
             T0 = os:timestamp(),
diff --git a/src/fabric/src/fabric_db_update_listener.erl b/src/fabric/src/fabric_db_update_listener.erl
index 78ccf5a4d..fb508294a 100644
--- a/src/fabric/src/fabric_db_update_listener.erl
+++ b/src/fabric/src/fabric_db_update_listener.erl
@@ -37,6 +37,8 @@
 }).

 go(Parent, ParentRef, DbName, Timeout) ->
+    couch_log:error(" +++TRACING UPDATE NOTIFIER+++ ~p:~p@~B ~p Parent:~p", [?MODULE, ?FUNCTION_NAME, ?LINE, DbName, Parent]),
+    dbg:tracer(), dbg:p(self(), p),
     Shards = mem3:shards(DbName),
     Notifiers = start_update_notifiers(Shards),
     MonRefs = lists:usort([rexi_utils:server_pid(N) || #worker{node = N} <- Notifiers]),
@@ -82,6 +84,8 @@ start_update_notifiers(Shards) ->

 % rexi endpoint
 start_update_notifier(DbNames) ->
+    couch_log:error(" +++TRACING UPDATE NOTIFIER WORKER+++ ~p:~p@~B~p", [?MODULE, ?FUNCTION_NAME, ?LINE, DbNames]),
+    dbg:tracer(), dbg:p(self(), p),
     {Caller, Ref} = get(rexi_from),
     Notify = config:get("couchdb", "maintenance_mode", "false") /= "true",
     State = #cb_state{client_pid = Caller, client_ref = Ref, notify = Notify},
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index fa6ea5116..64fdbf4b5 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -69,6 +69,8 @@ changes(DbName, Args, StartSeq) ->
 changes(DbName, #changes_args{} = Args, StartSeq, DbOptions) ->
     changes(DbName, [Args], StartSeq, DbOptions);
 changes(DbName, Options, StartVector, DbOptions) ->
+    couch_log:error(" ++TRACING CHANGES WORKER+++++ ~p:~p@~B~p", [?MODULE, ?FUNCTION_NAME, ?LINE, DbName]),
+    dbg:tracer(), dbg:p(self(), p),
     set_io_priority(DbName, DbOptions),
     Args0 = lists:keyfind(changes_args, 1, Options),
     #changes_args{dir = Dir, filter_fun = Filter} = Args0,

That sets up process event traces (spawn, exit, etc.) on the HTTP request process, the worker process, the db update listener process, and the db update notifier worker process. The goal is that these should be cleaned up as soon as we attempt any write to the closed socket.

(db is Q=1 empty db)

% curl -i $DB/db/_changes'?feed=continuous&since=now'
HTTP/1.1 200 OK
...

^C
[error] 2023-08-18T22:54:36.654375Z [email protected] <0.949.0> cc6744542c  +++TRACING _changes++++ chttpd_db:handle_changes_req1@130 REQPID:<0.949.0>
(<0.949.0>) spawn <0.974.0> as erlang:apply(#Fun<rexi_monitor.1.77517421>,[])
(<0.949.0>) link <0.974.0>
(<0.949.0>) getting_unlinked <0.974.0>
[error] 2023-08-18T22:54:36.654949Z [email protected] <0.976.0> --------  +++TRACING UPDATE NOTIFIER+++ fabric_db_update_listener:go@40 <<"db">> Parent:<0.949.0>
(<0.949.0>) spawn <0.976.0> as fabric_db_update_listener:go(<0.949.0>,#Ref<0.830586435.801374217.85503>,<<"db">>,60000)
(<0.949.0>) link <0.976.0>
(<0.949.0>) spawn <0.978.0> as erlang:apply(#Fun<rexi_monitor.1.77517421>,[])
(<0.949.0>) link <0.978.0>
(<0.949.0>) spawn <0.979.0> as erlang:apply(#Fun<fabric_streams.3.106508571>,[])
[error] 2023-08-18T22:54:36.655018Z [email protected] <0.977.0> cc6744542c  ++TRACING CHANGES WORKER+++++ fabric_rpc:changes@72<<"shards/00000000-ffffffff/db.1692392006">>
(<0.976.0>) spawn <0.980.0> as erlang:apply(#Fun<rexi_monitor.1.77517421>,[])
[error] 2023-08-18T22:54:36.655123Z [email protected] <0.982.0> --------  +++TRACING UPDATE NOTIFIER WORKER+++ fabric_db_update_listener:start_update_notifier@87[<<"shards/00000000-ffffffff/db.1692392006">>]
(<0.976.0>) link <0.980.0>
(<0.976.0>) spawn <0.981.0> as erlang:apply(#Fun<fabric_db_update_listener.2.111341222>,[])
(<0.977.0>) exit normal
(<0.949.0>) getting_unlinked <0.978.0>

(<0.976.0>) exit normal
(<0.982.0>) exit killed
(<0.949.0>) exit shutdown
  • 0.949.0 request process is killed with a shutdown as expected. Probably after trying to write to a closed socket.
  • 0.976.0 db update notifier exits normal
  • 0.982.0 db update notifier worker is killed by the cleaner process

Since the db is empty we're not actually starting any workers, but those should also be cleaned up by the cleanup helper process in fabric_streams.

rnewson (Member, Author) commented Aug 21, 2023

@nickva 😱 interesting!

What we've observed (a huge rise in interactive ioq requests after a series of _find requests that haproxy classified as sH--, and the new mango_stats showing a mango process that exited some hours later, having examined the same number of documents as the database has in total) can currently only be explained by these processes simply continuing despite the client having gone away (the client being haproxy here; it closes the connection after its own timeout server interval).

I can confirm what you're seeing, and I have also done so for an unindexed _find request by adding timer:sleep calls in various places on the remote side of fabric_view_all_docs.

So I guess we're looking for a gap in the fabric_streams error handling rather than a complete absence of orphaned worker cleanup code.

rnewson (Member, Author) commented Aug 21, 2023

I've built a large local database, 6 million docs, no artificial sleeps, and with dbg:tracer calls in fabric_rpc:all_docs. I have haproxy with a 10s server timeout.

I've observed:

  1. curl responds with a 504 after 10s
  2. haproxy logs this as an sH-- (as response had not started)
  3. cpu and disk continue to churn for ages after, cpu consumed by beam.smp
  4. tracer does not show the processes exiting promptly

rnewson (Member, Author) commented Aug 21, 2023

Took me too long to realise, but I think the difference between _changes and _find is the attempt to send data. That's the only time we notice the socket is closed, and thus the processes exit (and all the cleanup you pointed out then happens).

The mochiweb HTTP process does stay alive after the sH-- event. If I kill it (from remsh) then my cleanup kicks in and kills the orphaned workers. Without my change the fabric_streams cleanup kills them.

nickva (Contributor) commented Aug 21, 2023

Took me too long to realise, but I think the difference between _changes and _find is the attempt to send data. That's the only time we notice the socket is closed, and thus the processes exit (and all the cleanup you pointed out then happens).

Yup that's the key. We have to read, write or find some other way to periodically check the socket state and then terminate the connection process. The stream cleanup mechanism as is will only work as long as the connection process dies. We looked at this before and there was no simple way to detect it out-of-band in Erlang.

We could try to emit empty-line heartbeats in _find; this might be the simplest solution.

If there were a socket recv-with-options function in Erlang, we might have been able to periodically try to recv with the MSG_PEEK option, to pretend we're reading the data and see what's there. But recv in Erlang doesn't allow arbitrary (raw) options. Maybe there is some non-portable raw inet:getopts/2 option we can query, but I think just trying periodic newline or empty-line pings and flushing them should do it.

nickva (Contributor) commented Aug 21, 2023

In principle at least the experimental socket module does allow peek-ing with recv since OTP 24.0: https://www.erlang.org/doc/man/socket#type-msg_flag

msg_flag() =
    cmsg_cloexec | confirm | ctrunc | dontroute | eor | errqueue |
    more | oob | peek | trunc

But we can't use socket for connections that aren't socket-module sockets, and it might be a while if/until mochiweb starts using that module, especially since in some cases socket has not been performing as well as the old inet driver.
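For illustration only, a sketch of what a peek-based liveness check could look like if the connection were a socket-module socket (which, as noted, mochiweb's connections are not today). The short poll timeout and the interpretation of the return values are assumptions to verify against the socket documentation.

peer_probably_alive(Sock) ->
    % Peek at one byte with a short timeout; peeked data is not consumed.
    case socket:recv(Sock, 1, [peek], 10) of
        {ok, _Data} -> true;          % something readable, still connected
        {error, timeout} -> true;     % nothing to read yet, still connected
        {error, closed} -> false;     % peer closed the connection
        {error, _Other} -> false
    end.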

rnewson (Member, Author) commented Aug 21, 2023

I've tried tweaking mochiweb to enable {keepalive, true} after it accepts a connection, so it can know if the client disconnects. But it seems not to have any effect, which is odd.

rnewson (Member, Author) commented Aug 21, 2023

Hm, not seeing a way to change the keepalive count/idle/interval, so I bet the macOS defaults are quite high.
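For reference, on Linux those keepalive knobs can be set per socket with raw inet options; a hedged sketch, not something mochiweb does today. The constants are Linux-specific values from netinet/tcp.h (IPPROTO_TCP=6, TCP_KEEPIDLE=4, TCP_KEEPINTVL=5, TCP_KEEPCNT=6); macOS uses different option numbers (e.g. TCP_KEEPALIVE, 0x10, for the idle time), so this exact sketch would not work there.

% Linux-only sketch: tune TCP keepalive on an already-accepted socket.
set_keepalive(Sock, IdleSec, IntervalSec, Count) ->
    inet:setopts(Sock, [
        {keepalive, true},
        {raw, 6, 4, <<IdleSec:32/native>>},     % TCP_KEEPIDLE
        {raw, 6, 5, <<IntervalSec:32/native>>}, % TCP_KEEPINTVL
        {raw, 6, 6, <<Count:32/native>>}        % TCP_KEEPCNT
    ]).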

nickva (Contributor) commented Aug 21, 2023

On localhost keepalive won't behave any differently for disconnects, as network handling is just a direct callback into the other peer's socket code at the kernel level. In other words the socket will find out it's closed anyway. For remote connections it's a different story and keepalive will be useful there.

So keepalive is useful as step 0: it allows the socket to become closed at the kernel level. However, that is not enough for us. What is missing is that at the HTTP protocol stage in mochiweb we have no reason to do any socket operations (and so far it seems we can't sneak in any extra ones outside of mochiweb, which is what I was getting at above). We're forced to wait for the mango application to return a response. Without any attempted writes or reads to the socket, the mochiweb level won't have a way to find out that the socket is closed, which could come after days of waiting in pathological cases like we saw already.

The only solutions I see so far are either to modify mango to periodically handle a timeout and push a \n between rows (after forcing it to emit headers on the first timeout), or to find some possibly non-portable raw socket getopts query to detect that the socket is closed out-of-band, as it were. We could enhance the existing fabric_streams monitor to periodically check it, or invent some new thing (maybe even something new in mochiweb).

rnewson (Member, Author) commented Aug 21, 2023

I'm not entirely giving up on tcp keepalive (it ought to kill the process/port for the socket, which I think will kill the mochiweb process, but subject to testing)

However, I'm beginning to think what we need is application-level. There's clearly a modest, finite limit to how long we're prepared to wait for a response to start, which Cloudant, for example, already expresses in haproxy's timeout server. So I think CouchDB needs a similar setting: if a response hasn't begun in that timeframe, we exit (and the fabric_streams cleanup then triggers). The only endpoint that delays the start of the response by a potentially very long time is _find, because it can discard work. A filtered _changes response has at least started even if nothing passes the filter.

nickva (Contributor) commented Aug 21, 2023

it ought to kill the process/port for the socket, which I think will kill the mochiweb process, but subject to testing

It doesn't hurt to try but I don't see how that could plausibly work. There is no active socket monitoring or polling in mochiweb. If we were in active mode or were in C and used a select/poll/epoll mechanism we'd probably find out about it.

nickva (Contributor) commented Aug 21, 2023

The only ways I see as viable are:

  1. Add a mango application-level timeout, like _changes has, so that we emit a periodic newline between JSON rows

  2. Add an active custom socket-inactivity process with a timeout, either as part of mochiweb, fabric streams or chttpd. After we receive the request we spawn a process which starts a timer. When we send to or receive from the socket we reset the timer; if we fail to send or receive anything on the socket, the timer expires and blows up, killing the connection process. But that involves resetting an extra timer and an extra process, so it has performance implications. Maybe we can do something clever with Erlang counters there...

rnewson (Member, Author) commented Aug 21, 2023

Changed approach, though I don't think this is the final answer either.

This diff adds a deadline by which a delayed response must start or be terminated. This helps when using _find if it takes a long time to find the first match (or if it's plowing through a large database via _all_docs and won't find any matches).

It's not broad enough, but the previous effort with monitors was pointless (Nick is correct that fabric_streams cleans up if the parent process exits).

nickva (Contributor) commented Aug 22, 2023

I managed to find a way to seemingly detect the connection state change after the local client (curl) disconnects by querying the equivalent of getsockopt(fd(), IPPROTO_TCP, TCP_CONNECTION_INFO, &info, &info_size)

([email protected])22> rp(inet:getopts(#Port<0.24>, [{raw, 6, 262, 1000}])).
{ok,[{raw,6,262,
          <<4,6,6,0,7,0,0,0,0,0,0,0,0,0,0,0,204,63,0,0,0,192,255,
            63,18,0,2,0,0,57,6,0,0,0,0,0,88,58,6,0,1,0,0,0,1,0,0,
            0,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,177,1,0,0,0,0,0,0,
            0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,148,0,0,0,0,0,0,0,0,
            0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>}]}
ok

Ctrl+C...

([email protected])24> inet:info(#Port<0.24>).
#{counters =>
      #{recv_avg => 24,recv_cnt => 6,recv_dvi => 10,recv_max => 51,
        recv_oct => 148,send_avg => 216,send_cnt => 2,
        send_max => 427,send_oct => 433,send_pend => 0},
  input => 0,
  links => [<0.12859.0>],
  memory => 40,monitors => [],output => 433,
  owner => <0.12859.0>,
  states => [connected,open]}

([email protected])25> rp(inet:getopts(#Port<0.24>, [{raw, 6, 262, 1000}])).
{ok,[{raw,6,262,
          <<5,6,6,0,7,0,0,0,0,0,0,0,0,0,0,0,204,63,0,0,0,192,255,
            63,18,0,2,0,0,57,6,0,0,0,0,0,88,58,6,0,59,58,0,0,1,0,
            0,0,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,177,1,0,0,0,0,0,
            0,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,148,0,0,0,0,0,0,0,
            0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>}]}
ok

Looks like Linux has a similar facility, so there is some hope, though it does look very ugly.

rnewson (Member, Author) commented Aug 22, 2023

Updated again. I've expanded fabric_streams' existing 'cleanup' process to be a watchdog (in the classic sense).

Before starting a fabric stream you must call fabric_stream:enable_watchdog(). If you do, the cleanup process starts a timer. If the watchdog is not kicked at least once (with fabric_stream:kick_watchdog()) before the timer fires, the watchdog kills the coordinator process (for us, the mochiweb/httpd process) and its workers.

By default this is not active, so all calls to fabric:all_docs etc. are unaffected.

In mango_httpd I enable the watchdog, and I kick it only when adding a row to the response. If we go a long time between rows, the watchdog fires.

A similar enable/kick can be done on other endpoints.
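As an illustration of the mechanism described above, a minimal sketch only: the enable/kick entry points mirror the names in this comment, while the state handling, logging and kill strategy here are assumptions rather than the PR's actual code. The sketch relies on the existing fabric_streams cleanup to take down the workers once the coordinator dies.

% Watchdog sketch: must be kicked at least once per TimeoutMsec or it kills
% the coordinator it is watching.
start_watchdog(Coordinator, TimeoutMsec) ->
    spawn(fun() -> watchdog_loop(Coordinator, TimeoutMsec) end).

kick_watchdog(Watchdog) ->
    Watchdog ! kick,
    ok.

watchdog_loop(Coordinator, TimeoutMsec) ->
    receive
        kick ->
            % Activity observed (e.g. a row was enqueued), reset the timer.
            watchdog_loop(Coordinator, TimeoutMsec)
    after TimeoutMsec ->
        couch_log:warning(
            "watchdog ~p detected idle interval, killing ~p",
            [self(), Coordinator]
        ),
        exit(Coordinator, kill)
    end.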

@rnewson rnewson changed the title from "Fabric workers should exit if the client exits" to "Fabric stream coordinators should exit if idle for a long period" on Aug 22, 2023
@rnewson rnewson force-pushed the fabric_teardown branch 3 times, most recently from d8c1acd to 7859ae6, on August 22, 2023 18:15
iilyak (Contributor) commented Aug 22, 2023

I managed to find a way to seemingly detect the connection state change after the local client (curl) disconnects by querying the equivalent of getsockopt(fd(), IPPROTO_TCP, TCP_CONNECTION_INFO, &info, &info_size)

([email protected])22> rp(inet:getopts(#Port<0.24>, [{raw, 6, 262, 1000}])).
{ok,[{raw,6,262,
          <<4,6,6,0,7,0,0,0,0,0,0,0,0,0,0,0,204,63,0,0,0,192,255,
            63,18,0,2,0,0,57,6,0,0,0,0,0,88,58,6,0,1,0,0,0,1,0,0,
            0,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,177,1,0,0,0,0,0,0,
            0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,148,0,0,0,0,0,0,0,0,
            0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>}]}
ok

This is nice. If we can cleanly encapsulate it in mochiweb then it might be a viable option. I am planning to look into the mochiweb code to see if it is doable.

nickva (Contributor) commented Aug 22, 2023

For Linux it might be IPPROTO_TCP / TCP_INFO and I haven't checked exactly which bits conclusively show us the closed state yet.

iilyak (Contributor) commented Aug 22, 2023

I managed to find a way to seemingly detect the connection state change after the local client (curl) disconnects by querying the equivalent of getsockopt(fd(), IPPROTO_TCP, TCP_CONNECTION_INFO, &info, &info_size)

This is nice. If we can cleanly encapsulate it in mochiweb then it might be a viable option. I am planning to look into the mochiweb code to see if it is doable.

I came to the same conclusion as Nick. It is quite a hack, especially given that it would only work on Linux. I also looked into using the undocumented function unrecv, which can be called in quick succession after a recv.

([email protected])49> f(Port), Port = hd(lists:reverse(erlang:ports())).
#Port<0.54>
([email protected])51> f(Res), Res = inet_tcp:recv(Port, 1, 10).
{error,timeout}
([email protected])52> inet_tcp:unrecv(Port, <<>>).
ok
([email protected])53> f(Res), Res = inet_tcp:recv(Port, 1, 10).
{error,closed}
([email protected])54> inet_tcp:unrecv(Port, <<>>).
ok
still_connected(Socket) ->
    % read one byte with a 10ms timeout
    case gen_tcp:recv(Socket, 1, 10) of
        {error, timeout} ->
            true;
        {ok, Data} ->
            gen_tcp:unrecv(Socket, Data),
            true;
        {error, closed} ->
            false
    end.

We could use timer:apply_after/4 to call still_connected/1. However, the problem is that timer:apply_after would run concurrently with the request handler, so there is a chance that the handler process would call recv in between our recv/unrecv. This means we need locking. We could implement locking using an ets table, but...

The conclusion is that it is not easy.

nickva (Contributor) commented Aug 22, 2023

Interesting find @iilyak! Yeah, one worry about the gen_tcp:unrecv/2 function is that it may mess up the HTTP protocol stream in case of any pipelining. We'd be reading data out of band and then putting it back into Erlang's userland TCP buffer from a different process than mochiweb's request client. At least with TCP_INFO, at worst we'd get an unknown state or an unsupported OS, and we'd just do what we did before, without the danger of messing up the HTTP TCP stream.

nickva (Contributor) commented Aug 23, 2023

A slightly more cleaned-up version of the connection-check script, which should work on macOS, Linux and FreeBSD:

#!/usr/bin/env escript

is_closed(Sock) ->
    OsType = os:type(),
    case tcp_info_opt(OsType) of
        {raw, _, _, _} = TcpInfoOpt ->
            case inet:getopts(Sock, [TcpInfoOpt]) of
                {ok, [{raw, _, _, <<State:8/native, _/binary>>}]} ->
                    tcp_fsm_state(State, OsType);
                {ok, []} ->
                    false;
                {error, einval} ->
                    % Already cleaned up
                    true;
                {error, _} ->
                    false
            end;
        undefined ->
            false
    end.

% All OS-es have the tcpi_state (uint8) as first member of tcp_info struct

tcp_info_opt({unix, linux}) ->
    %% include/netinet/in.h
    %%   IPPROTO_TCP = 6
    %%
    %% include/netinet/tcp.h
    %%   #define TCP_INFO 11
    %%
    {raw, 6, 11, 1};
tcp_info_opt({unix, darwin}) ->
    %% include/netinet/in.h
    %%   #define IPPROTO_TCP   6
    %%
    %% netinet/tcp.h
    %%   #define TCP_CONNECTION_INFO  0x106
    %%
    {raw, 6, 16#106, 1};
tcp_info_opt({unix, freebsd}) ->
    %% sys/netinet/in.h
    %%  #define  IPPROTO_TCP  6 /* tcp */
    %%
    %% sys/netinet/tcp.h
    %%  #define  TCP_INFO    32 /* retrieve tcp_info structure */
    %%
    {raw, 6, 32, 1};
tcp_info_opt({_, _}) ->
    undefined.

tcp_fsm_state(State, {unix, linux}) ->
    %% netinet/tcp.h
    %%   enum
    %%   {
    %%     TCP_ESTABLISHED = 1,
    %%     TCP_SYN_SENT,
    %%     TCP_SYN_RECV,
    %%     TCP_FIN_WAIT1,
    %%     TCP_FIN_WAIT2,
    %%     TCP_TIME_WAIT,
    %%     TCP_CLOSE,
    %%     TCP_CLOSE_WAIT,
    %%     TCP_LAST_ACK,
    %%     TCP_LISTEN,
    %%     TCP_CLOSING
    %%   }
    %%
    lists:member(State, [4, 5, 6, 7, 8, 9, 10]);
tcp_fsm_state(State, {unix, darwin}) ->
    %% netinet/tcp_fsm.h
    %%   #define TCPS_CLOSED             0       /* closed */
    %%   #define TCPS_LISTEN             1       /* listening for connection */
    %%   #define TCPS_SYN_SENT           2       /* active, have sent syn */
    %%   #define TCPS_SYN_RECEIVED       3       /* have send and received syn */
    %%   #define TCPS_ESTABLISHED        4       /* established */
    %%   #define TCPS_CLOSE_WAIT         5       /* rcvd fin, waiting for close */
    %%   #define TCPS_FIN_WAIT_1         6       /* have closed, sent fin */
    %%   #define TCPS_CLOSING            7       /* closed xchd FIN; await FIN ACK */
    %%   #define TCPS_LAST_ACK           8       /* had fin and close; await FIN ACK */
    %%   #define TCPS_FIN_WAIT_2         9       /* have closed, fin is acked */
    %%   #define TCPS_TIME_WAIT          10      /* in 2*msl quiet wait after close */
    %%
    lists:member(State, [0, 5, 6, 7, 8, 9, 10]);
tcp_fsm_state(State, {unix, freebsd}) ->
    %% netinet/tcp_fsm.h
    %%   #define TCPS_CLOSED 0 /* closed */
    %%   #define TCPS_LISTEN 1 /* listening for connection */
    %%   #define TCPS_SYN_SENT 2  /* active, have sent syn */
    %%   #define TCPS_SYN_RECEIVED 3 /* have sent and received syn */
    %%   #define TCPS_ESTABLISHED 4 /* established */
    %%   #define TCPS_CLOSE_WAIT 5 /* rcvd fin, waiting for close */
    %%   #define TCPS_FIN_WAIT_1 6 /* have closed, sent fin */
    %%   #define TCPS_CLOSING 7/* closed xchd FIN; await FIN ACK */
    %%   #define TCPS_LAST_ACK 8/* had fin and close; await FIN ACK */
    %%   #define TCPS_FIN_WAIT_2 9/* have closed, fin is acked */
    %%   #define TCPS_TIME_WAIT 10 /* in 2*msl quiet wait after close */
    %%
    lists:member(State, [0, 5, 6, 7, 8, 9, 10]).

monitor_loop(Sock) ->
    timer:sleep(1000),
    io:format("[mon] is_closed: ~p~n", [is_closed(Sock)]),
    monitor_loop(Sock).

client(Port) ->
    timer:sleep(1000),
    io:format("[client] connecting on ~p~n", [Port]),
    {ok, ClientSock} = gen_tcp:connect("127.0.0.1", Port, [{active, false}, binary]),
    io:format("[client] connected ~p sleeping for 5 seconds and exiting~n", [ClientSock]),
    timer:sleep(5000),
    io:format("[client] exiting ~n", []),
    ok.

main(_) ->
    {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]),
    {ok, Port} = inet:port(ListenSocket),
    io:format("[srv] spawning client to connect to port ~p~n", [Port]),
    spawn_link(fun() -> client(Port) end),
    {ok, ClientSocket} = gen_tcp:accept(ListenSocket),
    io:format("[srv] accepted a connection ~p, sleeping for 10 seconds then exiting ~n", [ClientSocket]),
    spawn_link(fun() -> monitor_loop(ClientSocket) end),
    timer:sleep(10000).
./tcp_server_erlang.escript
[srv] spawning client to connect to port 59121
[client] connecting on 59121
[srv] accepted a connection #Port<0.8>, sleeping for 10 seconds then exiting
[client] connected #Port<0.7> sleeping for 5 seconds and exiting
[mon] is_closed: false
[mon] is_closed: false
[mon] is_closed: false
[mon] is_closed: false
[client] exiting
[mon] is_closed: true
[mon] is_closed: true
[mon] is_closed: true
[mon] is_closed: true
[mon] is_closed: true

"watchdog ~p detected idle interval, killing ~p",
[self(), St#watchdog_state.coordinator]
),
exit(St#watchdog_state.coordinator, kill),
Contributor commented:

Minor nit: I wonder if there is a useful difference between kill and other exit reasons (say shutdown), as we do some handler try...catch and logging/cleanup in chttpd and others.

nickva (Contributor) commented Aug 23, 2023

updated again. I've expanded fabric_streams existing 'cleanup' process to be a watchdog (in the classic sense).

before starting a fabric_stream you must call fabric_stream:enable_watchdog(). If you do, the cleanup process starts a timer. If the watchdog is not kicked at least once (with fabric_stream:kick_watchdog().) before the timer fires, the watchdog kills the coordinator process (for us, the mochiweb/httpd process) and its workers.

by default this is not active, so all calls to fabric:all_docs etc are unaffected.

in mango_httpd I enable the watchdog and only when adding a row to the response do I kick the watchdog. If we go a long time between rows, the watchdog fires.

similar enable/kick can be done on other endpoints.

That's a decent solution. It would detect inactivity on the application side (workers are slow, or no index is defined, or anything else gets stuck and we don't emit rows). The timeout might be a bit low, as there might be existing (even if perhaps pathological) queries which could emit rows every 61 seconds; or, if we raise it to 5 minutes, there might still be some cases we break. It's sort of a balancing act: if we make it too large, it might reduce the effect of this watchdog (requests might pile up).

I still think it might be better to be able to detect that the client socket has closed, so I kept pressing on the tcp_info options check to see how many OS-es it can cover and to make it a bit less ugly. So far I couldn't figure out how to get Windows to do it, but I think I got all the others. Here we can be more bold with the polling interval, even 5 or 10 seconds if needed, as we can confidently clean up if the client disconnects.

Another direction to take it is to emit newline heartbeats between JSON rows. I couldn't quite see where to do that without disturbing the views and all_docs callback mechanism too much. Maybe have a last timestamp on the cursor in

{no_match, _, {execution_stats, Stats}} ->
    Cursor1 = Cursor#cursor{
        execution_stats = Stats
    },
    {ok, Cursor1};
and we could issue a handle_doc(...) callback with a timeout, and then in mango_httpd we'd emit a \n? That would keep the connection alive and quit it as soon as a send failure happens? I am not too familiar with mango so that may be completely off, too.
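Purely as an illustration of the heartbeat idea (the integration point is exactly the open question here, so this helper, the process-dictionary key and the threshold are hypothetical; chttpd:send_delayed_chunk/2 is assumed to be the same chunk-sending call _changes uses for its heartbeats):

-define(FIND_HEARTBEAT_USEC, 5 * 1000 * 1000).

% Hypothetical: called between rows with the delayed chttpd response; emits a
% bare newline if nothing has been sent for a while, so a closed socket is
% noticed on the attempted write.
maybe_heartbeat(Resp) ->
    Now = os:timestamp(),
    case get(find_last_activity) of
        undefined ->
            put(find_last_activity, Now),
            Resp;
        Last ->
            case timer:now_diff(Now, Last) > ?FIND_HEARTBEAT_USEC of
                true ->
                    {ok, Resp1} = chttpd:send_delayed_chunk(Resp, <<"\n">>),
                    put(find_last_activity, Now),
                    Resp1;
                false ->
                    Resp
            end
    end.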

rnewson (Member, Author) commented Aug 23, 2023

I would also prefer to detect the client disconnect directly, but I am less keen to see the raw packet inspection stuff in couchdb in general, unless we can be confident that it causes no harm on any platform we haven't handled. That it wouldn't work on Windows isn't a deal-breaker for me; we'd need to mention that in the release notes, and perhaps someone more familiar with Windows could add support.

As for heartbeats in the _find response, this obviously changes the response in a way that no client currently expects. The idea that a blank line (or whitespace in general) will be silently handled is reasonable, but requires a lot of testing to be confident.

pgj (Contributor) commented Aug 23, 2023

Maybe this is also off, but let me add that mango_cursor_view implements its own keep-alive mechanism:

-spec maybe_send_mango_ping() -> ok | term().
maybe_send_mango_ping() ->
    Current = os:timestamp(),
    LastPing = get(mango_last_msg_timestamp),
    % Fabric will timeout if it has not heard a response from a worker node
    % after 5 seconds. Send a ping every 4 seconds so the timeout doesn't happen.
    case timer:now_diff(Current, LastPing) > ?HEARTBEAT_INTERVAL_IN_USEC of
        false ->
            ok;
        true ->
            rexi:ping(),
            set_mango_msg_timestamp()
    end.

Doesn't it cause problems in this case? Or perhaps this could be utilized to hook up a countermeasure action.

rnewson (Member, Author) commented Aug 23, 2023

I think that's the opposite direction, the coordinator making sure the workers are still alive and exiting if not.

The problem we're trying to address is when the coordinator and the workers are all working but the client, who would receive the http response, is gone, and thus they should all exit.

@rnewson rnewson force-pushed the fabric_teardown branch 2 times, most recently from 501b438 to 2b0feb8, on August 23, 2023 14:13
nickva (Contributor) commented Aug 23, 2023

I am less keen to see the raw packet inspection stuff in couchdb in general, unless we can be confident that it causes no harm on any platform we haven't handled.

Yeah, it would definitely be nicer if inet:monitor/1 or inet:info/1 worked here. However, we have a limited set of supported platforms compared to Erlang/OTP itself, so we could test them in CI. On the ones I tried so far (Linux: Ubuntu on x86_64, ppc64le, z-Linux; macOS; Windows), when the raw option isn't supported it returns {ok, []}, which we interpret as the default, i.e. socket not closed. It does look ugly, but it seems fairly safe: it doesn't alter the packet stream, just copies a single byte from the stats, and we can always fall back to the old behavior for unsupported/unexpected platforms.

As for heartbeats in the _find response, this obviously changes the response in a way that no client currently expects. The idea that a blank line (or whitespace in general) will be silently handled is reasonable, but requires a lot of testing to be confident.

One thing this approach has going for it is that there is some precedent in the _changes feed. I think it's fairly likely that a client library would use the same streaming JSON parser code for both endpoints. But of course there could be cases of users rolling their own and only ever parsing _find and maybe _all_docs, and never _changes feeds with heartbeats...

nickva (Contributor) commented Aug 23, 2023

To see how it would look, I tried a PR with the tcp_info getopts: #4736

I plugged it into the fabric_streams cleaners (and the db updater one); that part looks small enough. The ugliest part is the socket options query, but it's mostly confined to the util functions.

rnewson (Member, Author) commented Aug 23, 2023

Closing in favour of the approach in #4736.

@rnewson rnewson closed this Aug 23, 2023