Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ping pong heartbeat for signal connection #377

Merged
merged 10 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fresh-terms-train.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Add ping pong heartbeat for signal connection
89 changes: 83 additions & 6 deletions src/api/SignalClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ export class SignalClient {

ws?: WebSocket;

private pingTimeout: ReturnType<typeof setTimeout> | undefined;

private pingTimeoutDuration: number | undefined;

private pingIntervalDuration: number | undefined;

private pingInterval: ReturnType<typeof setInterval> | undefined;

constructor(useJSON: boolean = false) {
this.isConnected = false;
this.isReconnecting = false;
Expand Down Expand Up @@ -155,6 +163,8 @@ export class SignalClient {

async reconnect(url: string, token: string): Promise<void> {
this.isReconnecting = true;
// clear ping interval and restart it once reconnected
this.clearPingInterval();
await this.connect(url, token, {
reconnect: true,
});
Expand Down Expand Up @@ -215,29 +225,41 @@ export class SignalClient {
if (opts.reconnect) {
// upon reconnection, there will not be additional handshake
this.isConnected = true;
// restart ping interval as it's cleared for reconnection
this.startPingInterval();
resolve();
}
};

ws.onmessage = async (ev: MessageEvent) => {
// not considered connected until JoinResponse is received
let msg: SignalResponse;
let resp: SignalResponse;
if (typeof ev.data === 'string') {
const json = JSON.parse(ev.data);
msg = SignalResponse.fromJSON(json);
resp = SignalResponse.fromJSON(json);
} else if (ev.data instanceof ArrayBuffer) {
msg = SignalResponse.decode(new Uint8Array(ev.data));
resp = SignalResponse.decode(new Uint8Array(ev.data));
} else {
log.error(`could not decode websocket message: ${typeof ev.data}`);
return;
}

if (!this.isConnected) {
// handle join message only
if (msg.message?.$case === 'join') {
if (resp.message?.$case === 'join') {
this.isConnected = true;
abortSignal?.removeEventListener('abort', abortHandler);
resolve(msg.message.join);
this.pingTimeoutDuration = resp.message.join.pingTimeout;
this.pingIntervalDuration = resp.message.join.pingInterval;

if (this.pingTimeoutDuration && this.pingTimeoutDuration > 0) {
log.debug('ping config', {
timeout: this.pingTimeoutDuration,
interval: this.pingIntervalDuration,
});
this.startPingInterval();
}
resolve(resp.message.join);
} else {
reject(new ConnectionError('did not receive join response'));
}
Expand All @@ -247,7 +269,7 @@ export class SignalClient {
if (this.signalLatency) {
await sleep(this.signalLatency);
}
this.handleSignalResponse(msg);
this.handleSignalResponse(resp);
};

ws.onclose = (ev: CloseEvent) => {
Expand All @@ -268,6 +290,7 @@ export class SignalClient {
if (this.ws) this.ws.onclose = null;
this.ws?.close();
this.ws = undefined;
this.clearPingInterval();
}

// initial offer after joining
Expand Down Expand Up @@ -364,6 +387,13 @@ export class SignalClient {
});
}

sendPing() {
this.sendRequest({
$case: 'ping',
ping: Date.now(),
});
}

async sendLeave() {
await this.sendRequest({
$case: 'leave',
Expand Down Expand Up @@ -475,6 +505,8 @@ export class SignalClient {
if (this.onLocalTrackUnpublished) {
this.onLocalTrackUnpublished(msg.trackUnpublished);
}
} else if (msg.$case === 'pong') {
this.resetPingTimeout();
} else {
log.debug('unsupported message', msg);
}
Expand All @@ -493,6 +525,51 @@ export class SignalClient {
private handleWSError(ev: Event) {
log.error('websocket error', ev);
}

private resetPingTimeout() {
this.clearPingTimeout();
if (!this.pingTimeoutDuration) {
log.warn('ping timeout duration not set');
return;
}
this.pingTimeout = setTimeout(() => {
log.warn(
`ping timeout triggered. last pong received at: ${new Date(
Date.now() - this.pingTimeoutDuration! * 1000,
).toUTCString()}`,
);
if (this.onClose) {
this.onClose('ping timeout');
}
}, this.pingTimeoutDuration * 1000);
}

private clearPingTimeout() {
if (this.pingTimeout) {
clearTimeout(this.pingTimeout);
}
}

private startPingInterval() {
this.clearPingInterval();
this.resetPingTimeout();
if (!this.pingIntervalDuration) {
log.warn('ping interval duration not set');
return;
}
log.debug('start ping interval');
this.pingInterval = setInterval(() => {
this.sendPing();
}, this.pingIntervalDuration * 1000);
}

private clearPingInterval() {
log.debug('clearing ping interval');
this.clearPingTimeout();
if (this.pingInterval) {
clearInterval(this.pingInterval);
}
}
}

function fromProtoSessionDescription(sd: SessionDescription): RTCSessionDescriptionInit {
Expand Down
69 changes: 69 additions & 0 deletions src/proto/livekit_models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ export enum DisconnectReason {
PARTICIPANT_REMOVED = 4,
ROOM_DELETED = 5,
STATE_MISMATCH = 6,
JOIN_FAILURE = 7,
UNRECOGNIZED = -1,
}

Expand All @@ -252,6 +253,9 @@ export function disconnectReasonFromJSON(object: any): DisconnectReason {
case 6:
case 'STATE_MISMATCH':
return DisconnectReason.STATE_MISMATCH;
case 7:
case 'JOIN_FAILURE':
return DisconnectReason.JOIN_FAILURE;
case -1:
case 'UNRECOGNIZED':
default:
Expand All @@ -275,6 +279,8 @@ export function disconnectReasonToJSON(object: DisconnectReason): string {
return 'ROOM_DELETED';
case DisconnectReason.STATE_MISMATCH:
return 'STATE_MISMATCH';
case DisconnectReason.JOIN_FAILURE:
return 'JOIN_FAILURE';
case DisconnectReason.UNRECOGNIZED:
default:
return 'UNRECOGNIZED';
Expand Down Expand Up @@ -627,6 +633,11 @@ export interface RTPStats_GapHistogramEntry {
value: number;
}

export interface TimedVersion {
unixMicro: number;
ticks: number;
}

function createBaseRoom(): Room {
return {
sid: '',
Expand Down Expand Up @@ -2654,6 +2665,64 @@ export const RTPStats_GapHistogramEntry = {
},
};

function createBaseTimedVersion(): TimedVersion {
return { unixMicro: 0, ticks: 0 };
}

export const TimedVersion = {
encode(message: TimedVersion, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.unixMicro !== 0) {
writer.uint32(8).int64(message.unixMicro);
}
if (message.ticks !== 0) {
writer.uint32(16).int32(message.ticks);
}
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): TimedVersion {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseTimedVersion();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.unixMicro = longToNumber(reader.int64() as Long);
break;
case 2:
message.ticks = reader.int32();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return message;
},

fromJSON(object: any): TimedVersion {
return {
unixMicro: isSet(object.unixMicro) ? Number(object.unixMicro) : 0,
ticks: isSet(object.ticks) ? Number(object.ticks) : 0,
};
},

toJSON(message: TimedVersion): unknown {
const obj: any = {};
message.unixMicro !== undefined && (obj.unixMicro = Math.round(message.unixMicro));
message.ticks !== undefined && (obj.ticks = Math.round(message.ticks));
return obj;
},

fromPartial<I extends Exact<DeepPartial<TimedVersion>, I>>(object: I): TimedVersion {
const message = createBaseTimedVersion();
message.unixMicro = object.unixMicro ?? 0;
message.ticks = object.ticks ?? 0;
return message;
},
};

declare var self: any | undefined;
declare var window: any | undefined;
declare var global: any | undefined;
Expand Down
Loading