Skip to content

Commit

Permalink
Emit TrackUnpublished before TrackPublished (#541)
Browse files Browse the repository at this point in the history
* Emit TrackUnpublished before TrackUnpublished within the same update

* changeset

* fix changeset

* LocalParticipant.unpublishTrack to be return a promise

* add negotiation timeout, use pc event obj

* add negotiationStarted event

* fix changeset

Co-authored-by: David Zhao <[email protected]>
  • Loading branch information
lukasIO and davidzhao committed Dec 29, 2022
1 parent 260ad8b commit af223f4
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 28 deletions.
5 changes: 5 additions & 0 deletions .changeset/curvy-years-listen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Make `unpublishTrack` async, emit TrackUnpublished before TrackPublished within the same update
12 changes: 11 additions & 1 deletion src/room/PCTransport.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import EventEmitter from 'events';
import { MediaDescription, parse, write } from 'sdp-transform';
import { debounce } from 'ts-debounce';
import log from '../logger';
Expand All @@ -10,8 +11,13 @@ interface TrackBitrateInfo {
maxbr: number;
}

export const PCEvents = {
NegotiationStarted: 'negotiationStarted',
NegotiationComplete: 'negotiationComplete',
} as const;

/** @internal */
export default class PCTransport {
export default class PCTransport extends EventEmitter {
pc: RTCPeerConnection;

pendingCandidates: RTCIceCandidateInit[] = [];
Expand All @@ -27,6 +33,7 @@ export default class PCTransport {
onOffer?: (offer: RTCSessionDescriptionInit) => void;

constructor(config?: RTCConfiguration) {
super();
this.pc = new RTCPeerConnection(config);
}

Expand Down Expand Up @@ -56,11 +63,14 @@ export default class PCTransport {
if (this.renegotiate) {
this.renegotiate = false;
this.createAndSendOffer();
} else if (sd.type === 'answer') {
this.emit(PCEvents.NegotiationComplete);
}
}

// debounced negotiate interface
negotiate = debounce((onError?: (e: Error) => void) => {
this.emit(PCEvents.NegotiationStarted);
try {
this.createAndSendOffer();
} catch (e) {
Expand Down
40 changes: 29 additions & 11 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
UnexpectedConnectionState,
} from './errors';
import { EngineEvent } from './events';
import PCTransport from './PCTransport';
import PCTransport, { PCEvents } from './PCTransport';
import type { ReconnectContext, ReconnectPolicy } from './ReconnectPolicy';
import type LocalTrack from './track/LocalTrack';
import type LocalVideoTrack from './track/LocalVideoTrack';
Expand Down Expand Up @@ -964,18 +964,36 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}

/** @internal */
negotiate() {
if (!this.publisher) {
return;
}
negotiate(): Promise<void> {
// observe signal state
return new Promise<void>((resolve, reject) => {
if (!this.publisher) {
reject(new NegotiationError('publisher is not defined'));
return;
}

this.hasPublished = true;
this.hasPublished = true;

this.publisher.negotiate((e) => {
if (e instanceof NegotiationError) {
this.fullReconnectOnNext = true;
}
this.handleDisconnect('negotiation');
const negotiationTimeout = setTimeout(() => {
reject('negotiation timed out');
this.handleDisconnect('negotiation');
}, this.peerConnectionTimeout);

this.publisher.once(PCEvents.NegotiationStarted, () => {
this.publisher?.once(PCEvents.NegotiationComplete, () => {
clearTimeout(negotiationTimeout);
resolve();
});
});

this.publisher.negotiate((e) => {
clearTimeout(negotiationTimeout);
reject(e);
if (e instanceof NegotiationError) {
this.fullReconnectOnNext = true;
}
this.handleDisconnect('negotiation');
});
});
}

Expand Down
40 changes: 29 additions & 11 deletions src/room/participant/LocalParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ export default class LocalParticipant extends Participant {
} else if (track && track.track) {
// screenshare cannot be muted, unpublish instead
if (source === Track.Source.ScreenShare) {
track = this.unpublishTrack(track.track);
track = await this.unpublishTrack(track.track);
const screenAudioTrack = this.getTrack(Track.Source.ScreenShareAudio);
if (screenAudioTrack && screenAudioTrack.track) {
this.unpublishTrack(screenAudioTrack.track);
Expand Down Expand Up @@ -694,10 +694,10 @@ export default class LocalParticipant extends Participant {
log.debug(`published ${videoCodec} for track ${track.sid}`, { encodings, trackInfo: ti });
}

unpublishTrack(
async unpublishTrack(
track: LocalTrack | MediaStreamTrack,
stopOnUnpublish?: boolean,
): LocalTrackPublication | undefined {
): Promise<LocalTrackPublication | undefined> {
// look through all published tracks to find the right ones
const publication = this.getPublicationForTrack(track);

Expand Down Expand Up @@ -744,7 +744,7 @@ export default class LocalParticipant extends Participant {
} catch (e) {
log.warn('failed to unpublish track', { error: e, method: 'unpublishTrack' });
} finally {
this.engine.negotiate();
await this.engine.negotiate();
}
}

Expand All @@ -769,15 +769,33 @@ export default class LocalParticipant extends Participant {
return publication;
}

unpublishTracks(tracks: LocalTrack[] | MediaStreamTrack[]): LocalTrackPublication[] {
const publications: LocalTrackPublication[] = [];
tracks.forEach((track: LocalTrack | MediaStreamTrack) => {
const pub = this.unpublishTrack(track);
if (pub) {
publications.push(pub);
async unpublishTracks(
tracks: LocalTrack[] | MediaStreamTrack[],
): Promise<LocalTrackPublication[]> {
const results = await Promise.all(tracks.map((track) => this.unpublishTrack(track)));
return results.filter(
(track) => track instanceof LocalTrackPublication,
) as LocalTrackPublication[];
}

async republishAllTracks(options?: TrackPublishOptions) {
const localPubs: LocalTrackPublication[] = [];
this.tracks.forEach((pub) => {
if (pub.track) {
if (options) {
pub.options = { ...pub.options, ...options };
}
localPubs.push(pub);
}
});
return publications;

await Promise.all(
localPubs.map(async (pub) => {
const track = pub.track!;
await this.unpublishTrack(track, false);
await this.publishTrack(track, pub.options);
}),
);
}

/**
Expand Down
10 changes: 5 additions & 5 deletions src/room/participant/RemoteParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,6 @@ export default class RemoteParticipant extends Participant {
validTracks.set(ti.sid, publication);
});

// always emit events for new publications, Room will not forward them unless it's ready
newTracks.forEach((publication) => {
this.emit(ParticipantEvent.TrackPublished, publication);
});

// detect removed tracks
this.tracks.forEach((publication) => {
if (!validTracks.has(publication.trackSid)) {
Expand All @@ -278,6 +273,11 @@ export default class RemoteParticipant extends Participant {
this.unpublishTrack(publication.trackSid, true);
}
});

// always emit events for new publications, Room will not forward them unless it's ready
newTracks.forEach((publication) => {
this.emit(ParticipantEvent.TrackPublished, publication);
});
}

/** @internal */
Expand Down

0 comments on commit af223f4

Please sign in to comment.