Skip to content

Commit

Permalink
[Core] Replace the Plasma eventloop with boost::asio (ray-project#9431)
Browse files Browse the repository at this point in the history
  • Loading branch information
suquark committed Jul 20, 2020
1 parent 440c9c4 commit 4accc16
Show file tree
Hide file tree
Showing 38 changed files with 614 additions and 2,552 deletions.
50 changes: 18 additions & 32 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,6 @@ PROPAGATED_WINDOWS_DEFINES = ["ARROW_STATIC"]

PLASMA_COPTS = COPTS + select({
"@bazel_tools//src/conditions:windows": [
"-D" + "WIN32_REPLACE_FD_APIS",
"/FI" + "win32fd.h",
] + ["-D" + define for define in PROPAGATED_WINDOWS_DEFINES],
"//conditions:default": [
"-DARROW_USE_GLOG",
Expand All @@ -316,26 +314,38 @@ cc_library(
name = "plasma_client",
srcs = [
"src/ray/object_manager/plasma/client.cc",
"src/ray/object_manager/plasma/fling.cc",
"src/ray/object_manager/plasma/io.cc",
"src/ray/object_manager/plasma/connection.cc",
"src/ray/object_manager/plasma/malloc.cc",
"src/ray/object_manager/plasma/plasma.cc",
"src/ray/object_manager/plasma/protocol.cc",
],
"src/ray/object_manager/plasma/shared_memory.cc",
] + select({
"@bazel_tools//src/conditions:windows": [
],
"//conditions:default": [
"src/ray/object_manager/plasma/fling.cc",
],
}),
hdrs = [
"src/ray/object_manager/format/object_manager_generated.h",
"src/ray/object_manager/notification/object_store_notification_manager.h",
"src/ray/object_manager/plasma/client.h",
"src/ray/object_manager/plasma/common.h",
"src/ray/object_manager/plasma/compat.h",
"src/ray/object_manager/plasma/external_store.h",
"src/ray/object_manager/plasma/fling.h",
"src/ray/object_manager/plasma/io.h",
"src/ray/object_manager/plasma/connection.h",
"src/ray/object_manager/plasma/malloc.h",
"src/ray/object_manager/plasma/plasma.h",
"src/ray/object_manager/plasma/plasma_generated.h",
"src/ray/object_manager/plasma/protocol.h",
],
"src/ray/object_manager/plasma/shared_memory.h",
] + select({
"@bazel_tools//src/conditions:windows": [
],
"//conditions:default": [
"src/ray/object_manager/plasma/fling.h",
],
}),
copts = PLASMA_COPTS,
defines = select({
"@bazel_tools//src/conditions:windows": PROPAGATED_WINDOWS_DEFINES,
Expand All @@ -354,32 +364,10 @@ cc_library(
],
)

cc_library(
name = "ae",
srcs = [
"src/ray/thirdparty/ae/ae.c",
],
hdrs = [
"src/ray/thirdparty/ae/ae.h",
"src/ray/thirdparty/ae/ae_epoll.c",
"src/ray/thirdparty/ae/ae_evport.c",
"src/ray/thirdparty/ae/ae_kqueue.c",
"src/ray/thirdparty/ae/ae_select.c",
"src/ray/thirdparty/ae/config.h",
"src/ray/thirdparty/ae/zmalloc.h",
],
copts = PLASMA_COPTS,
strip_include_prefix = "src",
deps = [
":platform_shims",
],
)

cc_library(
name = "plasma_store_server_lib",
srcs = [
"src/ray/object_manager/plasma/dlmalloc.cc",
"src/ray/object_manager/plasma/events.cc",
"src/ray/object_manager/plasma/eviction_policy.cc",
"src/ray/object_manager/plasma/external_store.cc",
"src/ray/object_manager/plasma/plasma_allocator.cc",
Expand All @@ -388,7 +376,6 @@ cc_library(
"src/ray/object_manager/plasma/store_runner.cc",
],
hdrs = [
"src/ray/object_manager/plasma/events.h",
"src/ray/object_manager/plasma/eviction_policy.h",
"src/ray/object_manager/plasma/external_store.h",
"src/ray/object_manager/plasma/plasma_allocator.h",
Expand All @@ -401,7 +388,6 @@ cc_library(
linkopts = PLASMA_LINKOPTS,
strip_include_prefix = "src",
deps = [
":ae",
":plasma_client",
"@com_github_google_glog//:glog",
],
Expand Down
5 changes: 2 additions & 3 deletions src/ray/common/client_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/util/util.h"

namespace ray {

Expand Down Expand Up @@ -209,8 +208,8 @@ class ClientConnection : public ServerConnection {
/// ProcessClientMessage handler will be called.
void ProcessMessages();

private:
/// A private constructor for a node client connection.
protected:
/// A protected constructor for a node client connection.
ClientConnection(MessageHandler &message_handler, local_stream_socket &&socket,
const std::string &debug_label,
const std::vector<std::string> &message_type_enum_names,
Expand Down
2 changes: 2 additions & 0 deletions src/ray/common/status.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ namespace ray {
#define STATUS_CODE_UNEXPECTED_SYSTEM_EXIT "UnexpectedSystemExit"
#define STATUS_CODE_UNKNOWN "Unknown"
#define STATUS_CODE_NOT_FOUND "NotFound"
#define STATUS_CODE_DISCONNECTED "Disconnected"
// object store status
#define STATUS_CODE_OBJECT_EXISTS "ObjectExists"
#define STATUS_CODE_OBJECT_NOT_FOUND "ObjectNotFound"
Expand Down Expand Up @@ -92,6 +93,7 @@ std::string Status::CodeAsString() const {
{StatusCode::IntentionalSystemExit, STATUS_CODE_INTENTIONAL_SYSTEM_EXIT},
{StatusCode::UnexpectedSystemExit, STATUS_CODE_UNEXPECTED_SYSTEM_EXIT},
{StatusCode::NotFound, STATUS_CODE_NOT_FOUND},
{StatusCode::Disconnected, STATUS_CODE_DISCONNECTED},
{StatusCode::ObjectExists, STATUS_CODE_OBJECT_EXISTS},
{StatusCode::ObjectNotFound, STATUS_CODE_OBJECT_NOT_FOUND},
{StatusCode::ObjectAlreadySealed, STATUS_CODE_OBJECT_STORE_ALREADY_SEALED},
Expand Down
6 changes: 6 additions & 0 deletions src/ray/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ enum class StatusCode : char {
IntentionalSystemExit = 14,
UnexpectedSystemExit = 15,
NotFound = 16,
Disconnected = 17,
// object store status
ObjectExists = 21,
ObjectNotFound = 22,
Expand Down Expand Up @@ -173,6 +174,10 @@ class RAY_EXPORT Status {
return Status(StatusCode::NotFound, msg);
}

static Status Disconnected(const std::string &msg) {
return Status(StatusCode::Disconnected, msg);
}

static Status ObjectExists(const std::string &msg) {
return Status(StatusCode::ObjectExists, msg);
}
Expand Down Expand Up @@ -210,6 +215,7 @@ class RAY_EXPORT Status {
return code() == StatusCode::IntentionalSystemExit;
}
bool IsNotFound() const { return code() == StatusCode::NotFound; }
bool IsDisconnected() const { return code() == StatusCode::Disconnected; }
bool IsObjectExists() const { return code() == StatusCode::ObjectExists; }
bool IsObjectNotFound() const { return code() == StatusCode::ObjectNotFound; }
bool IsObjectAlreadySealed() const { return code() == StatusCode::ObjectAlreadySealed; }
Expand Down
Loading

0 comments on commit 4accc16

Please sign in to comment.