Skip to content

Commit

Permalink
Fix missed TrackPublished events. (livekit#275)
Browse files Browse the repository at this point in the history
* Fix missed TrackPublished events.

When initial participant info is sent down along with tracks, we would
miss emitting TrackPublished events for them. This is due an incorrect
assumption that participants would first join without tracks.

Instead of relying on if it's the first update, we now use Room connection
state to determine if those TrackPublished events should be emitted or not.

* defer TrackSubscribed event until after connected to room.

* emit certain events only when connected
  • Loading branch information
davidzhao committed Jun 22, 2022
1 parent 99fba24 commit 591c218
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 40 deletions.
5 changes: 5 additions & 0 deletions .changeset/famous-suits-matter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Fixed TrackPublished events not firing correctly in some cases
40 changes: 18 additions & 22 deletions example/sample.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import {
VideoCaptureOptions,
VideoCodec,
VideoPresets,
VideoQuality,
} from '../src/index';
VideoQuality
} from '../src/index'

const $ = (id: string) => document.getElementById(id);

Expand Down Expand Up @@ -110,12 +110,12 @@ const appActions = {
.on(RoomEvent.LocalTrackPublished, () => {
renderParticipant(room.localParticipant);
updateButtonsForPublishState();
renderScreenShare();
renderScreenShare(room);
})
.on(RoomEvent.LocalTrackUnpublished, () => {
renderParticipant(room.localParticipant);
updateButtonsForPublishState();
renderScreenShare();
renderScreenShare(room);
})
.on(RoomEvent.RoomMetadataChanged, (metadata) => {
appendLog('new metadata for room', metadata);
Expand All @@ -138,9 +138,15 @@ const appActions = {
appendLog('connection quality changed', participant?.identity, quality);
},
)
.on(RoomEvent.TrackSubscribed, (_1, _2, participant: RemoteParticipant) => {
.on(RoomEvent.TrackSubscribed, (_1, pub, participant) => {
appendLog('subscribed to track', pub.trackSid, participant.identity);
renderParticipant(participant);
renderScreenShare();
renderScreenShare(room);
})
.on(RoomEvent.TrackUnsubscribed, (_, pub, participant) => {
appendLog('unsubscribed from track', pub.trackSid);
renderParticipant(participant);
renderScreenShare(room);
})
.on(RoomEvent.SignalConnected, async () => {
if (shouldPublish) {
Expand Down Expand Up @@ -351,16 +357,6 @@ function handleData(msg: Uint8Array, participant?: RemoteParticipant) {
function participantConnected(participant: Participant) {
appendLog('participant', participant.identity, 'connected', participant.metadata);
participant
.on(ParticipantEvent.TrackSubscribed, (_, pub: TrackPublication) => {
appendLog('subscribed to track', pub.trackSid, participant.identity);
renderParticipant(participant);
renderScreenShare();
})
.on(ParticipantEvent.TrackUnsubscribed, (_, pub: TrackPublication) => {
appendLog('unsubscribed from track', pub.trackSid);
renderParticipant(participant);
renderScreenShare();
})
.on(ParticipantEvent.TrackMuted, (pub: TrackPublication) => {
appendLog('track was muted', pub.trackSid, participant.identity);
renderParticipant(participant);
Expand Down Expand Up @@ -391,7 +387,7 @@ function handleRoomDisconnect() {
currentRoom.participants.forEach((p) => {
renderParticipant(p, true);
});
renderScreenShare();
renderScreenShare(currentRoom);

const container = $('participants-area');
if (container) {
Expand Down Expand Up @@ -572,19 +568,19 @@ function renderParticipant(participant: Participant, remove: boolean = false) {
}
}

function renderScreenShare() {
function renderScreenShare(room: Room) {
const div = $('screenshare-area')!;
if (!currentRoom || currentRoom.state !== ConnectionState.Connected) {
if (room.state !== ConnectionState.Connected) {
div.style.display = 'none';
return;
}
let participant: Participant | undefined;
let screenSharePub: TrackPublication | undefined = currentRoom.localParticipant.getTrack(
let screenSharePub: TrackPublication | undefined = room.localParticipant.getTrack(
Track.Source.ScreenShare,
);
let screenShareAudioPub: RemoteTrackPublication | undefined;
if (!screenSharePub) {
currentRoom.participants.forEach((p) => {
room.participants.forEach((p) => {
if (screenSharePub) {
return;
}
Expand All @@ -599,7 +595,7 @@ function renderScreenShare() {
}
});
} else {
participant = currentRoom.localParticipant;
participant = room.localParticipant;
}

if (screenSharePub && participant) {
Expand Down
47 changes: 40 additions & 7 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,20 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
stream: MediaStream,
receiver?: RTCRtpReceiver,
) {
// don't fire onSubscribed when connecting
// WebRTC fires onTrack as soon as setRemoteDescription is called on the offer
// at that time, ICE connectivity has not been established so the track is not
// technically subscribed.
// We'll defer these events until when the room is connected or eventually disconnected.
if (this.state === ConnectionState.Connecting || this.state === ConnectionState.Reconnecting) {
setTimeout(() => {
this.onTrackAdded(mediaTrack, stream, receiver);
}, 10);
return;
}
if (this.state === ConnectionState.Disconnected) {
log.warn('skipping incoming track after Room disconnected');
}
const parts = unpackStreamId(stream.id);
const participantId = parts[0];
let trackId = parts[1];
Expand Down Expand Up @@ -850,7 +864,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
// and remote participant joined the room
participant
.on(ParticipantEvent.TrackPublished, (trackPublication: RemoteTrackPublication) => {
this.emit(RoomEvent.TrackPublished, trackPublication, participant);
this.emitWhenConnected(RoomEvent.TrackPublished, trackPublication, participant);
})
.on(
ParticipantEvent.TrackSubscribed,
Expand All @@ -864,7 +878,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
},
)
.on(ParticipantEvent.TrackUnpublished, (publication: RemoteTrackPublication) => {
this.emit(RoomEvent.TrackUnpublished, publication, participant);
this.emitWhenConnected(RoomEvent.TrackUnpublished, publication, participant);
})
.on(
ParticipantEvent.TrackUnsubscribed,
Expand All @@ -876,23 +890,32 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.emit(RoomEvent.TrackSubscriptionFailed, sid, participant);
})
.on(ParticipantEvent.TrackMuted, (pub: TrackPublication) => {
this.emit(RoomEvent.TrackMuted, pub, participant);
this.emitWhenConnected(RoomEvent.TrackMuted, pub, participant);
})
.on(ParticipantEvent.TrackUnmuted, (pub: TrackPublication) => {
this.emit(RoomEvent.TrackUnmuted, pub, participant);
this.emitWhenConnected(RoomEvent.TrackUnmuted, pub, participant);
})
.on(ParticipantEvent.ParticipantMetadataChanged, (metadata: string | undefined) => {
this.emit(RoomEvent.ParticipantMetadataChanged, metadata, participant);
this.emitWhenConnected(RoomEvent.ParticipantMetadataChanged, metadata, participant);
})
.on(ParticipantEvent.ConnectionQualityChanged, (quality: ConnectionQuality) => {
this.emit(RoomEvent.ConnectionQualityChanged, quality, participant);
this.emitWhenConnected(RoomEvent.ConnectionQualityChanged, quality, participant);
})
.on(
ParticipantEvent.ParticipantPermissionsChanged,
(prevPermissions: ParticipantPermission) => {
this.emit(RoomEvent.ParticipantPermissionsChanged, prevPermissions, participant);
this.emitWhenConnected(
RoomEvent.ParticipantPermissionsChanged,
prevPermissions,
participant,
);
},
);

// update info at the end after callbacks have been set up
if (info) {
participant.updateInfo(info);
}
return participant;
}

Expand Down Expand Up @@ -959,6 +982,16 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
return true;
}

private emitWhenConnected<E extends keyof RoomEventCallbacks>(
event: E,
...args: Parameters<RoomEventCallbacks[E]>
): boolean {
if (this.state === ConnectionState.Connected) {
return this.emit(event, ...args);
}
return false;
}

// /** @internal */
emit<E extends keyof RoomEventCallbacks>(
event: E,
Expand Down
16 changes: 5 additions & 11 deletions src/room/participant/RemoteParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ export default class RemoteParticipant extends Participant {

/** @internal */
static fromParticipantInfo(signalClient: SignalClient, pi: ParticipantInfo): RemoteParticipant {
const rp = new RemoteParticipant(signalClient, pi.sid, pi.identity);
rp.updateInfo(pi);
return rp;
return new RemoteParticipant(signalClient, pi.sid, pi.identity);
}

/** @internal */
Expand Down Expand Up @@ -182,8 +180,6 @@ export default class RemoteParticipant extends Participant {

/** @internal */
updateInfo(info: ParticipantInfo) {
const alreadyHasMetadata = this.hasMetadata;

super.updateInfo(info);

// we are getting a list of all available tracks, reconcile in here
Expand Down Expand Up @@ -212,12 +208,10 @@ export default class RemoteParticipant extends Participant {
validTracks.set(ti.sid, publication);
});

// send new tracks
if (alreadyHasMetadata) {
newTracks.forEach((publication) => {
this.emit(ParticipantEvent.TrackPublished, publication);
});
}
// always emit events for new publications, Room will not forward them unless it's ready
newTracks.forEach((publication) => {
this.emit(ParticipantEvent.TrackPublished, publication);
});

// detect removed tracks
this.tracks.forEach((publication) => {
Expand Down

0 comments on commit 591c218

Please sign in to comment.