Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(app-shell, app-shell-odd): Refactor app to use unsubscribe flags #14633

Closed
wants to merge 14 commits into from
Closed
14 changes: 0 additions & 14 deletions app-shell-odd/src/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ import {
VALUE_UPDATED,
VIEW_PROTOCOL_SOURCE_FOLDER,
NOTIFY_SUBSCRIBE,
NOTIFY_UNSUBSCRIBE,
ROBOT_MASS_STORAGE_DEVICE_ADDED,
ROBOT_MASS_STORAGE_DEVICE_ENUMERATED,
ROBOT_MASS_STORAGE_DEVICE_REMOVED,
Expand Down Expand Up @@ -105,7 +104,6 @@ import type {
AppRestartAction,
NotifySubscribeAction,
NotifyTopic,
NotifyUnsubscribeAction,
ReloadUiAction,
RobotMassStorageDeviceAdded,
RobotMassStorageDeviceEnumerated,
Expand Down Expand Up @@ -428,18 +426,6 @@ export const notifySubscribeAction = (
meta: { shell: true },
})

export const notifyUnsubscribeAction = (
hostname: string,
topic: NotifyTopic
): NotifyUnsubscribeAction => ({
type: NOTIFY_UNSUBSCRIBE,
payload: {
hostname,
topic,
},
meta: { shell: true },
})

export function startDiscovery(
timeout: number | null = null
): StartDiscoveryAction {
Expand Down
2 changes: 0 additions & 2 deletions app-shell-odd/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,6 @@ export const ROBOT_MASS_STORAGE_DEVICE_ENUMERATED: 'shell:ROBOT_MASS_STORAGE_DEV
'shell:ROBOT_MASS_STORAGE_DEVICE_ENUMERATED'
export const NOTIFY_SUBSCRIBE: 'shell:NOTIFY_SUBSCRIBE' =
'shell:NOTIFY_SUBSCRIBE'
export const NOTIFY_UNSUBSCRIBE: 'shell:NOTIFY_UNSUBSCRIBE' =
'shell:NOTIFY_UNSUBSCRIBE'

// copy
// TODO(mc, 2020-05-11): i18n
Expand Down
172 changes: 70 additions & 102 deletions app-shell-odd/src/notify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,27 @@ import type { Action, Dispatch } from './types'
// TODO(jh, 2024-03-01): after refactoring notify connectivity and subscription logic, uncomment logs.

// Manages MQTT broker connections via a connection store, establishing a connection to the broker only if a connection does not
// already exist, and disconnects from the broker when the app is not subscribed to any topics for the given broker.
// A redundant connection to the same broker results in the older connection forcibly closing, which we want to avoid.
// already exist. A redundant connection to the same broker results in the older connection forcibly closing, which we want to avoid.
// However, redundant subscriptions are permitted and result in the broker sending the retained message for that topic.
// To mitigate redundant connections, the connection manager eagerly adds the host, removing the host if the connection fails.

const FAILURE_STATUSES = {
ECONNREFUSED: 'ECONNREFUSED',
ECONNFAILED: 'ECONNFAILED',
} as const

const LOCALHOST: '127.0.0.1' = '127.0.0.1'

interface ConnectionStore {
[hostname: string]: {
client: mqtt.MqttClient | null
subscriptions: Record<NotifyTopic, number>
subscriptions: Set<NotifyTopic>
pendingSubs: Set<NotifyTopic>
}
}

const connectionStore: ConnectionStore = {}
const unreachableHosts = new Set<string>()
const pendingUnsubs = new Set<NotifyTopic>()
const log = createLogger('notify')
// MQTT is somewhat particular about the clientId format and will connect erratically if an unexpected string is supplied.
// This clientId is derived from the mqttjs library.
Expand Down Expand Up @@ -66,14 +67,7 @@ export function registerNotify(
return subscribe({
...action.payload,
browserWindow: mainWindow,
hostname: '127.0.0.1',
})

case 'shell:NOTIFY_UNSUBSCRIBE':
return unsubscribe({
...action.payload,
browserWindow: mainWindow,
hostname: '127.0.0.1',
hostname: LOCALHOST,
})
}
}
Expand Down Expand Up @@ -103,8 +97,7 @@ function subscribe(notifyParams: NotifyParams): Promise<void> {
else if (connectionStore[hostname] == null) {
connectionStore[hostname] = {
client: null,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
subscriptions: { [topic]: 1 } as Record<NotifyTopic, number>,
subscriptions: new Set(),
pendingSubs: new Set(),
}
return connectAsync(`mqtt:https://${hostname}`)
Expand Down Expand Up @@ -149,27 +142,24 @@ function subscribe(notifyParams: NotifyParams): Promise<void> {
return waitUntilActiveOrErrored('client')
.then(() => {
const { client, subscriptions, pendingSubs } = connectionStore[hostname]
const activeClient = client as mqtt.Client
const isNotActiveSubscription = (subscriptions[topic] ?? 0) <= 0
if (!pendingSubs.has(topic) && isNotActiveSubscription) {
pendingSubs.add(topic)
return new Promise<void>(() => {
activeClient.subscribe(topic, subscribeOptions, subscribeCb)
pendingSubs.delete(topic)
})
} else {
void waitUntilActiveOrErrored('subscription')
.then(() => {
subscriptions[topic] += 1
const activeClient = client as mqtt.MqttClient
if (!subscriptions.has(topic)) {
if (!pendingSubs.has(topic)) {
pendingSubs.add(topic)
return new Promise<void>(() => {
activeClient.subscribe(topic, subscribeOptions, subscribeCb)
pendingSubs.delete(topic)
})
.catch(() => {
} else {
void waitUntilActiveOrErrored('subscription').catch(() => {
sendToBrowserDeserialized({
browserWindow,
hostname,
topic,
message: FAILURE_STATUSES.ECONNFAILED,
})
})
}
}
})
.catch(() => {
Expand All @@ -191,18 +181,9 @@ function subscribe(notifyParams: NotifyParams): Promise<void> {
topic,
message: FAILURE_STATUSES.ECONNFAILED,
})
setTimeout(() => {
if (Object.keys(connectionStore[hostname].subscriptions).length <= 0) {
connectionStore[hostname].client?.end()
}
}, RENDER_TIMEOUT)
} else {
// log.info(`Successfully subscribed on ${hostname} to topic: ${topic}`)
if (subscriptions[topic] > 0) {
subscriptions[topic] += 1
} else {
subscriptions[topic] = 1
}
subscriptions.add(topic)
}
}

Expand All @@ -218,7 +199,7 @@ function subscribe(notifyParams: NotifyParams): Promise<void> {
const hasReceivedAck =
connection === 'client'
? host?.client != null
: host?.subscriptions[topic] > 0
: host?.subscriptions.has(topic)
if (hasReceivedAck) {
clearInterval(intervalId)
resolve()
Expand All @@ -235,43 +216,42 @@ function subscribe(notifyParams: NotifyParams): Promise<void> {
}
}

// Because subscription logic is directly tied to the component lifecycle, it is possible
// for a component to trigger an unsubscribe event on dismount while a new component mounts and
// triggers a subscribe event. For the connection store and MQTT to reflect correct topic subscriptions,
// do not unsubscribe and close connections before newly mounted components have had time to update the connection store.
const RENDER_TIMEOUT = 10000 // 10 seconds
function checkForUnsubscribeFlag(
deserializedMessage: string | Record<string, unknown>,
hostname: string,
topic: NotifyTopic
): void {
const messageContainsUnsubFlag =
typeof deserializedMessage !== 'string' &&
'unsubscribe' in deserializedMessage

if (messageContainsUnsubFlag) void unsubscribe(hostname, topic)
}

function unsubscribe(notifyParams: NotifyParams): Promise<void> {
const { hostname, topic } = notifyParams
function unsubscribe(hostname: string, topic: NotifyTopic): Promise<void> {
return new Promise<void>(() => {
setTimeout(() => {
if (hostname in connectionStore) {
const { client } = connectionStore[hostname]
const subscriptions = connectionStore[hostname]?.subscriptions
const isLastSubscription = subscriptions[topic] <= 1

if (isLastSubscription) {
client?.unsubscribe(topic, {}, (error, result) => {
if (error != null) {
// log.warn(
// `Failed to unsubscribe on ${hostname} from topic: ${topic}`
// )
} else {
// log.info(
// `Successfully unsubscribed on ${hostname} from topic: ${topic}`
// )
handleDecrementSubscriptionCount(hostname, topic)
}
})
if (hostname in connectionStore && !pendingUnsubs.has(topic)) {
pendingUnsubs.add(topic)
const { client } = connectionStore[hostname]
client?.unsubscribe(topic, {}, (error, result) => {
if (error != null) {
// log.warn(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might want this back but definitely don't want it commented - let's uncomment or delete

// `Failed to unsubscribe on ${hostname} from topic: ${topic}`
// )
} else {
subscriptions[topic] -= 1
log.debug(
`Successfully unsubscribed on ${hostname} from topic: ${topic}`
)
const { subscriptions } = connectionStore[hostname]
subscriptions.delete(topic)
pendingUnsubs.delete(topic)
}
} else {
// log.info(
// `Attempted to unsubscribe from unconnected hostname: ${hostname}`
// )
}
}, RENDER_TIMEOUT)
})
} else {
// log.info(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's uncomment or delete

// `Attempted to unsubscribe from unconnected hostname: ${hostname}`
// )
}
})
}

Expand Down Expand Up @@ -310,26 +290,6 @@ function connectAsync(brokerURL: string): Promise<mqtt.Client> {
})
}

function handleDecrementSubscriptionCount(
hostname: string,
topic: NotifyTopic
): void {
const host = connectionStore[hostname]
if (host) {
const { client, subscriptions } = host
if (topic in subscriptions) {
subscriptions[topic] -= 1
if (subscriptions[topic] <= 0) {
delete subscriptions[topic]
}
}

if (Object.keys(subscriptions).length <= 0) {
client?.end()
}
}
}

interface ListenerParams {
client: mqtt.MqttClient
browserWindow: BrowserWindow
Expand All @@ -344,12 +304,15 @@ function establishListeners({
client.on(
'message',
(topic: NotifyTopic, message: Buffer, packet: mqtt.IPublishPacket) => {
sendToBrowserDeserialized({
browserWindow,
const deserializedMessage = deserialize(message.toString())
checkForUnsubscribeFlag(deserializedMessage, LOCALHOST, topic)

browserWindow.webContents.send(
'notify',
hostname,
topic,
message: message.toString(),
})
deserializedMessage
)
}
)

Expand Down Expand Up @@ -419,13 +382,7 @@ function sendToBrowserDeserialized({
topic,
message,
}: SendToBrowserParams): void {
let deserializedMessage: string | Object

try {
deserializedMessage = JSON.parse(message)
} catch {
deserializedMessage = message
}
const deserializedMessage = deserialize(message)

// log.info('Received notification data from main via IPC', {
// hostname,
Expand All @@ -434,3 +391,14 @@ function sendToBrowserDeserialized({

browserWindow.webContents.send('notify', hostname, topic, deserializedMessage)
}

function deserialize(message: string): string | Record<string, unknown> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a case where this will be (a) valid and (b) not json? if so can we remove that case, and make this be a Promise<Record<string, unknown>> or even better a Promise<NotifyMessage> with a small list of valid models that rejects if it doesn't parse?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good point

let deserializedMessage: string | Record<string, unknown>

try {
deserializedMessage = JSON.parse(message)
} catch {
deserializedMessage = message
}
return deserializedMessage
}
14 changes: 0 additions & 14 deletions app-shell/src/config/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ import {
VALUE_UPDATED,
VIEW_PROTOCOL_SOURCE_FOLDER,
NOTIFY_SUBSCRIBE,
NOTIFY_UNSUBSCRIBE,
ROBOT_MASS_STORAGE_DEVICE_ADDED,
ROBOT_MASS_STORAGE_DEVICE_ENUMERATED,
ROBOT_MASS_STORAGE_DEVICE_REMOVED,
Expand All @@ -99,7 +98,6 @@ import type {
AppRestartAction,
NotifySubscribeAction,
NotifyTopic,
NotifyUnsubscribeAction,
ReloadUiAction,
RobotMassStorageDeviceAdded,
RobotMassStorageDeviceEnumerated,
Expand Down Expand Up @@ -421,15 +419,3 @@ export const notifySubscribeAction = (
},
meta: { shell: true },
})

export const notifyUnsubscribeAction = (
hostname: string,
topic: NotifyTopic
): NotifyUnsubscribeAction => ({
type: NOTIFY_UNSUBSCRIBE,
payload: {
hostname,
topic,
},
meta: { shell: true },
})
2 changes: 0 additions & 2 deletions app-shell/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,6 @@ export const ROBOT_MASS_STORAGE_DEVICE_ENUMERATED: 'shell:ROBOT_MASS_STORAGE_DEV
'shell:ROBOT_MASS_STORAGE_DEVICE_ENUMERATED'
export const NOTIFY_SUBSCRIBE: 'shell:NOTIFY_SUBSCRIBE' =
'shell:NOTIFY_SUBSCRIBE'
export const NOTIFY_UNSUBSCRIBE: 'shell:NOTIFY_UNSUBSCRIBE' =
'shell:NOTIFY_UNSUBSCRIBE'

// copy
// TODO(mc, 2020-05-11): i18n
Expand Down
Loading
Loading