Skip to content

Commit

Permalink
Switch threadpool implementation (#202)
Browse files Browse the repository at this point in the history
* Switch threadpool implementation

* replace sleep with select in a few places
  • Loading branch information
Jason Gauci committed Jul 22, 2019
1 parent 7bc8077 commit b6ec2e2
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 274 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@
[submodule "external/msgpack-c"]
path = external/msgpack-c
url = https://github.com/msgpack/msgpack-c.git
[submodule "external/ThreadPool"]
path = external/ThreadPool
url = https://github.com/progschj/ThreadPool.git
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ ENDIF()

include_directories(
external
external/ThreadPool
external/Catch2/single_include
external/cxxopts/include
external/msgpack-c/include
Expand Down
1 change: 1 addition & 0 deletions external/ThreadPool
Submodule ThreadPool added at 9a42ec
251 changes: 0 additions & 251 deletions external/ctpl_stl.h

This file was deleted.

18 changes: 15 additions & 3 deletions src/base/Headers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
#include <iostream>
#include <memory>
#include <mutex>
#include <set>
#include <optional>
#include <set>
#include <sstream>
#include <streambuf>
#include <string>
Expand All @@ -66,7 +66,7 @@
#include "json.hpp"
#include "sole.hpp"

#include "ctpl_stl.h"
#include "ThreadPool.h"

using namespace std;

Expand All @@ -76,7 +76,6 @@ using namespace google;
using namespace gflags;

using json = nlohmann::json;
using namespace ctpl;

// The ET protocol version supported by this binary
static const int PROTOCOL_VERSION = 4;
Expand Down Expand Up @@ -177,6 +176,19 @@ inline string protoToString(const T& t) {
}
return s;
}

inline bool waitOnSocketData(int fd) {
fd_set fdset;
FD_ZERO(&fdset);
FD_SET(fd, &fdset);
timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
VLOG(4) << "Before selecting sockFd";
FATAL_FAIL(select(fd + 1, &fdset, NULL, NULL, &tv));
return FD_ISSET(fd, &fdset);
}

} // namespace et

inline bool operator==(const google::protobuf::MessageLite& msg_a,
Expand Down
4 changes: 3 additions & 1 deletion src/base/RawSocketUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ void RawSocketUtils::writeAll(int fd, const char* buf, size_t count) {
void RawSocketUtils::readAll(int fd, char* buf, size_t count) {
size_t bytesRead = 0;
do {
if (!waitOnSocketData(fd)) {
continue;
}
int rc = ::read(fd, buf + bytesRead, count - bytesRead);
if (rc < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// This is fine, just keep retrying
usleep(100 * 1000);
continue;
}
throw std::runtime_error("Cannot read from raw socket");
Expand Down
8 changes: 4 additions & 4 deletions src/base/ServerConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ ServerConnection::ServerConnection(
const SocketEndpoint& _serverEndpoint)
: socketHandler(_socketHandler),
serverEndpoint(_serverEndpoint),
clientHandlerThreadPool(8) {
clientHandlerThreadPool(new ThreadPool(8)) {
socketHandler->listen(serverEndpoint);
}

Expand All @@ -20,14 +20,14 @@ bool ServerConnection::acceptNewConnection(int fd) {
}
VLOG(1) << "SERVER: got client socket fd: " << clientSocketFd;
lock_guard<std::recursive_mutex> guard(classMutex);
clientHandlerThreadPool.push(
[this, clientSocketFd](int id) { this->clientHandler(clientSocketFd); });
clientHandlerThreadPool->enqueue(
[this, clientSocketFd]() { this->clientHandler(clientSocketFd); });
return true;
}

void ServerConnection::shutdown() {
socketHandler->stopListening(serverEndpoint);
clientHandlerThreadPool.stop();
clientHandlerThreadPool.reset();
for (const auto& it : clientConnections) {
it.second->shutdown();
}
Expand Down
2 changes: 1 addition & 1 deletion src/base/ServerConnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class ServerConnection {
std::unordered_map<string, string> clientKeys;
std::unordered_map<string, shared_ptr<ServerClientConnection>>
clientConnections;
ctpl::thread_pool clientHandlerThreadPool;
std::unique_ptr<ThreadPool> clientHandlerThreadPool;
recursive_mutex classMutex;
mutex connectMutex;
};
Expand Down
Loading

0 comments on commit b6ec2e2

Please sign in to comment.