Skip to content

Commit

Permalink
Add connection check helper (#489)
Browse files Browse the repository at this point in the history
* wip

* streamline workflow

* add connection check helper

* cleanup

* move event emit

* emit id

* add info log

* return checkInfo for each check

* use abstract base class

* move connection helper

* changeset
  • Loading branch information
lukasIO committed Oct 27, 2022
1 parent 2d5335b commit efc2039
Show file tree
Hide file tree
Showing 10 changed files with 464 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/calm-shrimps-wonder.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Add ConnnectionCheck helper class
90 changes: 90 additions & 0 deletions src/connectionHelper/ConnectionCheck.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import EventEmitter from 'events';
import type TypedEmitter from 'typed-emitter';

import { Checker, CheckInfo, CheckStatus, InstantiableCheck } from './checks/Checker';
import { PublishAudioCheck } from './checks/publishAudio';
import { PublishVideoCheck } from './checks/publishVideo';
import { ReconnectCheck } from './checks/reconnect';
import { TURNCheck } from './checks/turn';
import { WebRTCCheck } from './checks/webrtc';
import { WebSocketCheck } from './checks/websocket';

export type { CheckInfo };

export class ConnectionCheck extends (EventEmitter as new () => TypedEmitter<ConnectionCheckCallbacks>) {
token: string;

url: string;

private checkResults: Map<number, CheckInfo> = new Map();

constructor(url: string, token: string) {
super();
this.url = url;
this.token = token;
}

private getNextCheckId() {
const nextId = this.checkResults.size;
this.checkResults.set(nextId, {
logs: [],
status: CheckStatus.IDLE,
name: '',
description: '',
});
return nextId;
}

private updateCheck(checkId: number, info: CheckInfo) {
this.checkResults.set(checkId, info);
this.emit('checkUpdate', checkId, info);
}

isSuccess() {
return Array.from(this.checkResults.values()).every((r) => r.status !== CheckStatus.FAILED);
}

getResults() {
return Array.from(this.checkResults.values());
}

async createAndRunCheck<T extends Checker>(check: InstantiableCheck<T>) {
const checkId = this.getNextCheckId();
const test = new check(this.url, this.token);
const handleUpdate = (info: CheckInfo) => {
this.updateCheck(checkId, info);
};
test.on('update', handleUpdate);
const result = await test.run();
test.off('update', handleUpdate);
return result;
}

async checkWebsocket() {
return this.createAndRunCheck(WebSocketCheck);
}

async checkWebRTC() {
return this.createAndRunCheck(WebRTCCheck);
}

async checkTURN() {
return this.createAndRunCheck(TURNCheck);
}

async checkReconnect() {
return this.createAndRunCheck(ReconnectCheck);
}

async checkPublishAudio() {
return this.createAndRunCheck(PublishAudioCheck);
}

async checkPublishVideo() {
return this.createAndRunCheck(PublishVideoCheck);
}
}

type ConnectionCheckCallbacks = {
checkUpdate: (id: number, info: CheckInfo) => void;
};
164 changes: 164 additions & 0 deletions src/connectionHelper/checks/Checker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import { EventEmitter } from 'events';
import type TypedEmitter from 'typed-emitter';
import type { RoomConnectOptions, RoomOptions } from '../../options';
import Room, { ConnectionState } from '../../room/Room';
import type RTCEngine from '../../room/RTCEngine';

type LogMessage = {
level: 'info' | 'warning' | 'error';
message: string;
};

export enum CheckStatus {
IDLE,
RUNNING,
SKIPPED,
SUCCESS,
FAILED,
}

export type CheckInfo = {
name: string;
logs: Array<LogMessage>;
status: CheckStatus;
description: string;
};

export interface CheckerOptions {
errorsAsWarnings?: boolean;
roomOptions?: RoomOptions;
connectOptions?: RoomConnectOptions;
}

export abstract class Checker extends (EventEmitter as new () => TypedEmitter<CheckerCallbacks>) {
protected url: string;

protected token: string;

room: Room;

connectOptions?: RoomConnectOptions;

status: CheckStatus = CheckStatus.IDLE;

logs: Array<LogMessage> = [];

errorsAsWarnings: boolean = false;

name: string;

constructor(url: string, token: string, options: CheckerOptions = {}) {
super();
this.url = url;
this.token = token;
this.name = this.constructor.name;
this.room = new Room(options.roomOptions);
this.connectOptions = options.connectOptions;
if (options.errorsAsWarnings) {
this.errorsAsWarnings = options.errorsAsWarnings;
}
}

abstract get description(): string;

protected abstract perform(): Promise<void>;

async run(onComplete?: () => void) {
if (this.status !== CheckStatus.IDLE) {
throw Error('check is running already');
}
this.setStatus(CheckStatus.RUNNING);
this.appendMessage(`${this.name} started.`);

try {
await this.perform();
} catch (err) {
if (err instanceof Error) {
if (this.errorsAsWarnings) {
this.appendWarning(err.message);
} else {
this.appendError(err.message);
}
}
}

await this.disconnect();

// sleep for a bit to ensure disconnect
await new Promise((resolve) => setTimeout(resolve, 500));

// @ts-ignore
if (this.status !== CheckStatus.SKIPPED) {
this.setStatus(this.isSuccess() ? CheckStatus.SUCCESS : CheckStatus.FAILED);
}

if (onComplete) {
onComplete();
}
return this.getInfo();
}

protected isSuccess(): boolean {
return !this.logs.some((l) => l.level === 'error');
}

protected async connect(): Promise<Room> {
if (this.room.state === ConnectionState.Connected) {
return this.room;
}
await this.room.connect(this.url, this.token);
return this.room;
}

protected async disconnect() {
if (this.room && this.room.state !== ConnectionState.Disconnected) {
await this.room.disconnect();
// wait for it to go through
await new Promise((resolve) => setTimeout(resolve, 500));
}
}

protected skip() {
this.setStatus(CheckStatus.SKIPPED);
}

protected appendMessage(message: string) {
this.logs.push({ level: 'info', message });
this.emit('update', this.getInfo());
}

protected appendWarning(message: string) {
this.logs.push({ level: 'warning', message });
this.emit('update', this.getInfo());
}

protected appendError(message: string) {
this.logs.push({ level: 'error', message });
this.emit('update', this.getInfo());
}

protected setStatus(status: CheckStatus) {
this.status = status;
this.emit('update', this.getInfo());
}

protected get engine(): RTCEngine | undefined {
return this.room?.engine;
}

getInfo(): CheckInfo {
return {
logs: this.logs,
name: this.name,
status: this.status,
description: this.description,
};
}
}
export type InstantiableCheck<T extends Checker> = {
new (url: string, token: string, options?: CheckerOptions): T;
};

type CheckerCallbacks = {
update: (info: CheckInfo) => void;
};
33 changes: 33 additions & 0 deletions src/connectionHelper/checks/publishAudio.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { createLocalAudioTrack } from '../../room/track/create';
import { Checker } from './Checker';

export class PublishAudioCheck extends Checker {
get description(): string {
return 'Can publish audio';
}

async perform(): Promise<void> {
const room = await this.connect();

const track = await createLocalAudioTrack();
room.localParticipant.publishTrack(track);
// wait for a few seconds to publish
await new Promise((resolve) => setTimeout(resolve, 3000));

// verify RTC stats that it's publishing
const stats = await track.sender?.getStats();
if (!stats) {
throw new Error('Could not get RTCStats');
}
let numPackets = 0;
stats.forEach((stat) => {
if (stat.type === 'outbound-rtp' && stat.mediaType === 'audio') {
numPackets = stat.packetsSent;
}
});
if (numPackets === 0) {
throw new Error('Could not determine packets are sent');
}
this.appendMessage(`published ${numPackets} audio packets`);
}
}
33 changes: 33 additions & 0 deletions src/connectionHelper/checks/publishVideo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { createLocalVideoTrack } from '../../room/track/create';
import { Checker } from './Checker';

export class PublishVideoCheck extends Checker {
get description(): string {
return 'Can publish video';
}

async perform(): Promise<void> {
const room = await this.connect();

const track = await createLocalVideoTrack();
room.localParticipant.publishTrack(track);
// wait for a few seconds to publish
await new Promise((resolve) => setTimeout(resolve, 3000));

// verify RTC stats that it's publishing
const stats = await track.sender?.getStats();
if (!stats) {
throw new Error('Could not get RTCStats');
}
let numPackets = 0;
stats.forEach((stat) => {
if (stat.type === 'outbound-rtp' && stat.mediaType === 'video') {
numPackets = stat.packetsSent;
}
});
if (numPackets === 0) {
throw new Error('Could not determine packets are sent');
}
this.appendMessage(`published ${numPackets} video packets`);
}
}
45 changes: 45 additions & 0 deletions src/connectionHelper/checks/reconnect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { RoomEvent } from '../../room/events';
import { ConnectionState } from '../../room/Room';
import { Checker } from './Checker';

export class ReconnectCheck extends Checker {
get description(): string {
return 'Resuming connection after interruption';
}

async perform(): Promise<void> {
const room = await this.connect();
let reconnectingTriggered = false;
let reconnected = false;

let reconnectResolver: (value: unknown) => void;
const reconnectTimeout = new Promise((resolve) => {
setTimeout(resolve, 5000);
reconnectResolver = resolve;
});

room
.on(RoomEvent.Reconnecting, () => {
reconnectingTriggered = true;
})
.on(RoomEvent.Reconnected, () => {
reconnected = true;
reconnectResolver(true);
});

room.engine.client.ws?.close();
const onClose = room.engine.client.onClose;
if (onClose) {
onClose('');
}

await reconnectTimeout;

if (!reconnectingTriggered) {
throw new Error('Did not attempt to reconnect');
} else if (!reconnected || room.state !== ConnectionState.Connected) {
this.appendWarning('reconnection is only possible in Redis-based configurations');
throw new Error('Not able to reconnect');
}
}
}
Loading

0 comments on commit efc2039

Please sign in to comment.