Skip to content

Commit

Permalink
[phantom-presence] handle stray websockets (instantdb#207)
Browse files Browse the repository at this point in the history
  • Loading branch information
stopachka authored Sep 13, 2024
1 parent c5963f9 commit 5c86302
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 43 deletions.
15 changes: 14 additions & 1 deletion client/packages/core/src/Reactor.js
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,11 @@ export default class Reactor {
);
return;
}
if (this._isManualClose) {
this._isManualClose = false;
log.info("[socket-close] manual close, will not reconnect");
return;
}

log.info("[socket-close] scheduling reconnect", this._reconnectTimeoutMs);
setTimeout(() => {
Expand All @@ -910,7 +915,15 @@ export default class Reactor {
}, this._reconnectTimeoutMs);
};

_ensurePreviousSocketClosed() {
if (this._ws && this._ws.readyState === WS_OPEN_STATUS) {
this._isManualClose = true;
this._ws.close();
}
}

_startSocket() {
this._ensurePreviousSocketClosed();
this._ws = new WebSocket(
`${this.config.websocketURI}?app_id=${this.config.appId}`,
);
Expand Down Expand Up @@ -1206,7 +1219,7 @@ export default class Reactor {
appId: this.config.appId,
refreshToken,
});
} catch (e) {}
} catch (e) { }
}

this.changeCurrentUser(null);
Expand Down
80 changes: 65 additions & 15 deletions server/src/instant/lib/ring/websocket.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
(:refer-clojure :exclude [send])
(:require [ring.adapter.undertow.headers :refer [set-headers]]
[instant.util.json :refer [->json]]
[instant.util.tracer :as tracer])
[instant.util.tracer :as tracer]
[instant.util.delay :as delay])
(:import
[io.undertow.server HttpServerExchange]
[io.undertow.websockets
Expand All @@ -21,43 +22,69 @@
BufferedBinaryMessage
BufferedTextMessage
CloseMessage
WebSocketChannel
StreamSourceFrameChannel WebSocketChannel
WebSockets
WebSocketCallback]
[io.undertow.websockets.spi WebSocketHttpExchange]
[org.xnio ChannelListener]
[ring.adapter.undertow Util]
[clojure.lang IPersistentMap]
[io.undertow.websockets.extensions PerMessageDeflateHandshake]
[java.util.concurrent.locks ReentrantLock]))
[java.util.concurrent.locks ReentrantLock]
[java.util.concurrent.atomic AtomicLong]
[java.nio ByteBuffer]
[org.xnio IoUtils]))

(defn ws-listener
"Creates an `AbstractReceiveListener`. This relays calls to
`on-message`, `on-close-message`, and `on-error` callbacks.
See `ws-callback` for more details."
[{:keys [on-message on-close-message on-error channel-wrapper]}]
[{:keys [on-message on-close-message on-error channel-wrapper atomic-last-received-at]}]
(let [on-message (or on-message (constantly nil))
on-error (or on-error (constantly nil))
on-close-message (or on-close-message (constantly nil))]
(proxy [AbstractReceiveListener] []
(onFullTextMessage [^WebSocketChannel channel ^BufferedTextMessage message]

(.set atomic-last-received-at (System/currentTimeMillis))
(on-message {:channel (channel-wrapper channel)
:data (.getData message)}))
(onFullBinaryMessage [^WebSocketChannel channel ^BufferedBinaryMessage message]
(.set atomic-last-received-at (System/currentTimeMillis))
(let [pooled (.getData message)]
(try
(let [payload (.getResource pooled)]
(on-message {:channel (channel-wrapper channel)
:data (Util/toArray payload)}))
(finally (.free pooled)))))
(onPong [^WebSocketChannel channel ^StreamSourceFrameChannel channel]
(.set atomic-last-received-at (System/currentTimeMillis)))
(onCloseMessage [^CloseMessage message ^WebSocketChannel channel]
(on-close-message {:channel (channel-wrapper channel)
:message message}))
(onError [^WebSocketChannel channel ^Throwable error]
(on-error {:channel (channel-wrapper channel)
:error error})))))

(defonce ping-pool (delay/make-pool!))

(defn straight-jacket-run-ping-job [^WebSocketChannel channel
^AtomicLong atomic-last-received-at
idle-timeout-ms]
(try
(let [now (System/currentTimeMillis)
last-received-at (.get atomic-last-received-at)
ms-since-last-message (- now last-received-at)]
(if (> ms-since-last-message idle-timeout-ms)
(tracer/with-span! {:name "socket/close-inactive"}
(IoUtils/safeClose channel))
(WebSockets/sendPingBlocking (ByteBuffer/allocate 0)
channel)))
(catch Exception e
(tracer/record-exception-span! e {:name "socket/ping-err"
:escaping? false}))))

(defn ws-callback
"Creates a `WebsocketConnectionCallback`. This relays data to the
following callbacks:
Expand Down Expand Up @@ -85,26 +112,49 @@
on-error: Called when the server encounters an error sending a message
:channel - The `WebSocketChannel` object
:error - The error Throwable"
[{:keys [on-open on-close listener]
:or {on-open (constantly nil) on-close (constantly nil)}
:error - The error Throwable
We also kick off a ping worker. It sends a `ping` message every
`ping-interval-ms`. If the client doesn't send any message for
`idle-timeout-ms`, we close the connection.
"
[{:keys [on-open on-close listener ping-interval-ms idle-timeout-ms]
:or {on-open (constantly nil)
on-close (constantly nil)
ping-interval-ms 5000
idle-timeout-ms 15000}
:as ws-opts}]
(let [send-lock (ReentrantLock.)
atomic-last-received-at (AtomicLong. (System/currentTimeMillis))
channel-wrapper (fn [ch]
{:undertow-websocket ch
:send-lock send-lock})
listener (if (instance? ChannelListener listener)
listener
(ws-listener (assoc ws-opts :channel-wrapper channel-wrapper)))
close-task (reify ChannelListener
(handleEvent [_this channel]
(on-close (channel-wrapper channel))))]
(ws-listener (assoc ws-opts
:channel-wrapper channel-wrapper
:atomic-last-received-at atomic-last-received-at)))]

(reify WebSocketConnectionCallback
(^void onConnect [_ ^WebSocketHttpExchange exchange ^WebSocketChannel channel]
(on-open {:exchange exchange :channel (channel-wrapper channel)})
(.addCloseTask channel close-task)
(.set (.getReceiveSetter channel) listener)
(.resumeReceives channel)))))
(let [ping-job (delay/repeat-fn
ping-pool
ping-interval-ms
(fn []
(straight-jacket-run-ping-job channel
atomic-last-received-at
idle-timeout-ms)))

close-task (reify ChannelListener
(handleEvent [_this channel]
(.cancel ping-job false)
(on-close (channel-wrapper channel))))]
(.set atomic-last-received-at (System/currentTimeMillis))
(on-open {:exchange exchange
:channel (channel-wrapper channel)})
(.addCloseTask channel close-task)
(.set (.getReceiveSetter channel) listener)
(.resumeReceives channel))))))

(defn ws-request [^HttpServerExchange exchange ^IPersistentMap headers ^WebSocketConnectionCallback callback]
(let [handler (-> (WebSocketProtocolHandshakeHandler. callback)
Expand Down
38 changes: 11 additions & 27 deletions server/src/instant/reactive/session.clj
Original file line number Diff line number Diff line change
Expand Up @@ -492,15 +492,6 @@
;; -----------------
;; Websocket Interop

(defonce ping-pool (delay/make-pool!))

(defn start-ping-job [store-conn id]
(delay/repeat-fn
ping-pool
5000
(fn []
(rs/try-send-event! store-conn id {:op :ping}))))

(defn on-open [store-conn {:keys [id] :as socket}]
(tracer/with-span! {:name "socket/on-open"
:attributes {:session-id (:id socket)}}
Expand All @@ -521,23 +512,17 @@
(defn on-close [store-conn eph-store-atom {:keys [id pending-handlers]}]
(tracer/with-span! {:name "socket/on-close"
:attributes {:session-id id}}
(let [{:keys [ping-job]} (rs/get-socket @store-conn id)]
(if ping-job
(.cancel ping-job false)
(tracer/record-info! {:name "socket/on-close-no-ping-job"
:attributes {:session-id id}}))

(doseq [{:keys [future silence-exceptions op]} @pending-handlers]
(tracer/with-span! {:name "cancel-pending-handler"
:attributes {:op op}}
(silence-exceptions true)
(future-cancel future)))

(let [app-id (-> (rs/get-auth @store-conn id)
:app
:id)]
(eph/leave-by-session-id! eph-store-atom app-id id)
(rs/remove-session! store-conn id)))))
(doseq [{:keys [future silence-exceptions op]} @pending-handlers]
(tracer/with-span! {:name "cancel-pending-handler"
:attributes {:op op}}
(silence-exceptions true)
(future-cancel future)))

(let [app-id (-> (rs/get-auth @store-conn id)
:app
:id)]
(eph/leave-by-session-id! eph-store-atom app-id id)
(rs/remove-session! store-conn id))))

(defn undertow-config
[store-conn eph-store-atom receive-q {:keys [id]}]
Expand All @@ -548,7 +533,6 @@
:http-req http-req
:ws-conn ws-conn
:receive-q receive-q
:ping-job (start-ping-job store-conn id)
:pending-handlers pending-handlers}]
(on-open store-conn socket)))
:on-message (fn [{:keys [data]}]
Expand Down

0 comments on commit 5c86302

Please sign in to comment.