Add filter support (#3) #27

Merged
merged 9 commits on Sep 10, 2023
194 changes: 147 additions & 47 deletions Beam.cs

Large diffs are not rendered by default.

103 changes: 84 additions & 19 deletions BeamReceiver.cs
@@ -28,7 +28,7 @@ public class BeamReceiver
string _pipeName = "";
bool _isConnecting;
ulong _frameTimestampOffset;
- readonly ConcurrentQueue<ulong> _lastRenderedVideoFrameTimestamps = new();
+ readonly ConcurrentQueue<ulong> _lastRenderedFrameTimestamps = new();
unsafe void* _turboJpegDecompress = null;
unsafe qoir_decode_options_struct* _qoirDecodeOptions;

@@ -209,9 +209,9 @@ public void Disconnect()
}
}

- public void SetLastOutputVideoFrameTimestamp(ulong timestamp)
+ public void SetLastOutputFrameTimestamp(ulong timestamp)
{
- _lastRenderedVideoFrameTimestamps.Enqueue(timestamp);
+ _lastRenderedFrameTimestamps.Enqueue(timestamp);
}

public delegate Task AsyncEventHandler<TEventArgs>(object? sender, TEventArgs e);
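
Note for reviewers: this rename makes the timestamp feedback path format-agnostic, since it now carries audio as well as video timestamps. A minimal usage sketch, assuming a hypothetical OBS-side render hook (the hook name is illustrative, not part of this PR):

```csharp
// Hypothetical render callback: report each frame OBS actually output back to the receiver.
// SetLastOutputFrameTimestamp only enqueues into a ConcurrentQueue, so it is cheap and
// thread-safe to call from the render path; ProcessDataLoopAsync later drains the queue
// and sends the timestamps back to the sender as Render feedback messages.
void OnFrameRendered(BeamReceiver receiver, ulong obsFrameTimestamp)
{
  receiver.SetLastOutputFrameTimestamp(obsFrameTimestamp);
}
```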
@@ -278,7 +278,7 @@ private unsafe void TurboJpegDecompressToBgra(byte[] receivedFrameData, int data
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private unsafe void TurboJpegDecompressToYuv(byte[] receivedFrameData, int dataSize, byte[] rawDataBuffer, Beam.PlaneInfo planeInfo, int width, int height)
+ private unsafe void TurboJpegDecompressToYuv(byte[] receivedFrameData, int dataSize, byte[] rawDataBuffer, Beam.VideoPlaneInfo planeInfo, int width, int height)
{
fixed (byte* jpegBuf = receivedFrameData, dstBuf = rawDataBuffer)
{
@@ -358,7 +358,7 @@ public static unsafe double GetLocalFps()
private async Task ProcessDataLoopAsync(Socket? socket, NamedPipeClientStream? pipeStream, CancellationToken cancellationToken)
{
_frameTimestampOffset = 0;
- _lastRenderedVideoFrameTimestamps.Clear();
+ _lastRenderedFrameTimestamps.Clear();

string endpointName;
PipeReader pipeReader;
@@ -389,10 +389,10 @@ private async Task ProcessDataLoopAsync(Socket? socket, NamedPipeClientStream? p

int videoHeaderSize = Beam.VideoHeader.VideoHeaderDataSize;
uint rawVideoDataSize = 0;
- Beam.PlaneInfo planeInfo = Beam.PlaneInfo.Empty;
+ Beam.VideoPlaneInfo planeInfo = Beam.VideoPlaneInfo.Empty;
bool jpegInitialized = false;
byte[] nv12ConversionBuffer = Array.Empty<byte>();
- Beam.PlaneInfo i420PlaneInfo = Beam.PlaneInfo.Empty;
+ Beam.VideoPlaneInfo i420PlaneInfo = Beam.VideoPlaneInfo.Empty;

double senderFps = 30;
uint logCycle = 0;
@@ -408,6 +408,7 @@ private async Task ProcessDataLoopAsync(Socket? socket, NamedPipeClientStream? p
ulong senderVideoTimestamp;

bool firstVideoFrame = true;
+ bool firstAudioFrame = true;

if (EncoderSupport.LibJpegTurbo)
TurboJpegDecompressInit();
@@ -432,7 +433,7 @@ private async Task ProcessDataLoopAsync(Socket? socket, NamedPipeClientStream? p
frameReceivedTime = DateTime.UtcNow;

var beamType = Beam.GetBeamType(readResult.Buffer);
- if (beamType == Beam.Type.Video)
+ if (beamType is Beam.Type.Video or Beam.Type.VideoOnly)
{
// read and validate header information
pipeReader.AdvanceTo(videoHeader.FromSequence(readResult.Buffer), readResult.Buffer.End);
@@ -505,7 +506,7 @@ private async Task ProcessDataLoopAsync(Socket? socket, NamedPipeClientStream? p
renderDelayAveragingFrameCount = (int)(senderFps / 2);
renderDelays = new int[renderDelayAveragingFrameCount];

- planeInfo = Beam.GetPlaneInfo(videoHeader.Format, videoHeader.Width, videoHeader.Height);
+ planeInfo = Beam.GetVideoPlaneInfo(videoHeader.Format, videoHeader.Width, videoHeader.Height);

if (videoHeader.Compression == Beam.CompressionTypes.Density)
rawVideoDataSize = (uint)Density.density_decompress_safe_size(planeInfo.DataSize);
@@ -574,7 +575,7 @@ private async Task ProcessDataLoopAsync(Socket? socket, NamedPipeClientStream? p
if (videoHeader.Format == video_format.VIDEO_FORMAT_NV12) // for this case we need an extra buffer and plane info for conversion to NV12, since JPEG decompression always outputs I420
{
nv12ConversionBuffer = new byte[rawVideoDataSize];
- i420PlaneInfo = Beam.GetPlaneInfo(video_format.VIDEO_FORMAT_I420, videoHeader.Width, videoHeader.Height);
+ i420PlaneInfo = Beam.GetVideoPlaneInfo(video_format.VIDEO_FORMAT_I420, videoHeader.Width, videoHeader.Height);
}
}

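Note for reviewers: the extra buffer exists because libjpeg-turbo emits planar I420 (Y, then U, then V), while NV12 wants Y followed by a single interleaved UV plane. A minimal sketch of that chroma repacking, assuming tightly packed planes with no row padding (real code should take offsets and strides from Beam.VideoPlaneInfo, so the computed offsets here are illustrative):

```csharp
// Sketch: repack I420 chroma (separate U and V planes) into NV12 chroma (interleaved UV).
// Assumes tightly packed planes; not the actual plugin code.
static void I420ToNv12(byte[] i420, byte[] nv12, int width, int height)
{
  int lumaSize = width * height;
  int chromaSize = lumaSize / 4; // U and V are 2x2 subsampled
  Array.Copy(i420, 0, nv12, 0, lumaSize); // the Y plane is identical in both formats
  int uOffset = lumaSize;              // I420: U plane follows Y
  int vOffset = lumaSize + chromaSize; // I420: V plane follows U
  for (int i = 0; i < chromaSize; i++)
  {
    nv12[lumaSize + (2 * i)] = i420[uOffset + i];     // U sample
    nv12[lumaSize + (2 * i) + 1] = i420[vOffset + i]; // V sample
  }
}
```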
@@ -614,7 +615,7 @@ private async Task ProcessDataLoopAsync(Socket? socket, NamedPipeClientStream? p
if (logCycle++ >= senderFps)
logCycle = 0;
}
- else if (beamType == Beam.Type.Audio)
+ else if (beamType is Beam.Type.Audio or Beam.Type.AudioOnly)
{
// read and validate header information
pipeReader.AdvanceTo(audioHeader.FromSequence(readResult.Buffer), readResult.Buffer.End);
@@ -630,6 +631,70 @@ private async Task ProcessDataLoopAsync(Socket? socket, NamedPipeClientStream? p
break;
}

+ // some mechanisms normally depend on video frames; for audio-only feeds they have to be applied here
+ if (beamType == Beam.Type.AudioOnly)
+ {
+ // tell the sender the current audio frame timestamp that was received - only done if this is an audio-only feed
+ pipeWriter.GetSpan(sizeof(byte))[0] = (byte)Beam.ReceiveTimestampTypes.Receive;
+ pipeWriter.Advance(sizeof(byte));
+ BinaryPrimitives.WriteUInt64LittleEndian(pipeWriter.GetSpan(sizeof(ulong)), audioHeader.Timestamp);
+ pipeWriter.Advance(sizeof(ulong));
+ var writeReceivedTimestampResult = await pipeWriter.FlushAsync(cancellationToken);
+ if (writeReceivedTimestampResult.IsCanceled || writeReceivedTimestampResult.IsCompleted)
+ {
+ if (writeReceivedTimestampResult.IsCanceled)
+ Module.Log("processDataLoopAsync() exit from sending through cancellation.", ObsLogLevel.Debug);
+ else
+ Module.Log("processDataLoopAsync() exit from sending through completion.", ObsLogLevel.Debug);
+ break;
+ }
+
+ // set frame timestamp offset
+ if (_frameTimestampOffset == 0) // initialize the offset if this is the very first frame since connecting
+ {
+ _frameTimestampOffset = audioHeader.Timestamp;
+ Module.Log($"Audio data: Frame timestamp offset initialized to: {_frameTimestampOffset}", ObsLogLevel.Debug);
+ }
+
+ if (firstAudioFrame)
+ {
+ firstAudioFrame = false;
+
+ senderFps = (1 / ((1 / (double)audioHeader.SampleRate) * audioHeader.Frames));
+
+ // initialize render delay averaging
+ renderDelayAveragingFrameCount = (int)(senderFps / 2);
+ renderDelays = new int[renderDelayAveragingFrameCount];
+
+ //TODO: enable frame buffer for audio-only feeds - currently doesn't work, since the frame buffer is video frame centric (not even ramp-up is finished without video frames)
+ // initialize frame buffer
+ // if (FrameBufferTimeMs > 0)
+ // {
+ // FrameBuffer = new FrameBuffer(FrameBufferTimeMs, FrameBufferFixedDelay, senderFps, senderFps, RawDataBufferPool);
+ // Module.Log($"Buffering {FrameBuffer.VideoFrameBufferCount} audio frames based on a frame buffer time of {FrameBuffer.FrameBufferTimeMs} ms for {senderFps:F} sender FPS (local: {localFps:F} FPS).", ObsLogLevel.Info);
+ // }
+ // else
+ // {
+ FrameBuffer = null;
+ Module.Log("Frame buffering disabled.", ObsLogLevel.Info);
+ // }
+ }
+
+ // average render delay calculation
+ renderDelays[renderDelayAveragingCycle] = audioHeader.RenderDelay;
+ if (++renderDelayAveragingCycle >= renderDelayAveragingFrameCount)
+ {
+ renderDelayAveragingCycle = 0;
+ renderDelayAverage = (int)renderDelays.Average();
+ }
+
+ if (logCycle++ >= senderFps)
+ {
+ logCycle = 0;
+ Module.Log($"Audio data: Received header {audioHeader.Timestamp}, Receive/Render delay: {audioHeader.ReceiveDelay} / {audioHeader.RenderDelay} ({renderDelayAverage}) ms ", ObsLogLevel.Debug);
+ }
+ }

if (audioHeader.Timestamp > _frameTimestampOffset)
audioHeader.Timestamp -= _frameTimestampOffset;
else
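
Note for reviewers on the initialization in the block above: for an audio-only feed the sender "FPS" falls out of the audio chunking, senderFps = SampleRate / Frames, and the render delay average then spans roughly half a second of chunks. A worked example with common OBS audio settings (48 kHz, 1024-frame chunks; these numbers are illustrative, not taken from this PR):

```csharp
// Illustrative values: OBS typically delivers 48 kHz audio in 1024-frame chunks.
double sampleRate = 48000;
double framesPerChunk = 1024;

// Same formula as in the diff: one chunk lasts framesPerChunk / sampleRate seconds.
double senderFps = 1 / ((1 / sampleRate) * framesPerChunk); // 48000 / 1024 = 46.875 chunks/s

// Render delays are averaged over about half a second of chunks:
int renderDelayAveragingFrameCount = (int)(senderFps / 2); // 23 chunks, roughly 490 ms
```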
@@ -652,12 +717,12 @@
}

// process the frame
- if (!firstVideoFrame && (_frameTimestampOffset > 0)) // Beam treats video frames as a kind of header in several ways, ignore audio frames that were received before the first video frame
+ if ((!firstVideoFrame || (beamType == Beam.Type.AudioOnly)) && (_frameTimestampOffset > 0)) // Beam treats video frames as a kind of header in several ways, ignore audio frames that were received before the first video frame (unless it's an audio-only feed)
{
if (FrameBuffer == null)
- OnAudioFrameReceived(new Beam.BeamAudioData(audioHeader, readResult.Buffer.Slice(0, audioHeader.DataSize).ToArray(), frameReceivedTime));
+ OnAudioFrameReceived(new Beam.BeamAudioData(audioHeader, readResult.Buffer.Slice(0, audioHeader.DataSize).ToArray(), frameReceivedTime, renderDelayAverage));
else
- FrameBuffer.ProcessFrame(new Beam.BeamAudioData(audioHeader, readResult.Buffer.Slice(0, audioHeader.DataSize).ToArray(), frameReceivedTime));
+ FrameBuffer.ProcessFrame(new Beam.BeamAudioData(audioHeader, readResult.Buffer.Slice(0, audioHeader.DataSize).ToArray(), frameReceivedTime, renderDelayAverage));
}

long receiveLength = readResult.Buffer.Length; // remember this here, before the buffer is invalidated with the next line
@@ -674,16 +739,16 @@ private async Task ProcessDataLoopAsync(Socket? socket, NamedPipeClientStream? p
break;
}

- // tell the sender the last video frame timestamps that were output by OBS
- while (_lastRenderedVideoFrameTimestamps.TryDequeue(out ulong lastRenderedVideoFrameTimestamp))
+ // tell the sender the last frame timestamps that were output by OBS
+ while (_lastRenderedFrameTimestamps.TryDequeue(out ulong lastRenderedFrameTimestamp))
{
if (FrameBuffer != null)
- lastRenderedVideoFrameTimestamp = FrameBuffer.GetOriginalVideoTimestamp(lastRenderedVideoFrameTimestamp) + _frameTimestampOffset;
+ lastRenderedFrameTimestamp = FrameBuffer.GetOriginalVideoTimestamp(lastRenderedFrameTimestamp) + _frameTimestampOffset;
else
- lastRenderedVideoFrameTimestamp += _frameTimestampOffset;
+ lastRenderedFrameTimestamp += _frameTimestampOffset;
pipeWriter.GetSpan(sizeof(byte))[0] = (byte)Beam.ReceiveTimestampTypes.Render;
pipeWriter.Advance(sizeof(byte));
- BinaryPrimitives.WriteUInt64LittleEndian(pipeWriter.GetSpan(sizeof(ulong)), lastRenderedVideoFrameTimestamp);
+ BinaryPrimitives.WriteUInt64LittleEndian(pipeWriter.GetSpan(sizeof(ulong)), lastRenderedFrameTimestamp);
pipeWriter.Advance(sizeof(ulong));
var writeRenderTimestampResult = await pipeWriter.FlushAsync(cancellationToken);
if (writeRenderTimestampResult.IsCanceled || writeRenderTimestampResult.IsCompleted)
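
Note for reviewers tracing the feedback protocol: the Receive acknowledgement added for audio-only feeds and the Render report here share one 9-byte wire format, a single Beam.ReceiveTimestampTypes byte followed by a little-endian ulong timestamp. A minimal sketch of parsing it on the sender side (the helper is hypothetical; only the layout is taken from this diff):

```csharp
using System.Buffers.Binary;

// Hypothetical sender-side parser for the 9-byte feedback message written above:
// byte 0:    (byte)Beam.ReceiveTimestampTypes (Receive or Render)
// bytes 1-8: ulong timestamp, little-endian
static (Beam.ReceiveTimestampTypes Type, ulong Timestamp) ParseTimestampMessage(ReadOnlySpan<byte> message)
{
  var type = (Beam.ReceiveTimestampTypes)message[0];
  ulong timestamp = BinaryPrimitives.ReadUInt64LittleEndian(message.Slice(1, sizeof(ulong)));
  return (type, timestamp);
}
```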