Skip to content

Commit

Permalink
Replace global IPC transaction lock with thread tokens in each message
Browse files Browse the repository at this point in the history
  • Loading branch information
sgretscher committed Feb 11, 2024
1 parent 5e21d0a commit 2071b9a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 129 deletions.
124 changes: 13 additions & 111 deletions TestHost/IPC/IPCMessageChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ class IPCReceivePort : public IPCMessagePort
ARA_INTERNAL_ASSERT (_sharedMemory != nullptr);
}

void runReceiveLoop (int32_t milliseconds)
bool runReceiveLoop (int32_t milliseconds)
{
const auto waitRequest { ::WaitForSingleObject (_dataAvailable, milliseconds) };
if (waitRequest == WAIT_TIMEOUT)
return;
return false;
ARA_INTERNAL_ASSERT (waitRequest == WAIT_OBJECT_0);

const auto messageID { _sharedMemory->messageID };
Expand All @@ -109,6 +109,7 @@ class IPCReceivePort : public IPCMessagePort
::SetEvent (_dataReceived);

_channel->routeReceivedMessage (messageID, decoder);
return true;
}

private:
Expand Down Expand Up @@ -174,9 +175,9 @@ class IPCReceivePort
CFRelease (_port);
}

void runReceiveLoop (int32_t milliseconds)
bool runReceiveLoop (int32_t milliseconds)
{
CFRunLoopRunInMode (kCFRunLoopDefaultMode, 0.001 * milliseconds, true);
return (CFRunLoopRunInMode (kCFRunLoopDefaultMode, 0.001 * milliseconds, true) != kCFRunLoopRunTimedOut);
}

private:
Expand Down Expand Up @@ -234,26 +235,6 @@ class IPCSendPort
CFMessagePortRef _port {};
};

sem_t* _openSemaphore (const std::string& channelID, bool create)
{
const auto previousUMask { umask (0) };

std::string semName { "/" };
semName += channelID;
if (semName.length () > PSHMNAMLEN - 1)
semName.erase (10, semName.length () - PSHMNAMLEN + 1);

auto result { sem_open (semName.c_str (), ((create) ? O_CREAT | O_EXCL : 0), S_IRUSR | S_IWUSR, 0) };
ARA_INTERNAL_ASSERT (result != SEM_FAILED);

if (!create)
sem_unlink (semName.c_str ());

umask (previousUMask);

return result;
}


//------------------------------------------------------------------------------
#endif
Expand All @@ -263,12 +244,6 @@ sem_t* _openSemaphore (const std::string& channelID, bool create)
IPCMessageChannel* IPCMessageChannel::createPublishingID (const std::string& channelID, ARA::IPC::ARAIPCMessageHandler* handler)
{
auto channel { new IPCMessageChannel { handler } };
#if defined (_WIN32)
channel->_transactionLock = ::CreateMutexA (NULL, FALSE, (std::string { "Transaction" } + channelID).c_str ());
ARA_INTERNAL_ASSERT (channel->_transactionLock != nullptr);
#elif defined (__APPLE__)
channel->_transactionLock = _openSemaphore (channelID, true);
#endif
channel->_sendPort = new IPCSendPort { channelID + ".from_server" };
channel->_receivePort = new IPCReceivePort { channelID + ".to_server", channel };
return channel;
Expand All @@ -279,72 +254,13 @@ IPCMessageChannel* IPCMessageChannel::createConnectedToID (const std::string& ch
auto channel { new IPCMessageChannel { handler } };
channel->_receivePort = new IPCReceivePort { channelID + ".from_server", channel };
channel->_sendPort = new IPCSendPort { channelID + ".to_server" };
#if defined (_WIN32)
channel->_transactionLock = ::CreateMutexA (NULL, FALSE, (std::string { "Transaction" } + channelID).c_str ());
ARA_INTERNAL_ASSERT (channel->_transactionLock != nullptr);
#elif defined (__APPLE__)
channel->_transactionLock = _openSemaphore (channelID, false);
#endif
channel->unlockTransaction ();
return channel;
}

IPCMessageChannel::~IPCMessageChannel ()
{
delete _sendPort;
delete _receivePort;

#if defined (_WIN32)
::CloseHandle (_transactionLock);
#elif defined (__APPLE__)
sem_close (_transactionLock);
#endif
}

void IPCMessageChannel::lockTransaction ()
{
if (std::this_thread::get_id () == _receiveThread)
{
// \todo in order to avoid this busy-wait, the other side would need to send us some "lock available" message
// upon unlock whenever a lock request was rejected - this thread could wait via some signal then retry...
#if defined (_WIN32)
while (true)
{
const auto waitWriteMutex { ::WaitForSingleObject (_transactionLock, 0) };
if (waitWriteMutex == WAIT_OBJECT_0)
break;

ARA_INTERNAL_ASSERT (waitWriteMutex == WAIT_TIMEOUT);
runReceiveLoop (1);
}
#elif defined (__APPLE__)
while (sem_trywait (_transactionLock) != 0)
{
if (errno != EINTR)
runReceiveLoop (1);
}
#endif
}
else
{
#if defined (_WIN32)
const auto waitWriteMutex { ::WaitForSingleObject (_transactionLock, messageTimeout) };
ARA_INTERNAL_ASSERT (waitWriteMutex == WAIT_OBJECT_0);
#elif defined (__APPLE__)
while (sem_wait (_transactionLock) != 0)
ARA_INTERNAL_ASSERT (errno == EINTR);
#endif
}
}

void IPCMessageChannel::unlockTransaction ()
{
#if defined (_WIN32)
::ReleaseMutex (_transactionLock);
#elif defined (__APPLE__)
auto result { sem_post (_transactionLock) };
ARA_INTERNAL_ASSERT (result == 0);
#endif
}

void IPCMessageChannel::_sendMessage (ARA::IPC::ARAIPCMessageID messageID, ARA::IPC::ARAIPCMessageEncoder* encoder)
Expand All @@ -355,9 +271,6 @@ void IPCMessageChannel::_sendMessage (ARA::IPC::ARAIPCMessageID messageID, ARA::
const auto messageData { static_cast<const IPCXMLMessageEncoder*> (encoder)->createEncodedMessage () };
#endif

if (messageID != 0)
_sendAwaitsMessage = true;

_sendPort->sendMessage (messageID, messageData);

#if defined (__APPLE__)
Expand All @@ -366,33 +279,22 @@ void IPCMessageChannel::_sendMessage (ARA::IPC::ARAIPCMessageID messageID, ARA::
#endif
}

void IPCMessageChannel::runReceiveLoop (int32_t milliseconds)
bool IPCMessageChannel::runReceiveLoop (int32_t milliseconds)
{
ARA_INTERNAL_ASSERT (std::this_thread::get_id () == _receiveThread);
_receivePort->runReceiveLoop (milliseconds);
return _receivePort->runReceiveLoop (milliseconds);
}

void IPCMessageChannel::_signalReceivedMessage (std::thread::id activeThread)
bool IPCMessageChannel::runsReceiveLoopOnCurrentThread ()
{
ARA_INTERNAL_ASSERT (std::this_thread::get_id () == _receiveThread);

if (activeThread == std::this_thread::get_id ())
_sendAwaitsMessage = false;
else
ARAIPCMessageChannel::_signalReceivedMessage (activeThread);
return (std::this_thread::get_id () == _receiveThread);
}

void IPCMessageChannel::_waitForReceivedMessage ()
void IPCMessageChannel::loopUntilMessageReceived ()
{
if (std::this_thread::get_id () == _receiveThread)
{
do
{
runReceiveLoop (messageTimeout);
} while (_sendAwaitsMessage);
}
else
ARAIPCMessageChannel::_waitForReceivedMessage ();
ARA_INTERNAL_ASSERT (std::this_thread::get_id () == _receiveThread);
while (!runReceiveLoop (messageTimeout))
{}
}

ARA::IPC::ARAIPCMessageEncoder* IPCMessageChannel::createEncoder ()
Expand Down
17 changes: 4 additions & 13 deletions TestHost/IPC/IPCMessageChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,29 +69,20 @@ class IPCMessageChannel : public ARA::IPC::ARAIPCMessageChannel

// message receiving
// waits up to the specified amount of milliseconds for an incoming event and processes it
void runReceiveLoop (int32_t milliseconds);
// returns true if some event was processed during that time
bool runReceiveLoop (int32_t milliseconds);

protected:
using ARA::IPC::ARAIPCMessageChannel::ARAIPCMessageChannel;

void lockTransaction () override;
void unlockTransaction () override;
void _sendMessage (ARA::IPC::ARAIPCMessageID messageID, ARA::IPC::ARAIPCMessageEncoder* encoder) override;
void _signalReceivedMessage (std::thread::id activeThread) override;
void _waitForReceivedMessage () override;
bool runsReceiveLoopOnCurrentThread () override;
void loopUntilMessageReceived () override;

private:
friend class IPCReceivePort;

std::thread::id _receiveThread { std::this_thread::get_id () };
bool _sendAwaitsMessage { false };
#if defined (_WIN32)
HANDLE
#elif defined (__APPLE__)
sem_t*
#endif
_transactionLock {}; // global lock shared between both processes, taken when starting a new transaction

IPCSendPort* _sendPort {};
IPCReceivePort* _receivePort {};
};
19 changes: 14 additions & 5 deletions TestHost/IPC/IPCXMLEncoding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ IPCXMLMessage::IPCXMLMessage (std::shared_ptr<pugi::xml_document> dictionary, pu

const char* IPCXMLMessage::_getEncodedKey (const MessageKey argKey)
{
ARA_INTERNAL_ASSERT (argKey >= 0);
static std::map<MessageKey, std::string> cache;
auto existingEntry { cache.find (argKey) };
if (existingEntry != cache.end ())
Expand Down Expand Up @@ -99,15 +98,11 @@ void IPCXMLMessageEncoder::appendBytes (const MessageKey argKey, const uint8_t*

pugi::xml_attribute IPCXMLMessageEncoder::_appendAttribute (const MessageKey argKey)
{
ARA_INTERNAL_ASSERT (argKey >= 0);

return _root.append_attribute (_getEncodedKey (argKey));
}

ARA::IPC::ARAIPCMessageEncoder* IPCXMLMessageEncoder::appendSubMessage (const MessageKey argKey)
{
ARA_INTERNAL_ASSERT (argKey >= 0);

return new IPCXMLMessageEncoder { _dictionary, _root.append_child (_getEncodedKey (argKey)) };
}

Expand Down Expand Up @@ -291,3 +286,17 @@ ARA::IPC::ARAIPCMessageDecoder* IPCXMLMessageDecoder::readSubMessage (const Mess

return new IPCXMLMessageDecoder { _dictionary, child };
}

bool IPCXMLMessageDecoder::hasDataForKey (const MessageKey argKey) const
{
ARA_INTERNAL_ASSERT (!_root.empty ());
const auto encodedKey { _getEncodedKey (argKey) };

if (!_root.attribute (encodedKey).empty ())
return true;

if (!_root.child (encodedKey).empty ())
return true;

return false;
}
1 change: 1 addition & 0 deletions TestHost/IPC/IPCXMLEncoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class IPCXMLMessageDecoder : public IPCXMLMessage, public ARA::IPC::ARAIPCMessag
bool readBytesSize (MessageKey argKey, size_t* argSize) const override;
void readBytes (MessageKey argKey, uint8_t* argValue) const override;
ARAIPCMessageDecoder* readSubMessage (const MessageKey argKey) const override;
bool hasDataForKey (MessageKey argKey) const override;

private:
using IPCXMLMessage::IPCXMLMessage;
Expand Down

0 comments on commit 2071b9a

Please sign in to comment.