Skip to content

Commit

Permalink
[client-app] (deltas P4) Fixup DeltaStramingConnection + retry on close
Browse files Browse the repository at this point in the history
Summary:
This commit completely refactors `DeltaStreamingConnection`, notably
introducing the following changes:

- Right now, `AccountDeltaConnection` establishes both delta connections to the cloud api and to the `client-sync` database (K2). This class is meant to disapper in favor of splitting into two different classes meant for syncing with the n1Cloud api and the local database. Specifically, `DeltaStreamingConnection`'s only responsibility is now to connect to the n1Cloud API and establish an http streaming connection for metadata deltas, etc. This class no longer unecessarily inherits from `NylasLongConnection`, which removes a lot of unecessary callback indirection.
- The statuses of the n1Cloud delta streaming connections are now stored in as JSONBlobs in edgehill.db under new keys. This commit ensures that the data is correctly migrated from the old key (`NylasSyncWorker:<account_id>`).
- The `DeltaStreamingConnection` now correctly retries when closed or errors. This logic previously existed, but was removed during the K2 transition: https://github.com/nylas/nylas-mail/blob/n1-pro/internal_packages/worker-sync/lib/nylas-sync-worker.coffee#L67
- Delta connection retries now backoff using the `ExponentialBackoffScheduler`
- Attempt to restore the delta connection whenever the app comes back online

Depends on D4119

Test Plan: manual + planned unit tests in upcoming diff

Reviewers: halla, mark, evan, spang

Reviewed By: evan

Differential Revision: https://phab.nylas.com/D4120
  • Loading branch information
jstejada committed Mar 8, 2017
1 parent b4d3da1 commit 8bc2ec5
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,14 @@ const BASE_RETRY_DELAY = 1000;
*/
export default class AccountDeltaConnection {


constructor(account) {
// TODO This class is in the process of being ripped apart, and replaced by
// DeltaStreamingConnection, and will disappear in
// the next diff, but for the purposes of making this diff smaller, I
// haven't removed it yet.
this._n1CloudConn = new DeltaStreamingConnection(account)

this._state = { deltaCursors: {}, deltaStatus: {} }
this.retryDelay = BASE_RETRY_DELAY;
this._writeStateDebounced = _.debounce(this._writeState, 100)
Expand Down Expand Up @@ -77,6 +84,7 @@ export default class AccountDeltaConnection {

start = () => {
try {
this._n1CloudConn.start()
this._refreshingCaches.map(c => c.start());
_.map(this._deltaStreams, s => s.start())
} catch (err) {
Expand All @@ -92,11 +100,7 @@ export default class AccountDeltaConnection {

_setupDeltaStreams = (account) => {
const localSync = new DeltaStreamingInMemoryConnection(account.id, this._deltaStreamOpts("localSync"));

const n1Cloud = new DeltaStreamingConnection(N1CloudAPI,
account.id, this._deltaStreamOpts("n1Cloud"));

return {localSync, n1Cloud};
return {localSync};
}

_deltaStreamOpts = (streamName) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,48 +1,179 @@
import _ from 'underscore'
import {NylasLongConnection, DatabaseStore} from 'nylas-exports'

class DeltaStreamingConnection extends NylasLongConnection {
constructor(api, accountId, opts = {}) {
// TODO FYI this whole class is changing in an upcoming diff
opts.api = api
opts.accountId = accountId
opts.throttleResultsInterval = 1000
opts.closeIfDataStopsInterval = 15 * 1000

// Update cursor when deltas received
opts.onResuls = (deltas = []) => {
if (opts.onDeltas) opts.onDeltas(deltas, {source: "n1Cloud"});
const last = _.last(deltas);
if (last && last.cursor) {
this._setCursor(last.cursor)
import {ExponentialBackoffScheduler} from 'isomorphic-core'
import {
Actions,
Account,
APIError,
N1CloudAPI,
DatabaseStore,
OnlineStatusStore,
NylasLongConnection,
} from 'nylas-exports';
import DeltaProcessor from './delta-processor'


const MAX_RETRY_DELAY = 5 * 60 * 1000; // 5 minutes
const BASE_RETRY_DELAY = 1000;

class DeltaStreamingConnection {
constructor(account) {
this._account = account
this._state = {cursor: null, status: null}
this._longConnection = null
this._writeStateDebounced = _.debounce(this._writeState, 100)
this._unsubscribers = []
this._backoffScheduler = new ExponentialBackoffScheduler({
baseDelay: BASE_RETRY_DELAY,
maxDelay: MAX_RETRY_DELAY,
})

this._setupListeners()
NylasEnv.onBeforeUnload = (readyToUnload) => {
this._writeState().finally(readyToUnload)
}
}

start() {
try {
const {cursor = 0} = this._state
this._longConnection = new NylasLongConnection({
api: N1CloudAPI,
accountId: this._account.id,
path: `/delta/streaming?cursor=${cursor}`,
throttleResultsInterval: 1000,
closeIfDataStopsInterval: 15 * 1000,
onError: this._onError,
onResults: this._onResults,
onStatusChanged: this._onStatusChanged,
})
this._longConnection.start()
} catch (err) {
this._onError(err)
}
}

restart() {
try {
this._restarting = true
this.close();
this._disposeListeners()
this._setupListeners()
this.start();
} finally {
this._restarting = false
}
}

close() {
this._disposeListeners()
this._longConnection.close()
}

end() {
this._disposeListeners()
this._longConnection.end()
}

async loadStateFromDatabase() {
let json = await DatabaseStore.findJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`)

if (!json) {
// Migrate from old storage key
const oldState = await DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`)
if (!oldState) { return; }
const {deltaCursors = {}, deltaStatus = {}} = oldState
json = {
cursor: deltaCursors.n1Cloud || null,
status: deltaStatus.n1Cloud || null,
}
}
super(opts)

this._onError = opts.onError || (() => {})
if (!json) { return }
this._state = json;
}

const {getCursor, setCursor} = opts
this._getCursor = getCursor
this._setCursor = setCursor
_setupListeners() {
this._unsubscribers = [
Actions.retryDeltaConnection.listen(this.restart, this),
OnlineStatusStore.listen(this._onOnlineStatusChanged, this),
]
}

_deltaStreamingPath(cursor) {
return `/delta/streaming?cursor=${cursor}`
_disposeListeners() {
this._unsubscribers.forEach(usub => usub())
this._unsubscribers = []
}

onError(err = {}) {
_writeState() {
return DatabaseStore.inTransaction(t =>
t.persistJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`, this._state)
);
}

_setCursor = (cursor) => {
this._state.cursor = cursor;
this._writeStateDebounced();
}

_onOnlineStatusChanged = () => {
if (OnlineStatusStore.isOnline()) {
this.restart()
}
}

_onStatusChanged = (status) => {
if (this._restarting) { return; }
this._state.status = status;
this._writeStateDebounced();
const {Closed, Connected} = NylasLongConnection.Status
if (status === Connected) {
this._backoffScheduler.reset()
}
if (status === Closed) {
setTimeout(() => this.restart(), this._backoffScheduler.nextDelay());
}
}

_onResults = (deltas = []) => {
this._backoffScheduler.reset()

const last = _.last(deltas);
if (last && last.cursor) {
this._setCursor(last.cursor)
}
DeltaProcessor.process(deltas, {source: 'n1Cloud'})
}

_onError = (err = {}) => {
if (err.message && err.message.includes('Invalid cursor')) {
const error = new Error('Delta Connection: Cursor is invalid. Need to blow away local cache.');
// TODO is this still necessary?
const error = new Error('DeltaStreamingConnection: Cursor is invalid. Need to blow away local cache.');
NylasEnv.reportError(error)
this._setCursor(0)
DatabaseStore._handleSetupError(error)
return
}
this._onError(err)
}

start() {
this._path = this._deltaStreamingPath(this._getCursor() || 0)
super.start()
if (err instanceof APIError && err.statusCode === 401) {
Actions.updateAccount(this._account.id, {
syncState: Account.SYNC_STATE_AUTH_FAILED,
syncError: err.toJSON(),
})
}

err.message = `Error connecting to delta stream: ${err.message}`
const ignorableStatusCodes = [
0, // When errors like ETIMEDOUT, ECONNABORTED or ESOCKETTIMEDOUT occur from the client
404, // Don't report not-founds
408, // Timeout error code
429, // Too many requests
]
if (!ignorableStatusCodes.includes(err.statusCode)) {
NylasEnv.reportError(err)
}
this.close()

setTimeout(() => this.restart(), this._backoffScheduler.nextDelay());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ _ = require 'underscore'
DeltaStreamingConnection = require('../lib/delta-streaming-connection').default
AccountDeltaConnection = require('../lib/account-delta-connection').default

# TODO these are badly out of date, we need to rewrite them
xdescribe "AccountDeltaConnection", ->
beforeEach ->
@apiRequests = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class DeveloperBarStore extends NylasStore
_onDeltaConnectionStatusChanged: ->
@_longPollStates = {}
_.forEach DeltaConnectionStatusStore.getDeltaConnectionStates(), (state, accountId) =>
@_longPollStates[accountId] = state.deltaStatus
@_longPollStates[accountId] = state.status
@trigger()

_onLongPollDeltas: (deltas) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class DeveloperBar extends React.Component
</div>

_renderDeltaStates: =>
_.map @state.longPollStates, (deltaStatus, accountId) =>
_.map @state.longPollStates, (status, accountId) =>
<div className="delta-state-wrap" key={accountId} >
<div title={"Account #{accountId} - Cloud State: #{deltaStatus?.n1Cloud}"} key={"#{accountId}-n1Cloud"} className={"activity-status-bubble state-" + deltaStatus?.n1Cloud}></div>
<div title={"Account #{accountId} - Cloud State: #{status}"} key={"#{accountId}-n1Cloud"} className={"activity-status-bubble state-" + status}></div>
</div>

_sectionContent: =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,8 @@ import DatabaseStore from './database-store'
* The sync state for any given account has the following shape:
*
* {
* deltaCursors: {
* localSync,
* n1Cloud,
* },
* deltaStatus: {
* localSync,
* n1Cloud,
* },
* cursor: 0,
* status: 'connected',
* }
*
*/
Expand All @@ -44,9 +38,13 @@ class DeltaConnectionStatusStore extends NylasStore {
_setupAccountSubscriptions(accountIds) {
accountIds.forEach((accountId) => {
if (this._accountSubscriptions.has(accountId)) { return; }
const query = DatabaseStore.findJSONBlob(`NylasSyncWorker:${accountId}`)
const query = DatabaseStore.findJSONBlob(`DeltaStreamingConnectionStatus:${accountId}`)
const sub = Rx.Observable.fromQuery(query)
.subscribe((json) => this._updateState(accountId, json))
.subscribe((json) => {
// We need to copy `json` otherwise the query observable will mutate
// the reference to that object
this._updateState(accountId, {...json})
})
this._accountSubscriptions.set(accountId, sub)
})
}
Expand Down

0 comments on commit 8bc2ec5

Please sign in to comment.