From 58e307a69b9aa4d11732a0862c5b224ecc50a9f0 Mon Sep 17 00:00:00 2001 From: sgretscher <41306803+sgretscher@users.noreply.github.com> Date: Fri, 27 Oct 2023 10:34:24 +0200 Subject: [PATCH] Windows IPC implementation refactored to match macOS structure (2 separate ports/shard memory regions for send and receive) --- TestHost/IPC/IPCMessageChannel.cpp | 403 +++++++++++++++++------------ TestHost/IPC/IPCMessageChannel.h | 51 ++-- 2 files changed, 253 insertions(+), 201 deletions(-) diff --git a/TestHost/IPC/IPCMessageChannel.cpp b/TestHost/IPC/IPCMessageChannel.cpp index 62a469d..7d65272 100644 --- a/TestHost/IPC/IPCMessageChannel.cpp +++ b/TestHost/IPC/IPCMessageChannel.cpp @@ -39,169 +39,196 @@ constexpr auto messageTimeout { 5 * 60 * 1000 }; #endif + //------------------------------------------------------------------------------ #if defined (_WIN32) //------------------------------------------------------------------------------ -IPCMessageChannel::~IPCMessageChannel () + +class IPCMessagePort { - ::UnmapViewOfFile (_sharedMemory); - ::CloseHandle (_fileMapping); - ::CloseHandle (_dataReceived); - ::CloseHandle (_dataAvailable); - ::CloseHandle (_transactionLock); +protected: + IPCMessagePort (const std::string& channelID) + : _dataAvailable { ::CreateEventA (NULL, FALSE, FALSE, (std::string { "Available" } + channelID).c_str ()) }, + _dataReceived { ::CreateEventA (NULL, FALSE, FALSE, (std::string { "Received" } + channelID).c_str ()) } + { + ARA_INTERNAL_ASSERT (_dataAvailable != nullptr); + ARA_INTERNAL_ASSERT (_dataReceived != nullptr); + } - ARA_INTERNAL_ASSERT (_callbackLevel == 0); - ARA_INTERNAL_ASSERT (!_awaitsReply); - ARA_INTERNAL_ASSERT (_replyHandler == nullptr); -} +public: + ~IPCMessagePort () + { + ::UnmapViewOfFile (_sharedMemory); + ::CloseHandle (_fileMapping); + ::CloseHandle (_dataReceived); + ::CloseHandle (_dataAvailable); + } -IPCMessageChannel::IPCMessageChannel (const std::string& channelID) -{ - _transactionLock = ::CreateMutexA (NULL, FALSE, (std::string { "Transaction" } + channelID).c_str ()); - ARA_INTERNAL_ASSERT (_transactionLock != nullptr); - _dataAvailable = ::CreateEventA (NULL, FALSE, FALSE, (std::string { "Available" } + channelID).c_str ()); - ARA_INTERNAL_ASSERT (_dataAvailable != nullptr); - _dataReceived = ::CreateEventA (NULL, FALSE, FALSE, (std::string { "Received" } + channelID).c_str ()); - ARA_INTERNAL_ASSERT (_dataReceived != nullptr); -} +protected: + struct SharedMemory + { + static constexpr DWORD maxMessageSize { 4 * 1024 * 1024L - 64}; -IPCMessageChannel* IPCMessageChannel::createPublishingID (const std::string& channelID, const ReceiveCallback& callback) -{ - auto channel { new IPCMessageChannel { channelID } }; - channel->_receiveCallback = callback; + size_t messageSize; + ARA::IPC::ARAIPCMessageID messageID; + char messageData[maxMessageSize]; + }; - const auto mapKey { std::string { "Map" } + channelID }; - channel->_fileMapping = ::CreateFileMappingA (INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, sizeof (SharedMemory), mapKey.c_str ()); - ARA_INTERNAL_ASSERT (channel->_fileMapping != nullptr); - channel->_sharedMemory = (SharedMemory*) ::MapViewOfFile (channel->_fileMapping, FILE_MAP_WRITE, 0, 0, sizeof (SharedMemory)); - ARA_INTERNAL_ASSERT (channel->_sharedMemory != nullptr); + HANDLE _dataAvailable {}; // signal set by the sending side indicating new data has been placed in shared memory + HANDLE _dataReceived {}; // signal set by the receiving side when evaluating the shared memory + HANDLE _fileMapping {}; + SharedMemory* _sharedMemory {}; - return channel; -} +}; -IPCMessageChannel* IPCMessageChannel::createConnectedToID (const std::string& channelID, const ReceiveCallback& callback) +class IPCReceivePort : public IPCMessagePort { - auto channel { new IPCMessageChannel { channelID } }; - channel->_receiveCallback = callback; +public: + IPCReceivePort (const std::string& channelID, IPCMessageChannel* channel) + : IPCMessagePort { channelID }, + _channel { channel } + { + const auto mapKey { std::string { "Map" } + channelID }; + _fileMapping = ::CreateFileMappingA (INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, sizeof (SharedMemory), mapKey.c_str ()); + ARA_INTERNAL_ASSERT (_fileMapping != nullptr); + _sharedMemory = (SharedMemory*) ::MapViewOfFile (_fileMapping, FILE_MAP_WRITE, 0, 0, sizeof (SharedMemory)); + ARA_INTERNAL_ASSERT (_sharedMemory != nullptr); + } - const auto mapKey { std::string { "Map" } + channelID }; - while (!channel->_fileMapping) + void runReceiveLoop (int32_t milliseconds) { - ::Sleep (100); - channel->_fileMapping = ::OpenFileMappingA (FILE_MAP_WRITE, FALSE, mapKey.c_str ()); + const auto waitRequest { ::WaitForSingleObject (_dataAvailable, milliseconds) }; + if (waitRequest == WAIT_TIMEOUT) + return; + ARA_INTERNAL_ASSERT (waitRequest == WAIT_OBJECT_0); + + const auto messageID { _sharedMemory->messageID }; + const auto decoder { _channel->createDecoderForMessage (messageID, _sharedMemory->messageData, _sharedMemory->messageSize) }; + + ::SetEvent (_dataReceived); + + _channel->routeReceivedMessage (messageID, decoder); } - channel->_sharedMemory = (SharedMemory*) ::MapViewOfFile (channel->_fileMapping, FILE_MAP_WRITE, 0, 0, 0); - ARA_INTERNAL_ASSERT (channel->_sharedMemory != nullptr); - return channel; -} +private: + IPCMessageChannel* const _channel; +}; -void IPCMessageChannel::_lockTransaction (bool isOnCreationThread) +class IPCSendPort : public IPCMessagePort { - if (isOnCreationThread) +public: + IPCSendPort (const std::string& channelID) + : IPCMessagePort { channelID } { - while (true) + const auto mapKey { std::string { "Map" } + channelID }; + while (!_fileMapping) { - const auto waitWriteMutex { ::WaitForSingleObject (_transactionLock, 0) }; - if (waitWriteMutex == WAIT_OBJECT_0) - break; - - ARA_INTERNAL_ASSERT (waitWriteMutex == WAIT_TIMEOUT); - runReceiveLoop (1); + ::Sleep (100); + _fileMapping = ::OpenFileMappingA (FILE_MAP_WRITE, FALSE, mapKey.c_str ()); } + _sharedMemory = (SharedMemory*) ::MapViewOfFile (_fileMapping, FILE_MAP_WRITE, 0, 0, 0); + ARA_INTERNAL_ASSERT (_sharedMemory != nullptr); } - else + + void sendMessage (ARA::IPC::ARAIPCMessageID messageID, const std::string& messageData) { - const auto waitWriteMutex { ::WaitForSingleObject (_transactionLock, messageTimeout) }; - ARA_INTERNAL_ASSERT (waitWriteMutex == WAIT_OBJECT_0); - } -} + ARA_INTERNAL_ASSERT (messageData.size () <= SharedMemory::maxMessageSize); -void IPCMessageChannel::_unlockTransaction () -{ - ::ReleaseMutex (_transactionLock); -} + _sharedMemory->messageID = messageID; + _sharedMemory->messageSize = messageData.size (); + std::memcpy (_sharedMemory->messageData, messageData.c_str (), messageData.size ()); -void IPCMessageChannel::_sendMessage (ARA::IPC::ARAIPCMessageID messageID, ARA::IPC::ARAIPCMessageEncoder* encoder) -{ - const auto messageData { static_cast (encoder)->createEncodedMessage () }; - ARA_INTERNAL_ASSERT (messageData.size () <= SharedMemory::maxMessageSize); + ::SetEvent (_dataAvailable); + const auto waitResult { ::WaitForSingleObject (_dataReceived, messageTimeout) }; + ARA_INTERNAL_ASSERT (waitResult == WAIT_OBJECT_0); + } +}; - _readLock.lock (); - _sharedMemory->messageID = messageID; - _sharedMemory->messageSize = messageData.size (); - std::memcpy (_sharedMemory->messageData, messageData.c_str (), messageData.size ()); +//------------------------------------------------------------------------------ +#elif defined (__APPLE__) +//------------------------------------------------------------------------------ - ::SetEvent (_dataAvailable); - const auto waitResult { ::WaitForSingleObject (_dataReceived, messageTimeout) }; - ARA_INTERNAL_ASSERT (waitResult == WAIT_OBJECT_0); - _readLock.unlock (); -} -void IPCMessageChannel::runReceiveLoop (int32_t milliseconds) +class IPCReceivePort { - ARA_INTERNAL_ASSERT (std::this_thread::get_id () == _creationThreadID); +public: + IPCReceivePort (const std::string& portID, IPCMessageChannel* channel) + { + auto wrappedPortID { CFStringCreateWithCStringNoCopy (kCFAllocatorDefault, portID.c_str (), kCFStringEncodingASCII, kCFAllocatorNull) }; - const auto waitRequest { ::WaitForSingleObject (_dataAvailable, milliseconds) }; - if (waitRequest == WAIT_TIMEOUT) - return; - ARA_INTERNAL_ASSERT (waitRequest == WAIT_OBJECT_0); + CFMessagePortContext portContext { 0, channel, nullptr, nullptr, nullptr }; + _port = CFMessagePortCreateLocal (kCFAllocatorDefault, wrappedPortID, _portCallback, &portContext, nullptr); + ARA_INTERNAL_ASSERT (_port != nullptr); - if (!_readLock.try_lock ()) // if we're concurrently sending from another thread, back out and re-set _dataAvailable - { - ::SetEvent (_dataAvailable); - return; + CFRelease (wrappedPortID); + + CFRunLoopSourceRef runLoopSource { CFMessagePortCreateRunLoopSource (kCFAllocatorDefault, _port, 0) }; + CFRunLoopAddSource (CFRunLoopGetCurrent (), runLoopSource, kCFRunLoopDefaultMode); + CFRelease (runLoopSource); } - const auto messageID { _sharedMemory->messageID }; - ARA::IPC::ARAIPCMessageDecoder* decoder { nullptr }; - if ((messageID != 0) || _replyHandler) - decoder = IPCXMLMessageDecoder::createWithMessageData (_sharedMemory->messageData, _sharedMemory->messageSize); + ~IPCReceivePort () + { + CFMessagePortInvalidate (_port); + CFRelease (_port); + } - ::SetEvent (_dataReceived); + void runReceiveLoop (int32_t milliseconds) + { + CFRunLoopRunInMode (kCFRunLoopDefaultMode, 0.001 * milliseconds, true); + } - _routeReceivedMessage (messageID, decoder); - - _readLock.unlock (); -} +private: + static CFDataRef _portCallback (CFMessagePortRef /*port*/, SInt32 messageID, CFDataRef messageData, void* info) + { + auto channel { static_cast (info) }; + const auto decoder { channel->createDecoderForMessage (messageID, messageData) }; + channel->routeReceivedMessage (messageID, decoder); + return nullptr; + } -//------------------------------------------------------------------------------ -#elif defined (__APPLE__) -//------------------------------------------------------------------------------ +private: + CFMessagePortRef _port {}; +}; -IPCMessageChannel::~IPCMessageChannel () +class IPCSendPort { - CFMessagePortInvalidate (_sendPort); - CFRelease (_sendPort); - - CFMessagePortInvalidate (_receivePort); - CFRelease (_receivePort); +public: + IPCSendPort (const std::string& portID) + { + auto wrappedPortID { CFStringCreateWithCStringNoCopy (kCFAllocatorDefault, portID.c_str (), kCFStringEncodingASCII, kCFAllocatorNull) }; - sem_close (_transactionLock); + auto timeout { 0.001 * messageTimeout }; + while (timeout > 0.0) + { + if ((_port = CFMessagePortCreateRemote (kCFAllocatorDefault, wrappedPortID))) + break; - ARA_INTERNAL_ASSERT (_callbackLevel == 0); - ARA_INTERNAL_ASSERT (!_awaitsReply); - ARA_INTERNAL_ASSERT (_replyHandler == nullptr); -} + constexpr auto waitTime { 0.01 }; + CFRunLoopRunInMode (kCFRunLoopDefaultMode, waitTime, true); + timeout -= waitTime; + } + ARA_INTERNAL_ASSERT (_port != nullptr); -CFDataRef IPCMessageChannel::_portCallback (CFMessagePortRef /*port*/, SInt32 messageID, CFDataRef messageData, void* info) -{ - auto channel { static_cast (info) }; - ARA_INTERNAL_ASSERT (std::this_thread::get_id () == channel->_creationThreadID); + CFRelease (wrappedPortID); + } - ARA::IPC::ARAIPCMessageDecoder* decoder { nullptr }; - if ((messageID != 0) || channel->_replyHandler) -#if USE_ARA_CF_ENCODING - decoder = ARA::IPC::ARAIPCCFCreateMessageDecoder (messageData); -#else - decoder = IPCXMLMessageDecoder::createWithMessageData (messageData); -#endif + ~IPCSendPort () + { + CFMessagePortInvalidate (_port); + CFRelease (_port); + } - channel->_routeReceivedMessage (messageID, decoder); + void sendMessage (ARA::IPC::ARAIPCMessageID messageID, CFDataRef messageData) + { + const auto ARA_MAYBE_UNUSED_VAR (result) { CFMessagePortSendRequest (_port, messageID, messageData, 0.001 * messageTimeout, 0.0, nullptr, nullptr) }; + ARA_INTERNAL_ASSERT (result == kCFMessagePortSuccess); + } - return nullptr; -} +private: + CFMessagePortRef _port {}; +}; sem_t* _openSemaphore (const std::string& channelID, bool create) { @@ -223,94 +250,103 @@ sem_t* _openSemaphore (const std::string& channelID, bool create) return result; } -CFMessagePortRef __attribute__ ((cf_returns_retained)) IPCMessageChannel::_createMessagePortPublishingID (const std::string& portID, IPCMessageChannel* channel) -{ - auto wrappedPortID { CFStringCreateWithCStringNoCopy (kCFAllocatorDefault, portID.c_str (), kCFStringEncodingASCII, kCFAllocatorNull) }; - - CFMessagePortContext portContext { 0, channel, nullptr, nullptr, nullptr }; - auto result = CFMessagePortCreateLocal (kCFAllocatorDefault, wrappedPortID, _portCallback, &portContext, nullptr); - ARA_INTERNAL_ASSERT (result != nullptr); - - CFRelease (wrappedPortID); - - CFRunLoopSourceRef runLoopSource { CFMessagePortCreateRunLoopSource (kCFAllocatorDefault, result, 0) }; - CFRunLoopAddSource (CFRunLoopGetCurrent (), runLoopSource, kCFRunLoopDefaultMode); - CFRelease (runLoopSource); - - return result; -} - -CFMessagePortRef __attribute__ ((cf_returns_retained)) IPCMessageChannel::_createMessagePortConnectedToID (const std::string& portID) -{ - CFMessagePortRef result {}; - - auto wrappedPortID { CFStringCreateWithCStringNoCopy (kCFAllocatorDefault, portID.c_str (), kCFStringEncodingASCII, kCFAllocatorNull) }; - auto timeout { 0.001 * messageTimeout }; - while (timeout > 0.0) - { - if ((result = CFMessagePortCreateRemote (kCFAllocatorDefault, wrappedPortID))) - break; - - constexpr auto waitTime { 0.01 }; - CFRunLoopRunInMode (kCFRunLoopDefaultMode, waitTime, true); - timeout -= waitTime; - } - ARA_INTERNAL_ASSERT (result != nullptr); +//------------------------------------------------------------------------------ +#endif +//------------------------------------------------------------------------------ - CFRelease (wrappedPortID); - return result; -} +IPCMessageChannel::IPCMessageChannel (const ReceiveCallback& callback) +: _receiveCallback { callback } +{} IPCMessageChannel* IPCMessageChannel::createPublishingID (const std::string& channelID, const ReceiveCallback& callback) { - auto channel { new IPCMessageChannel }; - channel->_receiveCallback = callback; + auto channel { new IPCMessageChannel { callback } }; +#if defined (_WIN32) + channel->_transactionLock = ::CreateMutexA (NULL, FALSE, (std::string { "Transaction" } + channelID).c_str ()); + ARA_INTERNAL_ASSERT (channel->_transactionLock != nullptr); +#elif defined (__APPLE__) channel->_transactionLock = _openSemaphore (channelID, true); - channel->_sendPort = _createMessagePortConnectedToID (channelID + ".from_server"); - channel->_receivePort = _createMessagePortPublishingID (channelID + ".to_server", channel); +#endif + channel->_sendPort = new IPCSendPort { channelID + ".from_server" }; + channel->_receivePort = new IPCReceivePort { channelID + ".to_server", channel }; return channel; } IPCMessageChannel* IPCMessageChannel::createConnectedToID (const std::string& channelID, const ReceiveCallback& callback) { - auto channel { new IPCMessageChannel }; - channel->_receiveCallback = callback; - channel->_receivePort = _createMessagePortPublishingID (channelID + ".from_server", channel); - channel->_sendPort = _createMessagePortConnectedToID (channelID + ".to_server"); + auto channel { new IPCMessageChannel { callback } }; + channel->_receivePort = new IPCReceivePort { channelID + ".from_server", channel }; + channel->_sendPort = new IPCSendPort { channelID + ".to_server" }; +#if defined (_WIN32) + channel->_transactionLock = ::CreateMutexA (NULL, FALSE, (std::string { "Transaction" } + channelID).c_str ()); + ARA_INTERNAL_ASSERT (channel->_transactionLock != nullptr); +#elif defined (__APPLE__) channel->_transactionLock = _openSemaphore (channelID, false); +#endif channel->_unlockTransaction (); return channel; } -void IPCMessageChannel::runReceiveLoop (int32_t milliseconds) +IPCMessageChannel::~IPCMessageChannel () { - ARA_INTERNAL_ASSERT (std::this_thread::get_id () == _creationThreadID); - CFRunLoopRunInMode (kCFRunLoopDefaultMode, 0.001 * milliseconds, true); + delete _sendPort; + delete _receivePort; + +#if defined (_WIN32) + ::CloseHandle (_transactionLock); +#elif defined (__APPLE__) + sem_close (_transactionLock); +#endif + + ARA_INTERNAL_ASSERT (_callbackLevel == 0); + ARA_INTERNAL_ASSERT (!_awaitsReply); + ARA_INTERNAL_ASSERT (_replyHandler == nullptr); } void IPCMessageChannel::_lockTransaction (bool isOnCreationThread) { if (isOnCreationThread) { +#if defined (_WIN32) + while (true) + { + const auto waitWriteMutex { ::WaitForSingleObject (_transactionLock, 0) }; + if (waitWriteMutex == WAIT_OBJECT_0) + break; + + ARA_INTERNAL_ASSERT (waitWriteMutex == WAIT_TIMEOUT); + runReceiveLoop (1); + } +#elif defined (__APPLE__) while (sem_trywait (_transactionLock) != 0) { if (errno != EINTR) runReceiveLoop (1); } +#endif } else { +#if defined (_WIN32) + const auto waitWriteMutex { ::WaitForSingleObject (_transactionLock, messageTimeout) }; + ARA_INTERNAL_ASSERT (waitWriteMutex == WAIT_OBJECT_0); +#elif defined (__APPLE__) while (sem_wait (_transactionLock) != 0) ARA_INTERNAL_ASSERT (errno == EINTR); +#endif } } void IPCMessageChannel::_unlockTransaction () { +#if defined (_WIN32) + ::ReleaseMutex (_transactionLock); +#elif defined (__APPLE__) auto result { sem_post (_transactionLock) }; ARA_INTERNAL_ASSERT (result == 0); +#endif } void IPCMessageChannel::_sendMessage (ARA::IPC::ARAIPCMessageID messageID, ARA::IPC::ARAIPCMessageEncoder* encoder) @@ -321,19 +357,44 @@ void IPCMessageChannel::_sendMessage (ARA::IPC::ARAIPCMessageID messageID, ARA:: const auto messageData { static_cast (encoder)->createEncodedMessage () }; #endif - const auto ARA_MAYBE_UNUSED_VAR (portSendResult) { CFMessagePortSendRequest (_sendPort, messageID, messageData, 0.001 * messageTimeout, 0.0, nullptr, nullptr) }; - ARA_INTERNAL_ASSERT (portSendResult == kCFMessagePortSuccess); + _sendPort->sendMessage (messageID, messageData); +#if defined (__APPLE__) if (messageData) CFRelease (messageData); +#endif } -//------------------------------------------------------------------------------ +void IPCMessageChannel::runReceiveLoop (int32_t milliseconds) +{ + ARA_INTERNAL_ASSERT (std::this_thread::get_id () == _creationThreadID); + _receivePort->runReceiveLoop (milliseconds); +} + +#if defined (_WIN32) +const ARA::IPC::ARAIPCMessageDecoder* IPCMessageChannel::createDecoderForMessage (ARA::IPC::ARAIPCMessageID messageID, const char* data, const size_t dataSize) +{ + if ((messageID != 0) || _replyHandler) + return IPCXMLMessageDecoder::createWithMessageData (data, dataSize); + return nullptr; +} +#elif defined (__APPLE__) +const ARA::IPC::ARAIPCMessageDecoder* IPCMessageChannel::createDecoderForMessage (ARA::IPC::ARAIPCMessageID messageID, CFDataRef messageData) +{ + if ((messageID != 0) || _replyHandler) +#if USE_ARA_CF_ENCODING + return ARA::IPC::ARAIPCCFCreateMessageDecoder (messageData); +#else + return IPCXMLMessageDecoder::createWithMessageData (messageData); +#endif + return nullptr; +} #endif -//------------------------------------------------------------------------------ -void IPCMessageChannel::_routeReceivedMessage (ARA::IPC::ARAIPCMessageID messageID, const ARA::IPC::ARAIPCMessageDecoder* decoder) +void IPCMessageChannel::routeReceivedMessage (ARA::IPC::ARAIPCMessageID messageID, const ARA::IPC::ARAIPCMessageDecoder* decoder) { + ARA_INTERNAL_ASSERT (std::this_thread::get_id () == _creationThreadID); + if (messageID != 0) { _handleReceivedMessage (messageID, decoder); diff --git a/TestHost/IPC/IPCMessageChannel.h b/TestHost/IPC/IPCMessageChannel.h index 4c41910..154f384 100644 --- a/TestHost/IPC/IPCMessageChannel.h +++ b/TestHost/IPC/IPCMessageChannel.h @@ -24,7 +24,6 @@ #if defined (_WIN32) #include - #include #include #elif defined (__APPLE__) #include @@ -49,6 +48,10 @@ #endif +class IPCSendPort; +class IPCReceivePort; + + class IPCMessageChannel : public ARA::IPC::ARAIPCMessageChannel { public: @@ -75,47 +78,35 @@ class IPCMessageChannel : public ARA::IPC::ARAIPCMessageChannel // waits up to the specified amount of milliseconds for an incoming event and processes it void runReceiveLoop (int32_t milliseconds); + // callbacks for the _receivePort + const ARA::IPC::ARAIPCMessageDecoder* createDecoderForMessage (ARA::IPC::ARAIPCMessageID messageID, +#if defined (_WIN32) + const char* data, const size_t dataSize); +#elif defined (__APPLE__) + CFDataRef messageData); +#endif + void routeReceivedMessage (ARA::IPC::ARAIPCMessageID messageID, const ARA::IPC::ARAIPCMessageDecoder* decoder); + private: - IPCMessageChannel () = default; + IPCMessageChannel (const ReceiveCallback& callback); void _lockTransaction (bool isOnCreationThread); void _unlockTransaction (); void _sendMessage (ARA::IPC::ARAIPCMessageID messageID, ARA::IPC::ARAIPCMessageEncoder* encoder); - void _routeReceivedMessage (ARA::IPC::ARAIPCMessageID messageID, const ARA::IPC::ARAIPCMessageDecoder* decoder); void _handleReceivedMessage (ARA::IPC::ARAIPCMessageID messageID, const ARA::IPC::ARAIPCMessageDecoder* decoder); -#if defined (_WIN32) - explicit IPCMessageChannel (const std::string& channelID); -#elif defined (__APPLE__) - static CFDataRef _portCallback (CFMessagePortRef port, SInt32 messageID, CFDataRef messageData, void* info); - static CFMessagePortRef __attribute__ ((cf_returns_retained)) _createMessagePortPublishingID (const std::string& portID, IPCMessageChannel* channel); - static CFMessagePortRef __attribute__ ((cf_returns_retained)) _createMessagePortConnectedToID (const std::string& portID); -#endif private: #if defined (_WIN32) - struct SharedMemory - { - static constexpr DWORD maxMessageSize { 4 * 1024 * 1024L - 64}; - - size_t messageSize; - ARA::IPC::ARAIPCMessageID messageID; - char messageData[maxMessageSize]; - }; - - HANDLE _transactionLock {}; // global lock shared between both processes, taken when starting a new transaction - HANDLE _dataAvailable {}; // signal set by the sending side indicating new data has been placed in shared memory - HANDLE _dataReceived {}; // signal set by the receiving side when evaluating the shared memory - HANDLE _fileMapping {}; - SharedMemory* _sharedMemory {}; - std::recursive_mutex _readLock {}; // process-level lock blocking access to reading data - + HANDLE #elif defined (__APPLE__) - sem_t* _transactionLock {}; - CFMessagePortRef _sendPort {}; - CFMessagePortRef _receivePort {}; + sem_t* #endif + _transactionLock {}; // global lock shared between both processes, taken when starting a new transaction + + IPCSendPort* _sendPort {}; + IPCReceivePort* _receivePort {}; std::thread::id _creationThreadID { std::this_thread::get_id () }; - ReceiveCallback _receiveCallback {}; + ReceiveCallback const _receiveCallback {}; int32_t _callbackLevel { 0 }; bool _awaitsReply {}; ReplyHandler _replyHandler {};