Skip to content

Commit

Permalink
Merge pull request #13 from YorVeX/feature/libjpeg-turbo-support
Browse files Browse the repository at this point in the history
JPEG support (lossy and lossless) based on the libjpeg-turbo library
  • Loading branch information
YorVeX authored May 20, 2023
2 parents cde6882 + 8d0ba85 commit 5e22dc9
Show file tree
Hide file tree
Showing 20 changed files with 1,599 additions and 207 deletions.
27 changes: 25 additions & 2 deletions Beam.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public enum CompressionTypes : int
Qoi = 1,
Lz4 = 2,
QoiLz4 = 3,
Jpeg = 4,
}

#region helper methods
Expand Down Expand Up @@ -45,6 +46,28 @@ public static SequencePosition GetTimestamp(ReadOnlySequence<byte> sequence, out
return reader.Position;
}

/// <summary>
/// Gets the per-plane sizes (in bytes) a frame will have after being deinterleaved
/// to a planar YUV layout for JPEG compression.
/// </summary>
/// <param name="format">The OBS video format of the source frame.</param>
/// <param name="width">Frame width in pixels.</param>
/// <param name="height">Frame height in pixels.</param>
/// <returns>
/// An array with one entry per plane (Y, U, V for I420), or an empty array for
/// formats that don't need deinterleaving or are not supported.
/// </returns>
public static unsafe uint[] GetYuvPlaneSizes(video_format format, uint width, uint height)
{
  switch (format)
  {
    //TODO: support more YUV formats for JPEG compression
    case video_format.VIDEO_FORMAT_NV12: // deinterleave and convert to I420
      // I420 chroma planes are half resolution in both dimensions; round both up
      // so odd frame dimensions don't truncate the last chroma column/row.
      uint halfWidth = (width + 1) / 2;
      uint halfHeight = (height + 1) / 2;
      return new uint[]
      {
        width * height,         // Y plane
        halfWidth * halfHeight, // U plane
        halfWidth * halfHeight, // V plane
      };
    default: // doesn't need to be deinterleaved or not supported
      return Array.Empty<uint>();
  }
}

public static unsafe uint[] GetPlaneSizes(video_format format, uint height, uint* linesize)
{
uint halfHeight = 0;
Expand Down Expand Up @@ -86,7 +109,7 @@ public static unsafe uint[] GetPlaneSizes(video_format format, uint height, uint
case video_format.VIDEO_FORMAT_Y800:
case video_format.VIDEO_FORMAT_BGR3:
case video_format.VIDEO_FORMAT_AYUV:
// case video_format.VIDEO_FORMAT_V210: // newer OBS
// case video_format.VIDEO_FORMAT_V210: // OBS 29.1.X+
planeSizes = new uint[1];
planeSizes[0] = (linesize[0] * height);
return planeSizes;
Expand All @@ -110,7 +133,7 @@ public static unsafe uint[] GetPlaneSizes(video_format format, uint height, uint
return planeSizes;
default:
Module.Log($"Unsupported video format: {format}", ObsLogLevel.Error);
return new uint[0];
return Array.Empty<uint>();
}

}
Expand Down
96 changes: 76 additions & 20 deletions BeamReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.IO.Pipelines;
using System.IO.Pipes;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using LibJpegTurbo;
using K4os.Compression.LZ4;

namespace xObsBeam;
Expand All @@ -23,6 +25,7 @@ public class BeamReceiver
bool _isConnected = false;
ulong _frameTimestampOffset = 0;
ArrayPool<byte> _rawDataBufferPool = ArrayPool<byte>.Create();
unsafe void* _turboJpegDecompress = null;

public ArrayPool<byte> RawDataBufferPool
{
Expand Down Expand Up @@ -212,6 +215,57 @@ void OnDisconnected()
Task.Run(() => Disconnected?.Invoke(this, EventArgs.Empty));
}

/// <summary>
/// Decompresses a JPEG image into a BGRA pixel buffer using libjpeg-turbo,
/// picking the v3 API when available and falling back to the legacy v2 API otherwise.
/// </summary>
/// <param name="receivedFrameData">Buffer holding the compressed JPEG data.</param>
/// <param name="dataSize">Number of valid JPEG bytes in <paramref name="receivedFrameData"/>.</param>
/// <param name="rawDataBuffer">Destination buffer for the decompressed BGRA pixels.</param>
/// <param name="width">Frame width in pixels (also determines the destination pitch).</param>
/// <param name="height">Frame height in pixels.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe void turboJpegDecompressToBGRA(byte[] receivedFrameData, int dataSize, byte[] rawDataBuffer, int width, int height)
{
  fixed (byte* jpegBuf = receivedFrameData, dstBuf = rawDataBuffer)
  {
    // destination pitch: width in pixels times the BGRA pixel size in bytes
    int pitch = width * TurboJpeg.tjPixelSize[(int)TJPF.TJPF_BGRA];
    int decompressResult;
    string decompressFunction; // remember which API was used so error logs aren't misleading
    if (EncoderSupport.LibJpegTurboV3)
    {
      decompressFunction = "tj3Decompress8";
      decompressResult = TurboJpeg.tj3Decompress8(_turboJpegDecompress, jpegBuf, (uint)dataSize, dstBuf, pitch, (int)TJPF.TJPF_BGRA);
    }
    else
    {
      decompressFunction = "tjDecompress2";
      decompressResult = TurboJpeg.tjDecompress2(_turboJpegDecompress, jpegBuf, (uint)dataSize, dstBuf, width, pitch, height, (int)TJPF.TJPF_BGRA, 0);
    }
    if (decompressResult != 0)
      Module.Log(decompressFunction + " failed with error " + TurboJpeg.tjGetErrorCode(_turboJpegDecompress) + ": " + Marshal.PtrToStringUTF8((IntPtr)TurboJpeg.tjGetErrorStr2(_turboJpegDecompress)), ObsLogLevel.Error);
  }
}

// Acquires a libjpeg-turbo decompressor handle, preferring the v3 API when
// available; leaves _turboJpegDecompress untouched when no library was found.
private unsafe void turboJpegDecompressInit()
{
  if (EncoderSupport.LibJpegTurboV3)
  {
    _turboJpegDecompress = TurboJpeg.tj3Init((int)TJINIT.TJINIT_DECOMPRESS);
    return;
  }
  if (EncoderSupport.LibJpegTurbo)
    _turboJpegDecompress = TurboJpeg.tjInitDecompress();
}

// Releases the libjpeg-turbo decompressor handle if one was created and resets
// the field so the method is safe to call more than once.
private unsafe void turboJpegDecompressDestroy()
{
  if (_turboJpegDecompress == null)
    return;

  if (EncoderSupport.LibJpegTurboV3)
    TurboJpeg.tj3Destroy(_turboJpegDecompress);
  else
    TurboJpeg.tjDestroy(_turboJpegDecompress);
  _turboJpegDecompress = null;
}

// Calculates the total raw frame size in bytes by summing the plane sizes
// derived from the header's format, height and line sizes.
// Returns 0 for unsupported formats (GetPlaneSizes yields an empty array then).
private unsafe uint getRawVideoDataSize(Beam.VideoHeader videoHeader)
{
  fixed (uint* linesize = videoHeader.Linesize)
  {
    var planeSizes = Beam.GetPlaneSizes(videoHeader.Format, videoHeader.Height, linesize);
    uint totalSize = 0;
    foreach (uint planeSize in planeSizes)
      totalSize += planeSize;
    return totalSize;
  }
}

private async Task processDataLoopAsync(Socket? socket, NamedPipeClientStream? pipeStream, CancellationToken cancellationToken)
{
string endpointName;
Expand Down Expand Up @@ -242,6 +296,7 @@ private async Task processDataLoopAsync(Socket? socket, NamedPipeClientStream? p
var audioHeader = new Beam.AudioHeader();

int videoHeaderSize = Beam.VideoHeader.VideoHeaderDataSize;
uint rawVideoDataSize = 0;

uint fps = 30;
uint logCycle = 0;
Expand All @@ -257,15 +312,10 @@ private async Task processDataLoopAsync(Socket? socket, NamedPipeClientStream? p
FrameBuffer frameBuffer = new FrameBuffer();
frameBuffer.FrameBufferTimeMs = FrameBufferTimeMs;

lock (_sizeLock)
{
if ((_width > 0) && (_height > 0)) // could still be set from a previous run, in this case use this information to initialize the buffers already
{
maxVideoDataSize = (int)(_width * _height * 4);
receivedFrameData = new byte[maxVideoDataSize];
lz4DecompressBuffer = new byte[maxVideoDataSize];
}
}
bool firstFrame = true;

if (EncoderSupport.LibJpegTurbo)
turboJpegDecompressInit();

// main loop
while (!cancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -336,10 +386,12 @@ private async Task processDataLoopAsync(Socket? socket, NamedPipeClientStream? p
_width = videoHeader.Width;
_height = videoHeader.Height;
}
if (sizeChanged) // re-allocate the arrays matching the new necessary size
if (sizeChanged || firstFrame) // re-allocate the arrays matching the new necessary size
{
firstFrame = false;
rawVideoDataSize = getRawVideoDataSize(videoHeader);
maxVideoDataSize = (int)(videoHeader.Width * videoHeader.Height * 4);
receivedFrameData = new byte[maxVideoDataSize];
receivedFrameData = new byte[rawVideoDataSize];
lz4DecompressBuffer = new byte[maxVideoDataSize];
_rawDataBufferPool = ArrayPool<byte>.Create(maxVideoDataSize, 2);
}
Expand All @@ -361,18 +413,23 @@ private async Task processDataLoopAsync(Socket? socket, NamedPipeClientStream? p

// now decompress QOI
Qoi.Decode(lz4DecompressBuffer, videoHeader.QoiDataSize, rawDataBuffer, maxVideoDataSize);

}
// need to decompress LZ4 only
else if (videoHeader.Compression == Beam.CompressionTypes.Lz4)
{
int decompressedSize = LZ4Codec.Decode(receivedFrameData, 0, videoHeader.DataSize, rawDataBuffer, 0, maxVideoDataSize);
if (decompressedSize != maxVideoDataSize)
Module.Log($"LZ4 decompression failed, expected {maxVideoDataSize} bytes, got {decompressedSize} bytes.", ObsLogLevel.Error);
if (decompressedSize != rawVideoDataSize)
Module.Log($"LZ4 decompression failed, expected {rawVideoDataSize} bytes, got {decompressedSize} bytes.", ObsLogLevel.Error);
}
// need to decompress QOI only
else if (videoHeader.Compression == Beam.CompressionTypes.Qoi)
Qoi.Decode(receivedFrameData, videoHeader.DataSize, rawDataBuffer, maxVideoDataSize);
// need to decompress JPEG only
else if (videoHeader.Compression == Beam.CompressionTypes.Jpeg)
{
//TODO: check the current global color format setting in OBS and output to BGRA or YUV based on that (YUV would be preferred, since JPEG is already YUV internally and NV12 the OBS default)
turboJpegDecompressToBGRA(receivedFrameData, (int)maxVideoDataSize, rawDataBuffer, (int)videoHeader.Width, (int)videoHeader.Height);
}
}

// process the frame
Expand All @@ -386,14 +443,12 @@ private async Task processDataLoopAsync(Socket? socket, NamedPipeClientStream? p
}
else
frameBuffer.ProcessFrame(new Beam.BeamVideoData(videoHeader, rawDataBuffer, frameReceivedTime));

long receiveLength = readResult.Buffer.Length; // remember this here, before the buffer is invalidated with the next line
pipeReader.AdvanceTo(readResult.Buffer.GetPosition(videoHeader.DataSize), readResult.Buffer.End);

// tell the sender the current video frame timestamp that was received
var timestampBuffer = pipeWriter.GetMemory(sizeof(ulong));
BinaryPrimitives.WriteUInt64LittleEndian(timestampBuffer.Span, senderVideoTimestamp);
pipeWriter.Advance(8);
// tell the sender the current video frame timestamp that was received - only done for frames that were not skipped by the sender
BinaryPrimitives.WriteUInt64LittleEndian(pipeWriter.GetSpan(sizeof(ulong)), senderVideoTimestamp);
pipeWriter.Advance(sizeof(ulong));
var writeResult = await pipeWriter.FlushAsync(cancellationToken);
if (writeResult.IsCanceled || writeResult.IsCompleted)
{
Expand Down Expand Up @@ -526,6 +581,7 @@ private async Task processDataLoopAsync(Socket? socket, NamedPipeClientStream? p
try { pipeStream.Dispose(); } catch (Exception ex) { Module.Log($"{ex.GetType().Name} when disposing pipe: {ex.Message}", ObsLogLevel.Error); }
}
Module.Log($"Disconnected from {endpointName}.", ObsLogLevel.Info);
turboJpegDecompressDestroy();
stream?.Close();
_isConnected = false;
OnDisconnected();
Expand Down
Loading

0 comments on commit 5e22dc9

Please sign in to comment.