Skip to content

Commit

Permalink
Move IPC in TestHost to background thread (similar to AUv3)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgretscher committed Feb 11, 2024
1 parent 2071b9a commit 66cd4ca
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 12 deletions.
2 changes: 2 additions & 0 deletions TestHost/CompanionAPIs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,10 +547,12 @@ class IPCPlugInEntry : public PlugInEntry, private RemoteLauncher, protected ARA
return true;
}

#if !USE_ARA_BACKGROUND_IPC
void idleThreadForDuration (int32_t milliseconds) override
{
static_cast<IPCMessageChannel*> (getMessageChannel ())->runReceiveLoop (milliseconds);
}
#endif

void initializeARA (ARA::ARAAssertFunction* /*assertFunctionAddress*/) override
{
Expand Down
93 changes: 81 additions & 12 deletions TestHost/IPC/IPCMessageChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
#include "IPC/IPCXMLEncoding.h"
#endif

#if defined (__APPLE__)
#if defined (_WIN32)
#include <chrono>
#elif defined (__APPLE__)
#include <sys/posix_shm.h>
#include <sys/stat.h>
#endif
Expand Down Expand Up @@ -94,14 +96,44 @@ class IPCReceivePort : public IPCMessagePort
ARA_INTERNAL_ASSERT (_fileMapping != nullptr);
_sharedMemory = (SharedMemory*) ::MapViewOfFile (_fileMapping, FILE_MAP_WRITE, 0, 0, sizeof (SharedMemory));
ARA_INTERNAL_ASSERT (_sharedMemory != nullptr);

#if USE_ARA_BACKGROUND_IPC
_receiveThread = new std::thread {
[this] () {
while (!_exitReceiveThread.load (std::memory_order_acquire))
runReceiveLoop (messageTimeout);
} };
#endif
}

#if USE_ARA_BACKGROUND_IPC
~IPCReceivePort ()
{
_exitReceiveThread.store (true, std::memory_order_release);
_receiveThread->join ();
delete _receiveThread;
}
#endif

bool runReceiveLoop (int32_t milliseconds)
{
const auto waitRequest { ::WaitForSingleObject (_dataAvailable, milliseconds) };
if (waitRequest == WAIT_TIMEOUT)
#if USE_ARA_BACKGROUND_IPC
const auto deadline { std::chrono::steady_clock::now () + std::chrono::milliseconds { milliseconds } };
while (true)
{
const auto waitResult { ::WaitForSingleObjectEx (_dataAvailable, milliseconds, true) };
if (waitResult == WAIT_OBJECT_0)
break;
ARA_INTERNAL_ASSERT ((waitResult == WAIT_IO_COMPLETION) || (waitResult == WAIT_TIMEOUT));
if ((waitResult == WAIT_TIMEOUT) || (std::chrono::steady_clock::now () >= deadline))
return false;
}
#else
const auto waitResult { ::WaitForSingleObject (_dataAvailable, milliseconds) };
if (waitResult == WAIT_TIMEOUT)
return false;
ARA_INTERNAL_ASSERT (waitRequest == WAIT_OBJECT_0);
ARA_INTERNAL_ASSERT (waitResult == WAIT_OBJECT_0);
#endif

const auto messageID { _sharedMemory->messageID };
const auto decoder { IPCXMLMessageDecoder::createWithMessageData (_sharedMemory->messageData, _sharedMemory->messageSize) };
Expand All @@ -114,6 +146,10 @@ class IPCReceivePort : public IPCMessagePort

private:
IPCMessageChannel* const _channel;
#if USE_ARA_BACKGROUND_IPC
std::thread* _receiveThread {};
std::atomic<bool> _exitReceiveThread { false };
#endif
};

class IPCSendPort : public IPCMessagePort
Expand Down Expand Up @@ -156,21 +192,46 @@ class IPCReceivePort
public:
IPCReceivePort (const std::string& portID, IPCMessageChannel* channel)
{
auto wrappedPortID { CFStringCreateWithCStringNoCopy (kCFAllocatorDefault, portID.c_str (), kCFStringEncodingASCII, kCFAllocatorNull) };
#if USE_ARA_BACKGROUND_IPC
auto receiveThreadReady { dispatch_semaphore_create (0) };

CFMessagePortContext portContext { 0, channel, nullptr, nullptr, nullptr };
_port = CFMessagePortCreateLocal (kCFAllocatorDefault, wrappedPortID, _portCallback, &portContext, nullptr);
ARA_INTERNAL_ASSERT (_port != nullptr);
_receiveThread = new std::thread { [&] ()
{
#endif

CFRelease (wrappedPortID);
auto wrappedPortID { CFStringCreateWithCStringNoCopy (kCFAllocatorDefault, portID.c_str (), kCFStringEncodingASCII, kCFAllocatorNull) };

CFMessagePortContext portContext { 0, channel, nullptr, nullptr, nullptr };
_port = CFMessagePortCreateLocal (kCFAllocatorDefault, wrappedPortID, _portCallback, &portContext, nullptr);
ARA_INTERNAL_ASSERT (_port != nullptr);

CFRelease (wrappedPortID);

auto runLoop { CFRunLoopGetCurrent () };
CFRunLoopSourceRef runLoopSource { CFMessagePortCreateRunLoopSource (kCFAllocatorDefault, _port, 0) };
CFRunLoopAddSource (runLoop, runLoopSource, kCFRunLoopDefaultMode);
CFRelease (runLoopSource);

#if USE_ARA_BACKGROUND_IPC
_receiveThreadLoop = runLoop;

CFRunLoopSourceRef runLoopSource { CFMessagePortCreateRunLoopSource (kCFAllocatorDefault, _port, 0) };
CFRunLoopAddSource (CFRunLoopGetCurrent (), runLoopSource, kCFRunLoopDefaultMode);
CFRelease (runLoopSource);
dispatch_semaphore_signal (receiveThreadReady);

CFRunLoopRun ();
} };

dispatch_semaphore_wait (receiveThreadReady, DISPATCH_TIME_FOREVER);
#endif
}

~IPCReceivePort ()
{
#if USE_ARA_BACKGROUND_IPC
CFRunLoopStop (_receiveThreadLoop);
_receiveThread->join ();
delete _receiveThread;
#endif

CFMessagePortInvalidate (_port);
CFRelease (_port);
}
Expand All @@ -195,6 +256,10 @@ class IPCReceivePort

private:
CFMessagePortRef _port {};
#if USE_ARA_BACKGROUND_IPC
std::thread* _receiveThread {};
CFRunLoopRef _receiveThreadLoop {};
#endif
};

class IPCSendPort
Expand Down Expand Up @@ -281,10 +346,13 @@ void IPCMessageChannel::_sendMessage (ARA::IPC::ARAIPCMessageID messageID, ARA::

bool IPCMessageChannel::runReceiveLoop (int32_t milliseconds)
{
#if !USE_ARA_BACKGROUND_IPC
ARA_INTERNAL_ASSERT (std::this_thread::get_id () == _receiveThread);
#endif
return _receivePort->runReceiveLoop (milliseconds);
}

#if !USE_ARA_BACKGROUND_IPC
bool IPCMessageChannel::runsReceiveLoopOnCurrentThread ()
{
return (std::this_thread::get_id () == _receiveThread);
Expand All @@ -296,6 +364,7 @@ void IPCMessageChannel::loopUntilMessageReceived ()
while (!runReceiveLoop (messageTimeout))
{}
}
#endif

ARA::IPC::ARAIPCMessageEncoder* IPCMessageChannel::createEncoder ()
{
Expand Down
12 changes: 12 additions & 0 deletions TestHost/IPC/IPCMessageChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
#endif


// run IPC either on main thread or on background thread
#ifndef USE_ARA_BACKGROUND_IPC
#define USE_ARA_BACKGROUND_IPC 1
#endif


class IPCSendPort;
class IPCReceivePort;

Expand Down Expand Up @@ -76,13 +82,19 @@ class IPCMessageChannel : public ARA::IPC::ARAIPCMessageChannel
using ARA::IPC::ARAIPCMessageChannel::ARAIPCMessageChannel;

void _sendMessage (ARA::IPC::ARAIPCMessageID messageID, ARA::IPC::ARAIPCMessageEncoder* encoder) override;

#if !USE_ARA_BACKGROUND_IPC
bool runsReceiveLoopOnCurrentThread () override;
void loopUntilMessageReceived () override;
#endif

private:
friend class IPCReceivePort;

#if !USE_ARA_BACKGROUND_IPC
std::thread::id _receiveThread { std::this_thread::get_id () };
#endif

IPCSendPort* _sendPort {};
IPCReceivePort* _receivePort {};
};

0 comments on commit 66cd4ca

Please sign in to comment.