Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(app-shell, app-shell-odd): Refactor app to use unsubscribe flags #14633

Closed
wants to merge 14 commits into from
Next Next commit
fix(app-shell): add fallback logic if broker disconnects during activ…
…e subscription
  • Loading branch information
mjhuff committed Mar 4, 2024
commit 356539660346be4d0228f5328ca07cde8f4bc805
84 changes: 56 additions & 28 deletions app-shell/src/notify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ import type { Action, Dispatch } from './types'
// TODO(jh, 2024-03-01): after refactoring notify connectivity and subscription logic, uncomment logs.

// Manages MQTT broker connections via a connection store, establishing a connection to the broker only if a connection does not
// already exist, and disconnects from the broker when the app is not subscribed to any topics for the given broker.
// A redundant connection to the same broker results in the older connection forcibly closing, which we want to avoid.
// already exist. A redundant connection to the same broker results in the older connection forcibly closing, which we want to avoid.
// However, redundant subscriptions are permitted and result in the broker sending the retained message for that topic.
// To mitigate redundant connections, the connection manager eagerly adds the host, removing the host if the connection fails.

const FAILURE_STATUSES = {
ECONNREFUSED: 'ECONNREFUSED',
Expand Down Expand Up @@ -80,7 +78,7 @@ interface NotifyParams {
topic: NotifyTopic
}

function subscribe(notifyParams: NotifyParams): Promise<void> | void {
function subscribe(notifyParams: NotifyParams): Promise<void> {
const { hostname, topic, browserWindow } = notifyParams
if (unreachableHosts.has(hostname)) {
sendToBrowserDeserialized({
Expand All @@ -89,6 +87,7 @@ function subscribe(notifyParams: NotifyParams): Promise<void> | void {
topic,
message: FAILURE_STATUSES.ECONNFAILED,
})
return Promise.resolve()
}
// true if no subscription (and therefore connection) to host exists
else if (connectionStore[hostname] == null) {
Expand Down Expand Up @@ -136,21 +135,37 @@ function subscribe(notifyParams: NotifyParams): Promise<void> | void {
}
// true if the connection store has an entry for the hostname.
else {
return waitUntilActiveOrErrored('client').then(() => {
const { client, subscriptions, pendingSubs } = connectionStore[hostname]
const activeClient = client as mqtt.MqttClient
if (!subscriptions.has(topic)) {
if (!pendingSubs.has(topic)) {
pendingSubs.add(topic)
return new Promise<void>(() => {
activeClient.subscribe(topic, subscribeOptions, subscribeCb)
pendingSubs.delete(topic)
})
} else {
void waitUntilActiveOrErrored('subscription')
return waitUntilActiveOrErrored('client')
.then(() => {
const { client, subscriptions, pendingSubs } = connectionStore[hostname]
const activeClient = client as mqtt.MqttClient
if (!subscriptions.has(topic)) {
if (!pendingSubs.has(topic)) {
pendingSubs.add(topic)
return new Promise<void>(() => {
activeClient.subscribe(topic, subscribeOptions, subscribeCb)
pendingSubs.delete(topic)
})
} else {
void waitUntilActiveOrErrored('subscription').catch(() => {
sendToBrowserDeserialized({
browserWindow,
hostname,
topic,
message: FAILURE_STATUSES.ECONNFAILED,
})
})
}
}
}
})
})
.catch(() => {
sendToBrowserDeserialized({
browserWindow,
hostname,
topic,
message: FAILURE_STATUSES.ECONNFAILED,
})
})
}
function subscribeCb(error: Error, result: mqtt.ISubscriptionGrant[]): void {
const { subscriptions } = connectionStore[hostname]
Expand Down Expand Up @@ -201,6 +216,19 @@ function subscribe(notifyParams: NotifyParams): Promise<void> | void {
})
}
}

function checkForUnsubscribeFlag(
deserializedMessage: string | Record<string, unknown>,
hostname: string,
topic: NotifyTopic
): void {
const messageContainsUnsubFlag =
typeof deserializedMessage !== 'string' &&
'unsubscribe' in deserializedMessage

if (messageContainsUnsubFlag) void unsubscribe(hostname, topic)
}

function unsubscribe(hostname: string, topic: NotifyTopic): Promise<void> {
return new Promise<void>(() => {
if (hostname in connectionStore && !pendingUnsubs.has(topic)) {
Expand Down Expand Up @@ -266,25 +294,19 @@ function connectAsync(brokerURL: string): Promise<mqtt.Client> {
interface ListenerParams {
client: mqtt.MqttClient
browserWindow: BrowserWindow
topic: NotifyTopic
hostname: string
}

function establishListeners({
client,
browserWindow,
hostname,
topic,
}: ListenerParams): void {
client.on(
'message',
(topic: NotifyTopic, message: Buffer, packet: mqtt.IPublishPacket) => {
const deserializedMessage = deserialize(message.toString())
const messageContainsUnsubFlag =
typeof deserializedMessage !== 'string' &&
'unsubscribe' in deserializedMessage

if (messageContainsUnsubFlag) void unsubscribe(hostname, topic)
checkForUnsubscribeFlag(deserializedMessage, hostname, topic)

browserWindow.webContents.send(
'notify',
Expand All @@ -304,7 +326,7 @@ function establishListeners({
sendToBrowserDeserialized({
browserWindow,
hostname,
topic,
topic: 'ALL_TOPICS',
message: FAILURE_STATUSES.ECONNFAILED,
})
client.end()
Expand All @@ -315,13 +337,19 @@ function establishListeners({
if (hostname in connectionStore) delete connectionStore[hostname]
})

client.on('disconnect', packet =>
client.on('disconnect', packet => {
log.warn(
`Disconnected from ${hostname} with code ${
packet.reasonCode ?? 'undefined'
}`
)
)
sendToBrowserDeserialized({
browserWindow,
hostname,
topic: 'ALL_TOPICS',
message: FAILURE_STATUSES.ECONNFAILED,
})
})
}

export function closeAllNotifyConnections(): Promise<unknown[]> {
Expand Down