Skip to content

Commit

Permalink
Optimize receive timestamp processing
Browse files Browse the repository at this point in the history
  • Loading branch information
YorVeX committed Oct 29, 2023
1 parent 9c690db commit 0e6b09d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 20 deletions.
33 changes: 23 additions & 10 deletions src/Beam.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ namespace xObsBeam;

public class Beam
{
const int ReceiveTimestampPairLength = (sizeof(byte) + sizeof(long)) * 2;
public const int ReceiveTimestampLength = (sizeof(byte) + sizeof(long));
public const int MaxReceiveTimestampLength = ReceiveTimestampLength * 4; // sometimes we might get timestamps of the same type in a row, they then shouldn't make us discard the other type

public enum SenderTypes
{
Expand Down Expand Up @@ -52,17 +53,29 @@ public static Type GetBeamType(ReadOnlySequence<byte> sequence)
return (Type)result;
}

public static SequencePosition GetReceiveTimestamp(ReadOnlySequence<byte> sequence, out ReceiveTimestampTypes receiveTimestampType, out ulong timestamp)
public static SequencePosition GetReceiveTimestamps(ReadOnlySequence<byte> sequence, string clientId, out ulong receiveTimestamp, out ulong renderTimestamp)
{
receiveTimestamp = 0;
renderTimestamp = 0;
var reader = new SequenceReader<byte>(sequence);
if (reader.Length > ReceiveTimestampPairLength)
reader.Advance(reader.Length - ReceiveTimestampPairLength); // skip past outdated timestamp information
if (!reader.TryRead(out byte byteResult))
throw new ArgumentException("Failed to read enough data from sequence.");
if (!reader.TryReadLittleEndian(out long longResult))
throw new ArgumentException("Failed to read enough data from sequence.");
timestamp = (ulong)longResult;
receiveTimestampType = (ReceiveTimestampTypes)byteResult;
if (reader.Length > MaxReceiveTimestampLength)
{
Module.Log($"<{clientId}> Discarding excessive timestamp info from receiver ({reader.Length - MaxReceiveTimestampLength} bytes).", ObsLogLevel.Debug);
reader.Advance(reader.Length - MaxReceiveTimestampLength); // skip past outdated timestamp information to keep unnecessary work for the following loop to a minimum
}
while (reader.TryRead(out byte byteResult))
{
if (!reader.TryReadLittleEndian(out long longResult))
{
reader.Rewind(1); // leave the ReceiveTimestampTypes byte in the buffer for the next round
break;
}
// with this loop if a timestamp type is received more than once, only the last one will be used
if ((ReceiveTimestampTypes)byteResult == ReceiveTimestampTypes.Receive)
receiveTimestamp = (ulong)longResult;
else if ((ReceiveTimestampTypes)byteResult == ReceiveTimestampTypes.Render)
renderTimestamp = (ulong)longResult;
}
return reader.Position;
}

Expand Down
20 changes: 10 additions & 10 deletions src/BeamSenderClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async Task ReceiveLoopAsync(PipeReader pipeReader, CancellationToken cancellatio
_ = Task.Run(() => CheckReceiverAliveLoopAsync(cancellationToken), cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
ReadResult readResult = await pipeReader.ReadAtLeastAsync(sizeof(byte) + sizeof(ulong), cancellationToken);
ReadResult readResult = await pipeReader.ReadAtLeastAsync(Beam.ReceiveTimestampLength, cancellationToken);
if (readResult.IsCanceled || (readResult.Buffer.IsEmpty && readResult.IsCompleted))
{
if (readResult.IsCanceled)
Expand All @@ -136,25 +136,25 @@ async Task ReceiveLoopAsync(PipeReader pipeReader, CancellationToken cancellatio
}
break;
}
pipeReader.AdvanceTo(Beam.GetReceiveTimestamp(readResult.Buffer, out Beam.ReceiveTimestampTypes receiveTimestampType, out ulong timestamp), readResult.Buffer.End);
_lastFrameTime = DateTime.UtcNow;
if (receiveTimestampType == Beam.ReceiveTimestampTypes.Receive)
pipeReader.AdvanceTo(Beam.GetReceiveTimestamps(readResult.Buffer, ClientId, out var receiveTimestamp, out var renderTimestamp), readResult.Buffer.End);
if (receiveTimestamp > 0)
{
var lastSentTimestamp = Interlocked.Read(ref _lastSentTimestamp);
if (lastSentTimestamp >= timestamp) // an offset reset on the receiver side after a reconnect can cause a "future timestamp", ignore those
if (lastSentTimestamp >= receiveTimestamp) // an offset reset on the receiver side after a reconnect can cause a "future timestamp", ignore those
{
var receiveDelayMs = (lastSentTimestamp - timestamp) / 1_000_000;
// Module.Log($"<{ClientId}> Receiver received video frame {timestamp} with a delay of {receiveDelayMs} ms", ObsLogLevel.Debug);
var receiveDelayMs = (lastSentTimestamp - receiveTimestamp) / 1_000_000;
// Module.Log($"<{ClientId}> Receiver received video frame {receiveTimestamp} with a delay of {receiveDelayMs} ms ({lastSentTimestamp} - {receiveTimestamp})", ObsLogLevel.Debug);
Interlocked.Exchange(ref _receiveDelayMs, (long)receiveDelayMs);
}
}
else if (receiveTimestampType == Beam.ReceiveTimestampTypes.Render)
if (renderTimestamp > 0)
{
var lastSentTimestamp = Interlocked.Read(ref _lastSentTimestamp);
if (lastSentTimestamp >= timestamp) // an offset reset on the receiver side after a reconnect can cause a "future timestamp", ignore those
if (lastSentTimestamp >= renderTimestamp) // an offset reset on the receiver side after a reconnect can cause a "future timestamp", ignore those
{
var renderDelayMs = (lastSentTimestamp - timestamp) / 1_000_000;
// Module.Log($"<{ClientId}> Receiver rendered video frame {timestamp} with a delay of {renderDelayMs} ms", ObsLogLevel.Debug);
var renderDelayMs = (lastSentTimestamp - renderTimestamp) / 1_000_000;
// Module.Log($"<{ClientId}> Receiver rendered video frame {renderTimestamp} with a delay of {renderDelayMs} ms ({lastSentTimestamp} - {renderTimestamp})", ObsLogLevel.Debug);
Interlocked.Exchange(ref _renderDelayMs, (long)renderDelayMs);
}
}
Expand Down

0 comments on commit 0e6b09d

Please sign in to comment.