Skip to content

Commit

Permalink
Add quality of service option to subscriptions
Browse files Browse the repository at this point in the history
Fixes #323, #14, #334
Fixes #132
  • Loading branch information
thomasnordquist committed Apr 20, 2020
1 parent e87a711 commit b72fc48
Show file tree
Hide file tree
Showing 19 changed files with 901 additions and 212 deletions.
6 changes: 5 additions & 1 deletion app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
"scripts": {
"build": "yarn rebuild && webpack --mode production",
"dev": "node_modules/.bin/webpack-dev-server --mode development --progress",
"rebuild": "cd node_modules/heapdump && node-gyp rebuild --target=7.1.1 --arch=x64 --dist-url=https://atom.io/download/electron || echo Could not build heapdump; cd -"
"rebuild": "cd node_modules/heapdump && node-gyp rebuild --target=7.1.1 --arch=x64 --dist-url=https://atom.io/download/electron || echo Could not build heapdump; cd -",
"test": "TS_NODE_PROJECT=test/tsconfig.json yarn mochatest",
"mochatest": "mocha --require ts-node/register src/**/*.spec.ts"
},
"author": "",
"license": "CC-BY-ND-4.0",
Expand Down Expand Up @@ -68,10 +70,12 @@
"@types/uuid": "^7.0.2",
"@types/vis": "^4.21.9",
"awesome-typescript-loader": "^5.2.1",
"chai": "^4.2.0",
"css-loader": "^3.0.0",
"hard-source-webpack-plugin": "^0.13.1",
"heapdump": "^0.3.12",
"html-webpack-plugin": "^4.0.0-beta.5",
"mocha": "^7.1.1",
"node-loader": "^0.6.0",
"source-map-loader": "^0.2.4",
"style-loader": "^1",
Expand Down
42 changes: 11 additions & 31 deletions app/src/actions/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import { showError } from './Global'
import { remote } from 'electron'
import { promises as fsPromise } from 'fs'
import * as path from 'path'

import { ActionTypes, Action } from '../reducers/ConnectionManager'
import { Subscription } from '../../../backend/src/DataSource/MqttSource'
import { connectionsMigrator } from './migrations/Connection'

interface ConnectionDictionary {
export interface ConnectionDictionary {
[s: string]: ConnectionOptions
}
const storedConnectionsIdentifier: StorageIdentifier<ConnectionDictionary> = {
Expand All @@ -27,6 +28,12 @@ export const loadConnectionSettings = () => async (dispatch: Dispatch<any>, getS
try {
await ensureConnectionsHaveBeenInitialized()
connections = await persistentStorage.load(storedConnectionsIdentifier)

// Apply migrations
if (connections && connectionsMigrator.isMigrationNecessary(connections)) {
connections = connectionsMigrator.applyMigrations(connections)
await persistentStorage.store(storedConnectionsIdentifier, connections)
}
} catch (error) {
dispatch(showError(error))
}
Expand Down Expand Up @@ -101,13 +108,13 @@ export const updateConnection = (connectionId: string, changeSet: Partial<Connec
type: ActionTypes.CONNECTION_MANAGER_UPDATE_CONNECTION,
})

export const addSubscription = (subscription: string, connectionId: string): Action => ({
export const addSubscription = (subscription: Subscription, connectionId: string): Action => ({
connectionId,
subscription,
type: ActionTypes.CONNECTION_MANAGER_ADD_SUBSCRIPTION,
})

export const deleteSubscription = (subscription: string, connectionId: string): Action => ({
export const deleteSubscription = (subscription: Subscription, connectionId: string): Action => ({
connectionId,
subscription,
type: ActionTypes.CONNECTION_MANAGER_DELETE_SUBSCRIPTION,
Expand Down Expand Up @@ -174,31 +181,4 @@ async function ensureConnectionsHaveBeenInitialized() {

clearLegacyConnectionOptions()
}

// Migrate connections, rewrite dictionary to "keep" it "ordered" (dictionaries do not have a guaranteed order)
const mayNeedMigrations = connections && connections['iot.eclipse.org']
if (connections && mayNeedMigrations) {
const newConnections = {}
for (const connection of Object.values(connections)) {
addMigratedConnection(newConnections, connection)
}

await persistentStorage.store(storedConnectionsIdentifier, newConnections)
}
}

function addMigratedConnection(newConnections: { [key: string]: ConnectionOptions }, connection: ConnectionOptions) {
// The host has been renamed, only change the host if it has not been changed
// Also check for ssl since SSL is not yet working
if (
connection.id === 'iot.eclipse.org' &&
connection.host === 'iot.eclipse.org' &&
connection.port === 1883 &&
!connection.encryption
) {
connection.id = 'mqtt.eclipse.org'
connection.host = 'mqtt.eclipse.org'
connection.name = 'mqtt.eclipse.org'
}
newConnections[connection.id] = connection
}
94 changes: 94 additions & 0 deletions app/src/actions/migrations/Connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { ConfigMigrator, Migration } from '../../utils/ConfigMigrator'
import { ConnectionDictionary } from '../ConnectionManager'
import { ConnectionOptions } from '../../model/ConnectionOptions'

export interface ConnectionOptionsV0 {
type: 'mqtt'
id: string
host: string
protocol: 'mqtt' | 'ws'
basePath?: string
port: number
name: string
username?: string
password?: string
encryption: boolean
certValidation: boolean
// selfSignedCertificate?: CertificateParameters
// clientCertificate?: CertificateParameters
// clientKey?: CertificateParameters
clientId?: string
subscriptions: Array<string>
}

let migrations: Migration[] = [
// iot.eclipse.org ha moved to mqtt.eclipse.org
{
from: undefined,
apply: (connection: ConnectionOptionsV0): ConnectionOptionsV0 => {
if (connection.id == 'iot.eclipse.org' && connection.host == 'iot.eclipse.org' && connection.port == 1883) {
return {
...connection,
id: 'mqtt.eclipse.org',
host: 'mqtt.eclipse.org',
name: 'mqtt.eclipse.org',
}
}

return {
...connection,
}
},
},
// Remove stored clientId if it is the default generated client id. This allows to connect multiple instances of mqtt explorer to the same broker.
// A randomly generated clientId will be used if no clientId is set.
{
from: undefined,
apply: (connection: ConnectionOptionsV0): ConnectionOptionsV0 => {
if (connection.clientId && /mqtt-explorer-[0-9a-f]{8}/.test(connection.clientId)) {
return {
...connection,
clientId: undefined,
}
}

return {
...connection,
}
},
},
// Added QoS level to subscription options
{
from: undefined,
apply: (connection: ConnectionOptionsV0): ConnectionOptions => {
return {
...connection,
configVersion: 1,
subscriptions: connection.subscriptions.map(topic => ({ topic, qos: 0 })),
}
},
},
]

const connectionMigrator = new ConfigMigrator(migrations)

function isMigrationNecessary(connections: ConnectionDictionary): boolean {
return Object.values(connections)
.map(connection => connectionMigrator.isMigrationNecessary(connection))
.reduce((a, b) => a || b, false)
}

function applyMigrations(connections: ConnectionDictionary): ConnectionDictionary {
let newConnectionDictionary: ConnectionDictionary = {}
Object.keys(connections).forEach(key => {
let newConnection = connectionMigrator.applyMigrations(connections[key]) as any
newConnectionDictionary[newConnection.id] = newConnection
})

return newConnectionDictionary
}

export const connectionsMigrator = {
isMigrationNecessary,
applyMigrations,
}
Loading

0 comments on commit b72fc48

Please sign in to comment.