diff --git a/.changeset/curvy-years-listen.md b/.changeset/curvy-years-listen.md new file mode 100644 index 000000000..c3fe39fc1 --- /dev/null +++ b/.changeset/curvy-years-listen.md @@ -0,0 +1,5 @@ +--- +'livekit-client': patch +--- + +Make `unpublishTrack` async, emit TrackUnpublished before TrackPublished within the same update diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index b4a85c81f..0a8650193 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -1,3 +1,4 @@ +import EventEmitter from 'events'; import { MediaDescription, parse, write } from 'sdp-transform'; import { debounce } from 'ts-debounce'; import log from '../logger'; @@ -10,8 +11,13 @@ interface TrackBitrateInfo { maxbr: number; } +export const PCEvents = { + NegotiationStarted: 'negotiationStarted', + NegotiationComplete: 'negotiationComplete', +} as const; + /** @internal */ -export default class PCTransport { +export default class PCTransport extends EventEmitter { pc: RTCPeerConnection; pendingCandidates: RTCIceCandidateInit[] = []; @@ -27,6 +33,7 @@ export default class PCTransport { onOffer?: (offer: RTCSessionDescriptionInit) => void; constructor(config?: RTCConfiguration) { + super(); this.pc = new RTCPeerConnection(config); } @@ -56,11 +63,14 @@ export default class PCTransport { if (this.renegotiate) { this.renegotiate = false; this.createAndSendOffer(); + } else if (sd.type === 'answer') { + this.emit(PCEvents.NegotiationComplete); } } // debounced negotiate interface negotiate = debounce((onError?: (e: Error) => void) => { + this.emit(PCEvents.NegotiationStarted); try { this.createAndSendOffer(); } catch (e) { diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 59b7ccc67..720980551 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -29,7 +29,7 @@ import { UnexpectedConnectionState, } from './errors'; import { EngineEvent } from './events'; -import PCTransport from './PCTransport'; +import PCTransport, { PCEvents } from './PCTransport'; import type { ReconnectContext, ReconnectPolicy } from './ReconnectPolicy'; import type LocalTrack from './track/LocalTrack'; import type LocalVideoTrack from './track/LocalVideoTrack'; @@ -964,18 +964,36 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } /** @internal */ - negotiate() { - if (!this.publisher) { - return; - } + negotiate(): Promise { + // observe signal state + return new Promise((resolve, reject) => { + if (!this.publisher) { + reject(new NegotiationError('publisher is not defined')); + return; + } - this.hasPublished = true; + this.hasPublished = true; - this.publisher.negotiate((e) => { - if (e instanceof NegotiationError) { - this.fullReconnectOnNext = true; - } - this.handleDisconnect('negotiation'); + const negotiationTimeout = setTimeout(() => { + reject('negotiation timed out'); + this.handleDisconnect('negotiation'); + }, this.peerConnectionTimeout); + + this.publisher.once(PCEvents.NegotiationStarted, () => { + this.publisher?.once(PCEvents.NegotiationComplete, () => { + clearTimeout(negotiationTimeout); + resolve(); + }); + }); + + this.publisher.negotiate((e) => { + clearTimeout(negotiationTimeout); + reject(e); + if (e instanceof NegotiationError) { + this.fullReconnectOnNext = true; + } + this.handleDisconnect('negotiation'); + }); }); } diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index 5bcd2bd3d..a73c0f185 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -262,7 +262,7 @@ export default class LocalParticipant extends Participant { } else if (track && track.track) { // screenshare cannot be muted, unpublish instead if (source === Track.Source.ScreenShare) { - track = this.unpublishTrack(track.track); + track = await this.unpublishTrack(track.track); const screenAudioTrack = this.getTrack(Track.Source.ScreenShareAudio); if (screenAudioTrack && screenAudioTrack.track) { this.unpublishTrack(screenAudioTrack.track); @@ -694,10 +694,10 @@ export default class LocalParticipant extends Participant { log.debug(`published ${videoCodec} for track ${track.sid}`, { encodings, trackInfo: ti }); } - unpublishTrack( + async unpublishTrack( track: LocalTrack | MediaStreamTrack, stopOnUnpublish?: boolean, - ): LocalTrackPublication | undefined { + ): Promise { // look through all published tracks to find the right ones const publication = this.getPublicationForTrack(track); @@ -744,7 +744,7 @@ export default class LocalParticipant extends Participant { } catch (e) { log.warn('failed to unpublish track', { error: e, method: 'unpublishTrack' }); } finally { - this.engine.negotiate(); + await this.engine.negotiate(); } } @@ -769,15 +769,33 @@ export default class LocalParticipant extends Participant { return publication; } - unpublishTracks(tracks: LocalTrack[] | MediaStreamTrack[]): LocalTrackPublication[] { - const publications: LocalTrackPublication[] = []; - tracks.forEach((track: LocalTrack | MediaStreamTrack) => { - const pub = this.unpublishTrack(track); - if (pub) { - publications.push(pub); + async unpublishTracks( + tracks: LocalTrack[] | MediaStreamTrack[], + ): Promise { + const results = await Promise.all(tracks.map((track) => this.unpublishTrack(track))); + return results.filter( + (track) => track instanceof LocalTrackPublication, + ) as LocalTrackPublication[]; + } + + async republishAllTracks(options?: TrackPublishOptions) { + const localPubs: LocalTrackPublication[] = []; + this.tracks.forEach((pub) => { + if (pub.track) { + if (options) { + pub.options = { ...pub.options, ...options }; + } + localPubs.push(pub); } }); - return publications; + + await Promise.all( + localPubs.map(async (pub) => { + const track = pub.track!; + await this.unpublishTrack(track, false); + await this.publishTrack(track, pub.options); + }), + ); } /** diff --git a/src/room/participant/RemoteParticipant.ts b/src/room/participant/RemoteParticipant.ts index bb5886024..38d37e9d2 100644 --- a/src/room/participant/RemoteParticipant.ts +++ b/src/room/participant/RemoteParticipant.ts @@ -263,11 +263,6 @@ export default class RemoteParticipant extends Participant { validTracks.set(ti.sid, publication); }); - // always emit events for new publications, Room will not forward them unless it's ready - newTracks.forEach((publication) => { - this.emit(ParticipantEvent.TrackPublished, publication); - }); - // detect removed tracks this.tracks.forEach((publication) => { if (!validTracks.has(publication.trackSid)) { @@ -278,6 +273,11 @@ export default class RemoteParticipant extends Participant { this.unpublishTrack(publication.trackSid, true); } }); + + // always emit events for new publications, Room will not forward them unless it's ready + newTracks.forEach((publication) => { + this.emit(ParticipantEvent.TrackPublished, publication); + }); } /** @internal */