From 4accc169954a0188bdd70b16d58a834ee2c6274f Mon Sep 17 00:00:00 2001 From: "Siyuan (Ryans) Zhuang" Date: Mon, 20 Jul 2020 02:52:51 -0700 Subject: [PATCH] [Core] Replace the Plasma eventloop with boost::asio (#9431) --- BUILD.bazel | 50 +- src/ray/common/client_connection.h | 5 +- src/ray/common/status.cc | 2 + src/ray/common/status.h | 6 + src/ray/object_manager/plasma/client.cc | 147 +----- src/ray/object_manager/plasma/common.h | 2 +- src/ray/object_manager/plasma/compat.h | 21 + src/ray/object_manager/plasma/connection.cc | 155 ++++++ src/ray/object_manager/plasma/connection.h | 53 ++ src/ray/object_manager/plasma/dlmalloc.cc | 76 ++- src/ray/object_manager/plasma/events.cc | 107 ---- src/ray/object_manager/plasma/events.h | 108 ---- .../object_manager/plasma/eviction_policy.h | 2 + src/ray/object_manager/plasma/fling.cc | 64 +-- src/ray/object_manager/plasma/fling.h | 17 - src/ray/object_manager/plasma/io.cc | 259 ---------- src/ray/object_manager/plasma/io.h | 68 --- src/ray/object_manager/plasma/malloc.cc | 14 +- src/ray/object_manager/plasma/malloc.h | 7 +- src/ray/object_manager/plasma/plasma.cc | 40 -- src/ray/object_manager/plasma/plasma.h | 89 +--- src/ray/object_manager/plasma/protocol.cc | 39 +- src/ray/object_manager/plasma/protocol.h | 9 +- .../plasma/quota_aware_policy.cc | 1 + .../object_manager/plasma/shared_memory.cc | 55 +++ src/ray/object_manager/plasma/shared_memory.h | 32 ++ src/ray/object_manager/plasma/store.cc | 278 +++++------ src/ray/object_manager/plasma/store.h | 40 +- src/ray/object_manager/plasma/store_runner.cc | 24 +- src/ray/object_manager/plasma/store_runner.h | 4 +- src/ray/thirdparty/ae/ae.c | 465 ------------------ src/ray/thirdparty/ae/ae.h | 123 ----- src/ray/thirdparty/ae/ae_epoll.c | 137 ------ src/ray/thirdparty/ae/ae_evport.c | 320 ------------ src/ray/thirdparty/ae/ae_kqueue.c | 138 ------ src/ray/thirdparty/ae/ae_select.c | 110 ----- src/ray/thirdparty/ae/config.h | 54 -- src/ray/thirdparty/ae/zmalloc.h | 45 -- 38 files changed, 614 insertions(+), 2552 deletions(-) create mode 100644 src/ray/object_manager/plasma/connection.cc create mode 100644 src/ray/object_manager/plasma/connection.h delete mode 100644 src/ray/object_manager/plasma/events.cc delete mode 100644 src/ray/object_manager/plasma/events.h delete mode 100644 src/ray/object_manager/plasma/io.cc delete mode 100644 src/ray/object_manager/plasma/io.h create mode 100644 src/ray/object_manager/plasma/shared_memory.cc create mode 100644 src/ray/object_manager/plasma/shared_memory.h delete mode 100644 src/ray/thirdparty/ae/ae.c delete mode 100644 src/ray/thirdparty/ae/ae.h delete mode 100644 src/ray/thirdparty/ae/ae_epoll.c delete mode 100644 src/ray/thirdparty/ae/ae_evport.c delete mode 100644 src/ray/thirdparty/ae/ae_kqueue.c delete mode 100644 src/ray/thirdparty/ae/ae_select.c delete mode 100644 src/ray/thirdparty/ae/config.h delete mode 100644 src/ray/thirdparty/ae/zmalloc.h diff --git a/BUILD.bazel b/BUILD.bazel index b9b11693fa79e..3b56d67776100 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -296,8 +296,6 @@ PROPAGATED_WINDOWS_DEFINES = ["ARROW_STATIC"] PLASMA_COPTS = COPTS + select({ "@bazel_tools//src/conditions:windows": [ - "-D" + "WIN32_REPLACE_FD_APIS", - "/FI" + "win32fd.h", ] + ["-D" + define for define in PROPAGATED_WINDOWS_DEFINES], "//conditions:default": [ "-DARROW_USE_GLOG", @@ -316,12 +314,18 @@ cc_library( name = "plasma_client", srcs = [ "src/ray/object_manager/plasma/client.cc", - "src/ray/object_manager/plasma/fling.cc", - "src/ray/object_manager/plasma/io.cc", + "src/ray/object_manager/plasma/connection.cc", "src/ray/object_manager/plasma/malloc.cc", "src/ray/object_manager/plasma/plasma.cc", "src/ray/object_manager/plasma/protocol.cc", - ], + "src/ray/object_manager/plasma/shared_memory.cc", + ] + select({ + "@bazel_tools//src/conditions:windows": [ + ], + "//conditions:default": [ + "src/ray/object_manager/plasma/fling.cc", + ], + }), hdrs = [ "src/ray/object_manager/format/object_manager_generated.h", "src/ray/object_manager/notification/object_store_notification_manager.h", @@ -329,13 +333,19 @@ cc_library( "src/ray/object_manager/plasma/common.h", "src/ray/object_manager/plasma/compat.h", "src/ray/object_manager/plasma/external_store.h", - "src/ray/object_manager/plasma/fling.h", - "src/ray/object_manager/plasma/io.h", + "src/ray/object_manager/plasma/connection.h", "src/ray/object_manager/plasma/malloc.h", "src/ray/object_manager/plasma/plasma.h", "src/ray/object_manager/plasma/plasma_generated.h", "src/ray/object_manager/plasma/protocol.h", - ], + "src/ray/object_manager/plasma/shared_memory.h", + ] + select({ + "@bazel_tools//src/conditions:windows": [ + ], + "//conditions:default": [ + "src/ray/object_manager/plasma/fling.h", + ], + }), copts = PLASMA_COPTS, defines = select({ "@bazel_tools//src/conditions:windows": PROPAGATED_WINDOWS_DEFINES, @@ -354,32 +364,10 @@ cc_library( ], ) -cc_library( - name = "ae", - srcs = [ - "src/ray/thirdparty/ae/ae.c", - ], - hdrs = [ - "src/ray/thirdparty/ae/ae.h", - "src/ray/thirdparty/ae/ae_epoll.c", - "src/ray/thirdparty/ae/ae_evport.c", - "src/ray/thirdparty/ae/ae_kqueue.c", - "src/ray/thirdparty/ae/ae_select.c", - "src/ray/thirdparty/ae/config.h", - "src/ray/thirdparty/ae/zmalloc.h", - ], - copts = PLASMA_COPTS, - strip_include_prefix = "src", - deps = [ - ":platform_shims", - ], -) - cc_library( name = "plasma_store_server_lib", srcs = [ "src/ray/object_manager/plasma/dlmalloc.cc", - "src/ray/object_manager/plasma/events.cc", "src/ray/object_manager/plasma/eviction_policy.cc", "src/ray/object_manager/plasma/external_store.cc", "src/ray/object_manager/plasma/plasma_allocator.cc", @@ -388,7 +376,6 @@ cc_library( "src/ray/object_manager/plasma/store_runner.cc", ], hdrs = [ - "src/ray/object_manager/plasma/events.h", "src/ray/object_manager/plasma/eviction_policy.h", "src/ray/object_manager/plasma/external_store.h", "src/ray/object_manager/plasma/plasma_allocator.h", @@ -401,7 +388,6 @@ cc_library( linkopts = PLASMA_LINKOPTS, strip_include_prefix = "src", deps = [ - ":ae", ":plasma_client", "@com_github_google_glog//:glog", ], diff --git a/src/ray/common/client_connection.h b/src/ray/common/client_connection.h index 6916c72a651c0..24d8e161e9191 100644 --- a/src/ray/common/client_connection.h +++ b/src/ray/common/client_connection.h @@ -23,7 +23,6 @@ #include "ray/common/id.h" #include "ray/common/status.h" -#include "ray/util/util.h" namespace ray { @@ -209,8 +208,8 @@ class ClientConnection : public ServerConnection { /// ProcessClientMessage handler will be called. void ProcessMessages(); - private: - /// A private constructor for a node client connection. + protected: + /// A protected constructor for a node client connection. ClientConnection(MessageHandler &message_handler, local_stream_socket &&socket, const std::string &debug_label, const std::vector &message_type_enum_names, diff --git a/src/ray/common/status.cc b/src/ray/common/status.cc index 43e6cbbd343b0..b3e2eba50384b 100644 --- a/src/ray/common/status.cc +++ b/src/ray/common/status.cc @@ -50,6 +50,7 @@ namespace ray { #define STATUS_CODE_UNEXPECTED_SYSTEM_EXIT "UnexpectedSystemExit" #define STATUS_CODE_UNKNOWN "Unknown" #define STATUS_CODE_NOT_FOUND "NotFound" +#define STATUS_CODE_DISCONNECTED "Disconnected" // object store status #define STATUS_CODE_OBJECT_EXISTS "ObjectExists" #define STATUS_CODE_OBJECT_NOT_FOUND "ObjectNotFound" @@ -92,6 +93,7 @@ std::string Status::CodeAsString() const { {StatusCode::IntentionalSystemExit, STATUS_CODE_INTENTIONAL_SYSTEM_EXIT}, {StatusCode::UnexpectedSystemExit, STATUS_CODE_UNEXPECTED_SYSTEM_EXIT}, {StatusCode::NotFound, STATUS_CODE_NOT_FOUND}, + {StatusCode::Disconnected, STATUS_CODE_DISCONNECTED}, {StatusCode::ObjectExists, STATUS_CODE_OBJECT_EXISTS}, {StatusCode::ObjectNotFound, STATUS_CODE_OBJECT_NOT_FOUND}, {StatusCode::ObjectAlreadySealed, STATUS_CODE_OBJECT_STORE_ALREADY_SEALED}, diff --git a/src/ray/common/status.h b/src/ray/common/status.h index 885ad2bb8ecc0..d8fd18400334f 100644 --- a/src/ray/common/status.h +++ b/src/ray/common/status.h @@ -93,6 +93,7 @@ enum class StatusCode : char { IntentionalSystemExit = 14, UnexpectedSystemExit = 15, NotFound = 16, + Disconnected = 17, // object store status ObjectExists = 21, ObjectNotFound = 22, @@ -173,6 +174,10 @@ class RAY_EXPORT Status { return Status(StatusCode::NotFound, msg); } + static Status Disconnected(const std::string &msg) { + return Status(StatusCode::Disconnected, msg); + } + static Status ObjectExists(const std::string &msg) { return Status(StatusCode::ObjectExists, msg); } @@ -210,6 +215,7 @@ class RAY_EXPORT Status { return code() == StatusCode::IntentionalSystemExit; } bool IsNotFound() const { return code() == StatusCode::NotFound; } + bool IsDisconnected() const { return code() == StatusCode::Disconnected; } bool IsObjectExists() const { return code() == StatusCode::ObjectExists; } bool IsObjectNotFound() const { return code() == StatusCode::ObjectNotFound; } bool IsObjectAlreadySealed() const { return code() == StatusCode::ObjectAlreadySealed; } diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index f4af036c956f2..5fd5c7315f2cb 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -19,30 +19,7 @@ #include "ray/object_manager/plasma/client.h" -#include -#ifndef _WIN32 -#include -#endif -#include -#include -#ifndef _WIN32 -#include -#endif -#ifdef _WIN32 -#include -#else -#include -#include -#endif -#include -#include -#ifndef _WIN32 -#include -#endif -#include -#ifndef _WIN32 -#include -#endif +#include #include #include @@ -52,15 +29,14 @@ #include #include +#include + #include "arrow/buffer.h" -#include "arrow/util/thread_pool.h" -#include "ray/object_manager/plasma/common.h" -#include "ray/object_manager/plasma/fling.h" -#include "ray/object_manager/plasma/io.h" -#include "ray/object_manager/plasma/malloc.h" +#include "ray/object_manager/plasma/connection.h" #include "ray/object_manager/plasma/plasma.h" #include "ray/object_manager/plasma/protocol.h" +#include "ray/object_manager/plasma/shared_memory.h" #ifdef PLASMA_CUDA #include "arrow/gpu/cuda_api.h" @@ -164,62 +140,6 @@ struct ObjectInUseEntry { bool is_sealed; }; -class ClientMmapTableEntry { - public: - ClientMmapTableEntry(int fd, int64_t map_size) - : fd_(fd), pointer_(nullptr), length_(0) { - // We subtract kMmapRegionsGap from the length that was added - // in fake_mmap in malloc.h, to make map_size page-aligned again. - length_ = map_size - kMmapRegionsGap; -#ifdef _WIN32 - pointer_ = reinterpret_cast(MapViewOfFile(reinterpret_cast(fh_get(fd)), FILE_MAP_ALL_ACCESS, 0, 0, length_)); - // TODO(pcm): Don't fail here, instead return a Status. - if (pointer_ == NULL) { - RAY_LOG(FATAL) << "mmap failed"; - } -#else - pointer_ = reinterpret_cast( - mmap(NULL, length_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)); - // TODO(pcm): Don't fail here, instead return a Status. - if (pointer_ == MAP_FAILED) { - RAY_LOG(FATAL) << "mmap failed"; - } -#endif - close(fd); // Closing this fd has an effect on performance. - } - - ~ClientMmapTableEntry() { - // At this point it is safe to unmap the memory, as the PlasmaBuffer - // keeps the PlasmaClient (and therefore the ClientMmapTableEntry) - // alive until it is destroyed. - // We don't need to close the associated file, since it has - // already been closed in the constructor. - int r; -#ifdef _WIN32 - r = UnmapViewOfFile(pointer_) ? 0 : -1; -#else - r = munmap(pointer_, length_); -#endif - if (r != 0) { - RAY_LOG(ERROR) << "munmap returned " << r << ", errno = " << errno; - } - } - - uint8_t* pointer() { return pointer_; } - - int fd() { return fd_; } - - private: - /// The associated file descriptor on the client. - int fd_; - /// The result of mmap for this file descriptor. - uint8_t* pointer_; - /// The length of the memory-mapped file. - size_t length_; - - ARROW_DISALLOW_COPY_AND_ASSIGN(ClientMmapTableEntry); -}; - class PlasmaClient::Impl : public std::enable_shared_from_this { public: Impl(); @@ -271,8 +191,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this&)>& wrap_buffer, ObjectBuffer* object_buffers); - uint8_t* LookupOrMmap(int fd, int store_fd_val, int64_t map_size); - - uint8_t* LookupMmappedFile(int store_fd_val); + uint8_t* LookupMmappedFile(MEMFD_TYPE store_fd_val); void IncrementObjectCount(const ObjectID& object_id, PlasmaObject* object, bool is_sealed); - /// File descriptor of the Unix domain socket that connects to the store. + /// The boost::asio IO context for the client. + boost::asio::io_service main_service_; + /// The connection to the store service. std::shared_ptr store_conn_; /// Table of dlmalloc buffer files that have been memory mapped so far. This /// is a hash table mapping a file descriptor to a struct containing the /// address of the corresponding memory-mapped file. - std::unordered_map> mmap_table_; + std::unordered_map> mmap_table_; /// A hash table of the object IDs that are currently being used by this /// client. std::unordered_map> objects_in_use_; @@ -308,8 +228,6 @@ class PlasmaClient::Impl : public std::enable_shared_from_this deletion_cache_; - /// A queue of notification - std::deque> pending_notification_; /// A mutex which protects this class. std::recursive_mutex client_mutex_; @@ -334,11 +252,13 @@ PlasmaClient::Impl::~Impl() {} // If the file descriptor fd has been mmapped in this client process before, // return the pointer that was returned by mmap, otherwise mmap it and store the // pointer in a hash table. -uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) { +uint8_t* PlasmaClient::Impl::GetStoreFdAndMmap(MEMFD_TYPE store_fd_val, int64_t map_size) { auto entry = mmap_table_.find(store_fd_val); if (entry != mmap_table_.end()) { return entry->second->pointer(); } else { + MEMFD_TYPE fd; + RAY_CHECK_OK(store_conn_->RecvFd(&fd)); mmap_table_[store_fd_val] = std::unique_ptr(new ClientMmapTableEntry(fd, map_size)); return mmap_table_[store_fd_val]->pointer(); @@ -347,7 +267,7 @@ uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_ // Get a pointer to a file that we know has been memory mapped in this client // process before. -uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) { +uint8_t* PlasmaClient::Impl::LookupMmappedFile(MEMFD_TYPE store_fd_val) { auto entry = mmap_table_.find(store_fd_val); RAY_CHECK(entry != mmap_table_.end()); return entry->second->pointer(); @@ -360,17 +280,6 @@ bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) { return (elem != objects_in_use_.end()); } -int PlasmaClient::Impl::GetStoreFd(int store_fd) { - auto entry = mmap_table_.find(store_fd); - if (entry == mmap_table_.end()) { - int fd = recv_fd(store_conn_->fd); - RAY_CHECK(fd >= 0) << "recv not successful"; - return fd; - } else { - return entry->second->fd(); - } -} - void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id, PlasmaObject* object, bool is_sealed) { // Increment the count of the object to track the fact that it is being used. @@ -410,20 +319,19 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, RAY_RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaCreateReply, &buffer)); ObjectID id; PlasmaObject object; - int store_fd; + MEMFD_TYPE store_fd; int64_t mmap_size; RAY_RETURN_NOT_OK( ReadCreateReply(buffer.data(), buffer.size(), &id, &object, &store_fd, &mmap_size)); // If the CreateReply included an error, then the store will not send a file // descriptor. if (device_num == 0) { - int fd = GetStoreFd(store_fd); RAY_CHECK(object.data_size == data_size); RAY_CHECK(object.metadata_size == metadata_size); // The metadata should come right after the data. RAY_CHECK(object.metadata_offset == object.data_offset + data_size); *data = std::make_shared( - shared_from_this(), LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset, + shared_from_this(), GetStoreFdAndMmap(store_fd, mmap_size) + object.data_offset, data_size); // If plasma_create is being called from a transfer, then we will not copy the // metadata here. The metadata will be written along with the data streamed @@ -530,7 +438,7 @@ Status PlasmaClient::Impl::GetBuffers( std::vector received_object_ids(num_objects); std::vector object_data(num_objects); PlasmaObject* object; - std::vector store_fds; + std::vector store_fds; std::vector mmap_sizes; RAY_RETURN_NOT_OK(ReadGetReply(buffer.data(), buffer.size(), received_object_ids.data(), object_data.data(), num_objects, store_fds, mmap_sizes)); @@ -539,8 +447,7 @@ Status PlasmaClient::Impl::GetBuffers( // in the subsequent loop based on just the store file descriptor and without // having to know the relevant file descriptor received from recv_fd. for (size_t i = 0; i < store_fds.size(); i++) { - int fd = GetStoreFd(store_fds[i]); - LookupOrMmap(fd, store_fds[i], mmap_sizes[i]); + GetStoreFdAndMmap(store_fds[i], mmap_sizes[i]); } for (int64_t i = 0; i < num_objects; ++i) { @@ -813,16 +720,10 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name, int release_delay, int num_retries) { std::lock_guard guard(client_mutex_); - int fd = -1; - RAY_RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, num_retries, -1, &fd)); - store_conn_.reset(new StoreConn(fd)); - if (manager_socket_name != "") { - return Status::NotImplemented("plasma manager is no longer supported"); - } - if (release_delay != 0) { - RAY_LOG(WARNING) << "The release_delay parameter in PlasmaClient::Connect " - << "is deprecated"; - } + /// The local stream socket that connects to store. + ray::local_stream_socket socket(main_service_); + RAY_RETURN_NOT_OK(ray::ConnectSocketRetry(socket, store_socket_name)); + store_conn_.reset(new StoreConn(std::move(socket))); // Send a ConnectRequest to the store to get its memory capacity. RAY_RETURN_NOT_OK(SendConnectRequest(store_conn_)); std::vector buffer; diff --git a/src/ray/object_manager/plasma/common.h b/src/ray/object_manager/plasma/common.h index 591fa0e2a1123..fe081c0cf14fc 100644 --- a/src/ray/object_manager/plasma/common.h +++ b/src/ray/object_manager/plasma/common.h @@ -62,7 +62,7 @@ struct ObjectTableEntry { ~ObjectTableEntry(); /// Memory mapped file containing the object. - int fd; + MEMFD_TYPE fd; /// Device number. int device_num; /// Size of the underlying map. diff --git a/src/ray/object_manager/plasma/compat.h b/src/ray/object_manager/plasma/compat.h index 504b523da4f51..fef23659e6432 100644 --- a/src/ray/object_manager/plasma/compat.h +++ b/src/ray/object_manager/plasma/compat.h @@ -30,3 +30,24 @@ typedef __darwin_mach_port_t mach_port_t; mach_port_t pthread_mach_thread_np(pthread_t); #endif /* _MACH_PORT_T */ #endif /* __APPLE__ */ + +#ifdef _WIN32 +#ifndef _WINDOWS_ +#ifndef WIN32_LEAN_AND_MEAN // Sorry for the inconvenience. Please include any related + // headers you need manually. + // (https://stackoverflow.com/a/8294669) +#define WIN32_LEAN_AND_MEAN // Prevent inclusion of WinSock2.h +#endif // #ifndef WIN32_LEAN_AND_MEAN +#include // Force inclusion of WinGDI here to resolve name conflict +#endif // #ifndef _WINDOWS_ +#define MEMFD_TYPE HANDLE +#define INVALID_FD NULL +// https://docs.microsoft.com/en-us/windows/win32/winauto/32-bit-and-64-bit-interoperability +#define FD2INT(x) (static_cast(reinterpret_cast(x))) +#define INT2FD(x) (reinterpret_cast(static_cast(x))) +#else +#define MEMFD_TYPE int +#define INVALID_FD -1 +#define FD2INT(x) (x) +#define INT2FD(x) (x) +#endif // #ifndef _WIN32 diff --git a/src/ray/object_manager/plasma/connection.cc b/src/ray/object_manager/plasma/connection.cc new file mode 100644 index 0000000000000..3eff5d068a403 --- /dev/null +++ b/src/ray/object_manager/plasma/connection.cc @@ -0,0 +1,155 @@ +#include "ray/object_manager/plasma/connection.h" + +#include "ray/object_manager/format/object_manager_generated.h" +#ifndef _WIN32 +#include "ray/object_manager/plasma/fling.h" +#endif +#include "ray/object_manager/plasma/plasma_generated.h" +#include "ray/object_manager/plasma/protocol.h" +#include "ray/util/logging.h" + +namespace plasma { + +using ray::Status; + +namespace fb = ray::object_manager::protocol; + +std::ostream &operator<<(std::ostream &os, const std::shared_ptr &client) { + os << std::to_string(client->GetNativeHandle()); + return os; +} + +std::ostream &operator<<(std::ostream &os, const std::shared_ptr &store_conn) { + os << std::to_string(store_conn->GetNativeHandle()); + return os; +} + +namespace { + +const std::vector GenerateEnumNames(const char *const *enum_names_ptr, + int start_index, int end_index) { + std::vector enum_names; + for (int i = 0; i < start_index; ++i) { + enum_names.push_back("EmptyMessageType"); + } + size_t i = 0; + while (true) { + const char *name = enum_names_ptr[i]; + if (name == nullptr) { + break; + } + enum_names.push_back(name); + i++; + } + RAY_CHECK(static_cast(end_index) == enum_names.size() - 1) + << "Message Type mismatch!"; + return enum_names; +} + +static const std::vector object_store_message_enum = + GenerateEnumNames(flatbuf::EnumNamesMessageType(), + static_cast(MessageType::MIN), + static_cast(MessageType::MAX)); +} // namespace + +Client::Client(ray::MessageHandler &message_handler, ray::local_stream_socket &&socket) + : ray::ClientConnection(message_handler, std::move(socket), "worker", + object_store_message_enum, + static_cast(MessageType::PlasmaDisconnectClient)) {} + +std::shared_ptr Client::Create(PlasmaStoreMessageHandler message_handler, + ray::local_stream_socket &&socket) { + ray::MessageHandler ray_message_handler = [message_handler]( + std::shared_ptr client, + int64_t message_type, const std::vector &message) { + Status s = message_handler( + std::static_pointer_cast(client->shared_ClientConnection_from_this()), + (MessageType)message_type, message); + if (!s.ok()) { + if (!s.IsDisconnected()) { + RAY_LOG(ERROR) << "Fail to process client message. " << s.ToString(); + } + client->Close(); + } else { + client->ProcessMessages(); + } + }; + std::shared_ptr self(new Client(ray_message_handler, std::move(socket))); + // Let our manager process our new connection. + self->ProcessMessages(); + return self; +} + +Status Client::SendFd(MEMFD_TYPE fd) { + // Only send the file descriptor if it hasn't been sent (see analogous + // logic in GetStoreFd in client.cc). + if (used_fds_.find(fd) == used_fds_.end()) { +#ifdef _WIN32 + DWORD target_pid; + RAY_RETURN_NOT_OK(ReadBuffer({boost::asio::buffer(&target_pid, sizeof(target_pid))})); + if (!target_pid) { + return Status::Invalid("Received invalid PID"); + } + /* This is a regular handle... fit it into the same struct */ + HANDLE target_process = OpenProcess(PROCESS_DUP_HANDLE, FALSE, target_pid); + if (!target_process) { + return Status::Invalid("Cannot open PID = " + std::to_string(target_pid)); + } + HANDLE target_handle = NULL; + bool success = DuplicateHandle(GetCurrentProcess(), fd, target_process, + &target_handle, 0, TRUE, DUPLICATE_SAME_ACCESS); + if (!success) { + // TODO(suquark): Define better error type. + return Status::IOError("Fail to duplicate handle to PID = " + std::to_string(target_pid)); + } + Status s = WriteBuffer({boost::asio::buffer(&target_handle, sizeof(target_handle))}); + if (!s.ok()) { + /* we failed to send the handle, and it needs cleaning up! */ + HANDLE duplicated_back = NULL; + if (DuplicateHandle(target_process, fd, GetCurrentProcess(), + &duplicated_back, 0, FALSE, DUPLICATE_CLOSE_SOURCE)) { + CloseHandle(duplicated_back); + } + CloseHandle(target_process); + return s; + } + CloseHandle(target_process); +#else + auto ec = send_fd(GetNativeHandle(), fd); + if (ec <= 0) { + if (ec == 0) { + return Status::IOError("Encountered unexpected EOF"); + } else { + return Status::IOError("Unknown I/O Error"); + } + } +#endif + used_fds_.insert(fd); // Succeed, record the fd. + } + return Status::OK(); +} + +StoreConn::StoreConn(ray::local_stream_socket &&socket) : + ray::ServerConnection(std::move(socket)) {} + +Status StoreConn::RecvFd(MEMFD_TYPE *fd) { +#ifdef _WIN32 + DWORD pid = GetCurrentProcessId(); + Status s = WriteBuffer({boost::asio::buffer(&pid, sizeof(pid))}); + if (!s.ok()) { + return Status::IOError("Failed to send PID."); + } + s = ReadBuffer({boost::asio::buffer(fd, sizeof(*fd))}); + if (!s.ok()) { + return Status::IOError("Failed to receive the handle."); + } +#else + *fd = recv_fd(GetNativeHandle()); + if (*fd < 0) { + return Status::IOError("Failed to receive the fd."); + } +#endif + return Status::OK(); +} + +} // namespace plasma diff --git a/src/ray/object_manager/plasma/connection.h b/src/ray/object_manager/plasma/connection.h new file mode 100644 index 0000000000000..cda267deb1d85 --- /dev/null +++ b/src/ray/object_manager/plasma/connection.h @@ -0,0 +1,53 @@ +#pragma once + +#include "ray/common/client_connection.h" +#include "ray/common/id.h" +#include "ray/common/status.h" +#include "ray/object_manager/plasma/compat.h" + +namespace plasma { + +namespace flatbuf { + enum class MessageType : int64_t; +} + +class Client; + +using PlasmaStoreMessageHandler = + std::function, flatbuf::MessageType, const std::vector&)>; + +/// Contains all information that is associated with a Plasma store client. +class Client : public ray::ClientConnection { + public: + static std::shared_ptr Create( + PlasmaStoreMessageHandler message_handler, ray::local_stream_socket &&socket); + + ray::Status SendFd(MEMFD_TYPE fd); + + /// Object ids that are used by this client. + std::unordered_set object_ids; + + std::string name = "anonymous_client"; + + private: + Client(ray::MessageHandler &message_handler, ray::local_stream_socket &&socket); + /// File descriptors that are used by this client. + std::unordered_set used_fds_; +}; + +std::ostream &operator<<(std::ostream &os, const std::shared_ptr &client); + +/// Contains all information that is associated with a Plasma store client. +class StoreConn : public ray::ServerConnection { + public: + StoreConn(ray::local_stream_socket &&socket); + + /// Receive a file descriptor for the store. + /// + /// \return A file descriptor. + ray::Status RecvFd(MEMFD_TYPE *fd); +}; + +std::ostream &operator<<(std::ostream &os, const std::shared_ptr &store_conn); + +} // namespace plasma diff --git a/src/ray/object_manager/plasma/dlmalloc.cc b/src/ray/object_manager/plasma/dlmalloc.cc index 772a33c2380d3..1c8882726c50e 100644 --- a/src/ray/object_manager/plasma/dlmalloc.cc +++ b/src/ray/object_manager/plasma/dlmalloc.cc @@ -22,9 +22,8 @@ #include #include #include -#ifdef _WIN32 -#include -#else + +#ifndef _WIN32 #include #include #endif @@ -32,7 +31,6 @@ #include #include -#include "ray/object_manager/plasma/common.h" #include "ray/object_manager/plasma/plasma.h" namespace plasma { @@ -70,77 +68,66 @@ static void* pointer_advance(void* p, ptrdiff_t n) { return (unsigned char*)p + static void* pointer_retreat(void* p, ptrdiff_t n) { return (unsigned char*)p - n; } -// Create a buffer. This is creating a temporary file and then -// immediately unlinking it so we do not leave traces in the system. -int create_buffer(int64_t size) { - int fd; - std::string file_template = plasma_config->directory; #ifdef _WIN32 - HANDLE h = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, +void create_and_mmap_buffer(int64_t size, void **pointer, HANDLE* handle) { + *handle = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), (DWORD)(uint64_t)size, NULL); - if (h) { - fd = fh_open(reinterpret_cast(h), -1); - } else { - fd = -1; + RAY_CHECK(*handle != NULL) << "Failed to create buffer during mmap"; + *pointer = MapViewOfFile(*handle, FILE_MAP_ALL_ACCESS, 0, 0, (size_t)size); + if (*pointer == NULL) { + RAY_LOG(ERROR) << "MapViewOfFile failed with error: " << GetLastError(); } +} #else +void create_and_mmap_buffer(int64_t size, void **pointer, int* fd) { + // Create a buffer. This is creating a temporary file and then + // immediately unlinking it so we do not leave traces in the system. + std::string file_template = plasma_config->directory; file_template += "/plasmaXXXXXX"; std::vector file_name(file_template.begin(), file_template.end()); file_name.push_back('\0'); - fd = mkstemp(&file_name[0]); - if (fd < 0) { + *fd = mkstemp(&file_name[0]); + if (*fd < 0) { RAY_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0]; - return -1; } // Immediately unlink the file so we do not leave traces in the system. if (unlink(&file_name[0]) != 0) { RAY_LOG(FATAL) << "failed to unlink file " << &file_name[0]; - return -1; } if (!plasma_config->hugepages_enabled) { // Increase the size of the file to the desired size. This seems not to be // needed for files that are backed by the huge page fs, see also // http://www.mail-archive.com/kvm-devel@lists.sourceforge.net/msg14737.html - if (ftruncate(fd, (off_t)size) != 0) { + if (ftruncate(*fd, (off_t)size) != 0) { RAY_LOG(FATAL) << "failed to ftruncate file " << &file_name[0]; - return -1; } } -#endif - return fd; -} -void* fake_mmap(size_t size) { - // Add kMmapRegionsGap so that the returned pointer is deliberately not - // page-aligned. This ensures that the segments of memory returned by - // fake_mmap are never contiguous. - size += kMmapRegionsGap; - - int fd = create_buffer(size); - RAY_CHECK(fd >= 0) << "Failed to create buffer during mmap"; // MAP_POPULATE can be used to pre-populate the page tables for this memory region // which avoids work when accessing the pages later. However it causes long pauses // when mmapping the files. Only supported on Linux. - void* pointer; -#ifdef _WIN32 - pointer = MapViewOfFile(reinterpret_cast(fh_get(fd)), FILE_MAP_ALL_ACCESS, 0, 0, size); - if (pointer == NULL) { - RAY_LOG(ERROR) << "MapViewOfFile failed with error: " << GetLastError(); - return reinterpret_cast(-1); - } -#else - pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - if (pointer == MAP_FAILED) { + *pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, *fd, 0); + if (*pointer == MAP_FAILED) { RAY_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno); if (errno == ENOMEM && plasma_config->hugepages_enabled) { RAY_LOG(ERROR) << " (this probably means you have to increase /proc/sys/vm/nr_hugepages)"; } - return pointer; } +} #endif +void* fake_mmap(size_t size) { + // Add kMmapRegionsGap so that the returned pointer is deliberately not + // page-aligned. This ensures that the segments of memory returned by + // fake_mmap are never contiguous. + size += kMmapRegionsGap; + + void* pointer; + MEMFD_TYPE fd; + create_and_mmap_buffer(size, &pointer, &fd); + // Increase dlmalloc's allocation granularity directly. mparams.granularity *= GRANULARITY_MULTIPLIER; @@ -170,12 +157,15 @@ int fake_munmap(void* addr, int64_t size) { int r; #ifdef _WIN32 r = UnmapViewOfFile(addr) ? 0 : -1; + if (r == 0) { + CloseHandle(entry->second.fd); + } #else r = munmap(addr, size); -#endif if (r == 0) { close(entry->second.fd); } +#endif mmap_records.erase(entry); return r; diff --git a/src/ray/object_manager/plasma/events.cc b/src/ray/object_manager/plasma/events.cc deleted file mode 100644 index 027bc0de3daa8..0000000000000 --- a/src/ray/object_manager/plasma/events.cc +++ /dev/null @@ -1,107 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "ray/object_manager/plasma/events.h" - -#include - -#include - -extern "C" { -#include "ray/thirdparty/ae/ae.h" -} - -namespace plasma { - -// Verify that the constants defined in events.h are defined correctly. -static_assert(kEventLoopTimerDone == AE_NOMORE, "constant defined incorrectly"); -static_assert(kEventLoopOk == AE_OK, "constant defined incorrectly"); -static_assert(kEventLoopRead == AE_READABLE, "constant defined incorrectly"); -static_assert(kEventLoopWrite == AE_WRITABLE, "constant defined incorrectly"); - -void EventLoop::FileEventCallback(aeEventLoop* loop, int fd, void* context, int events) { - FileCallback* callback = reinterpret_cast(context); - (*callback)(events); -} - -int EventLoop::TimerEventCallback(aeEventLoop* loop, TimerID timer_id, void* context) { - TimerCallback* callback = reinterpret_cast(context); - return (*callback)(timer_id); -} - -constexpr int kInitialEventLoopSize = 1024; - -EventLoop::EventLoop() { loop_ = aeCreateEventLoop(kInitialEventLoopSize); } - -bool EventLoop::AddFileEvent(int fd, int events, const FileCallback& callback) { - if (file_callbacks_.find(fd) != file_callbacks_.end()) { - return false; - } - auto data = std::unique_ptr(new FileCallback(callback)); - void* context = reinterpret_cast(data.get()); - // Try to add the file descriptor. - int err = aeCreateFileEvent(loop_, fd, events, EventLoop::FileEventCallback, context); - // If it cannot be added, increase the size of the event loop. - if (err == AE_ERR && errno == ERANGE) { - err = aeResizeSetSize(loop_, 3 * aeGetSetSize(loop_) / 2); - if (err != AE_OK) { - return false; - } - err = aeCreateFileEvent(loop_, fd, events, EventLoop::FileEventCallback, context); - } - // In any case, test if there were errors. - if (err == AE_OK) { - file_callbacks_.emplace(fd, std::move(data)); - return true; - } - return false; -} - -void EventLoop::RemoveFileEvent(int fd) { - aeDeleteFileEvent(loop_, fd, AE_READABLE | AE_WRITABLE); - file_callbacks_.erase(fd); -} - -void EventLoop::Start() { aeMain(loop_); } - -void EventLoop::Stop() { aeStop(loop_); } - -void EventLoop::Shutdown() { - if (loop_ != nullptr) { - aeDeleteEventLoop(loop_); - loop_ = nullptr; - } -} - -EventLoop::~EventLoop() { Shutdown(); } - -int64_t EventLoop::AddTimer(int64_t timeout, const TimerCallback& callback) { - auto data = std::unique_ptr(new TimerCallback(callback)); - void* context = reinterpret_cast(data.get()); - int64_t timer_id = - aeCreateTimeEvent(loop_, timeout, EventLoop::TimerEventCallback, context, NULL); - timer_callbacks_.emplace(timer_id, std::move(data)); - return timer_id; -} - -int EventLoop::RemoveTimer(int64_t timer_id) { - int err = aeDeleteTimeEvent(loop_, timer_id); - timer_callbacks_.erase(timer_id); - return err; -} - -} // namespace plasma diff --git a/src/ray/object_manager/plasma/events.h b/src/ray/object_manager/plasma/events.h deleted file mode 100644 index 7b08d44432ff6..0000000000000 --- a/src/ray/object_manager/plasma/events.h +++ /dev/null @@ -1,108 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include - -struct aeEventLoop; - -namespace plasma { - -// The constants below are defined using hardcoded values taken from ae.h so -// that ae.h does not need to be included in this file. - -/// Constant specifying that the timer is done and it will be removed. -constexpr int kEventLoopTimerDone = -1; // AE_NOMORE - -/// A successful status. -constexpr int kEventLoopOk = 0; // AE_OK - -/// Read event on the file descriptor. -constexpr int kEventLoopRead = 1; // AE_READABLE - -/// Write event on the file descriptor. -constexpr int kEventLoopWrite = 2; // AE_WRITABLE - -typedef long long TimerID; // NOLINT - -class EventLoop { - public: - // Signature of the handler that will be called when there is a new event - // on the file descriptor that this handler has been registered for. - // - // The arguments are the event flags (read or write). - using FileCallback = std::function; - - // This handler will be called when a timer times out. The timer id is - // passed as an argument. The return is the number of milliseconds the timer - // shall be reset to or kEventLoopTimerDone if the timer shall not be - // triggered again. - using TimerCallback = std::function; - - EventLoop(); - - ~EventLoop(); - - /// Add a new file event handler to the event loop. - /// - /// \param fd The file descriptor we are listening to. - /// \param events The flags for events we are listening to (read or write). - /// \param callback The callback that will be called when the event happens. - /// \return Returns true if the event handler was added successfully. - bool AddFileEvent(int fd, int events, const FileCallback& callback); - - /// Remove a file event handler from the event loop. - /// - /// \param fd The file descriptor of the event handler. - void RemoveFileEvent(int fd); - - /// Register a handler that will be called after a time slice of - /// "timeout" milliseconds. - /// - /// \param timeout The timeout in milliseconds. - /// \param callback The callback for the timeout. - /// \return The ID of the newly created timer. - int64_t AddTimer(int64_t timeout, const TimerCallback& callback); - - /// Remove a timer handler from the event loop. - /// - /// \param timer_id The ID of the timer that is to be removed. - /// \return The ae.c error code. TODO(pcm): needs to be standardized - int RemoveTimer(int64_t timer_id); - - /// \brief Run the event loop. - void Start(); - - /// \brief Stop the event loop - void Stop(); - - void Shutdown(); - - private: - static void FileEventCallback(aeEventLoop* loop, int fd, void* context, int events); - - static int TimerEventCallback(aeEventLoop* loop, TimerID timer_id, void* context); - - aeEventLoop* loop_; - std::unordered_map> file_callbacks_; - std::unordered_map> timer_callbacks_; -}; - -} // namespace plasma diff --git a/src/ray/object_manager/plasma/eviction_policy.h b/src/ray/object_manager/plasma/eviction_policy.h index d646dbd3fc3b1..6abb1a6ec0259 100644 --- a/src/ray/object_manager/plasma/eviction_policy.h +++ b/src/ray/object_manager/plasma/eviction_policy.h @@ -29,6 +29,8 @@ namespace plasma { +class Client; + // ==== The eviction policy ==== // // This file contains declaration for all functions and data structures that diff --git a/src/ray/object_manager/plasma/fling.cc b/src/ray/object_manager/plasma/fling.cc index cf2ec705bf065..5e6d466ab3150 100644 --- a/src/ray/object_manager/plasma/fling.cc +++ b/src/ray/object_manager/plasma/fling.cc @@ -14,14 +14,21 @@ #include "ray/object_manager/plasma/fling.h" +#include #include #include "ray/util/logging.h" -#ifdef _WIN32 -#include // socklen_t -#else -typedef int SOCKET; +#include +#include +#include +#include + +// This is necessary for Mac OS X, see http://www.apuebook.com/faqs2e.html +// (10). +#if !defined(CMSG_SPACE) && !defined(CMSG_LEN) +#define CMSG_SPACE(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + __DARWIN_ALIGN32(len)) +#define CMSG_LEN(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + (len)) #endif void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len) { @@ -40,12 +47,7 @@ void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len) int send_fd(int conn, int fd) { struct msghdr msg; struct iovec iov; -#ifdef _WIN32 - SOCKET to_send = fh_get(fd); -#else - SOCKET to_send = fd; -#endif - char buf[CMSG_SPACE(sizeof(to_send))]; + char buf[CMSG_SPACE(sizeof(fd))]; memset(&buf, 0, sizeof(buf)); init_msg(&msg, &iov, buf, sizeof(buf)); @@ -56,17 +58,12 @@ int send_fd(int conn, int fd) { } header->cmsg_level = SOL_SOCKET; header->cmsg_type = SCM_RIGHTS; - header->cmsg_len = CMSG_LEN(sizeof(to_send)); - memcpy(CMSG_DATA(header), reinterpret_cast(&to_send), sizeof(to_send)); + header->cmsg_len = CMSG_LEN(sizeof(fd)); + memcpy(CMSG_DATA(header), reinterpret_cast(&fd), sizeof(fd)); -#ifdef _WIN32 - SOCKET sock = fh_get(conn); -#else - SOCKET sock = conn; -#endif // Send file descriptor. while (true) { - ssize_t r = sendmsg(sock, &msg, 0); + ssize_t r = sendmsg(conn, &msg, 0); if (r < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; @@ -97,16 +94,11 @@ int send_fd(int conn, int fd) { int recv_fd(int conn) { struct msghdr msg; struct iovec iov; - char buf[CMSG_SPACE(sizeof(SOCKET))]; + char buf[CMSG_SPACE(sizeof(int))]; init_msg(&msg, &iov, buf, sizeof(buf)); -#ifdef _WIN32 - SOCKET sock = fh_get(conn); -#else - int sock = conn; -#endif while (true) { - ssize_t r = recvmsg(sock, &msg, 0); + ssize_t r = recvmsg(conn, &msg, 0); if (r == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; @@ -119,24 +111,20 @@ int recv_fd(int conn) { } } - SOCKET found_fd = -1; + int found_fd = -1; int oh_noes = 0; for (struct cmsghdr* header = CMSG_FIRSTHDR(&msg); header != NULL; header = CMSG_NXTHDR(&msg, header)) if (header->cmsg_level == SOL_SOCKET && header->cmsg_type == SCM_RIGHTS) { ssize_t count = (header->cmsg_len - (CMSG_DATA(header) - reinterpret_cast(header))) / - sizeof(SOCKET); + sizeof(int); for (int i = 0; i < count; ++i) { - SOCKET fd = (reinterpret_cast(CMSG_DATA(header)))[i]; + int fd = (reinterpret_cast(CMSG_DATA(header)))[i]; if (found_fd == -1) { found_fd = fd; } else { -#ifdef _WIN32 - closesocket(fd) == 0 || ((WSAGetLastError() == WSAENOTSOCK || WSAGetLastError() == WSANOTINITIALISED) && CloseHandle(reinterpret_cast(fd))); -#else close(fd); -#endif oh_noes = 1; } } @@ -146,19 +134,9 @@ int recv_fd(int conn) { // them all to prevent fd leaks but notify the caller that we got // a bad message. if (oh_noes) { -#ifdef _WIN32 - closesocket(found_fd) == 0 || ((WSAGetLastError() == WSAENOTSOCK || WSAGetLastError() == WSANOTINITIALISED) && CloseHandle(reinterpret_cast(found_fd))); -#else close(found_fd); -#endif errno = EBADMSG; return -1; } - -#ifdef _WIN32 - int to_receive = fh_open(found_fd, -1); -#else - int to_receive = found_fd; -#endif - return to_receive; + return found_fd; } diff --git a/src/ray/object_manager/plasma/fling.h b/src/ray/object_manager/plasma/fling.h index 2acfe370bb44a..45e78b69ebab0 100644 --- a/src/ray/object_manager/plasma/fling.h +++ b/src/ray/object_manager/plasma/fling.h @@ -25,23 +25,6 @@ #pragma once -#include -#include -#include -#ifndef _WIN32 -#include -#endif -#include - -// This is necessary for Mac OS X, see http://www.apuebook.com/faqs2e.html -// (10). -#if !defined(CMSG_SPACE) && !defined(CMSG_LEN) -#define CMSG_SPACE(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + __DARWIN_ALIGN32(len)) -#define CMSG_LEN(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + (len)) -#endif - -void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len); - // Send a file descriptor over a unix domain socket. // // \param conn Unix domain socket to send the file descriptor over. diff --git a/src/ray/object_manager/plasma/io.cc b/src/ray/object_manager/plasma/io.cc deleted file mode 100644 index 5ec22d7aef45e..0000000000000 --- a/src/ray/object_manager/plasma/io.cc +++ /dev/null @@ -1,259 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "ray/object_manager/plasma/io.h" - -#include -#include -#include - -#include "ray/object_manager/plasma/common.h" -#include "ray/object_manager/plasma/plasma_generated.h" -#ifndef _WIN32 -#include -#include -#endif - -/// Number of times we try connecting to a socket. -constexpr int64_t kNumConnectAttempts = 80; -/// Time to wait between connection attempts to a socket. -constexpr int64_t kConnectTimeoutMs = 100; - -namespace plasma { - -using flatbuf::MessageType; - -Status WriteBytes(int fd, uint8_t* cursor, size_t length) { - ssize_t nbytes = 0; - size_t bytesleft = length; - size_t offset = 0; - while (bytesleft > 0) { - // While we haven't written the whole message, write to the file descriptor, - // advance the cursor, and decrease the amount left to write. - nbytes = write(fd, cursor + offset, bytesleft); - if (nbytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { - continue; - } - return Status::IOError(strerror(errno)); - } else if (nbytes == 0) { - return Status::IOError("Encountered unexpected EOF"); - } - RAY_CHECK(nbytes > 0); - bytesleft -= nbytes; - offset += nbytes; - } - - return Status::OK(); -} - -Status WriteMessage(int fd, MessageType type, int64_t length, uint8_t* bytes) { - int64_t version = kPlasmaProtocolVersion; - RAY_RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast(&version), sizeof(version))); - RAY_RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast(&type), sizeof(type))); - RAY_RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast(&length), sizeof(length))); - return WriteBytes(fd, bytes, length * sizeof(char)); -} - -Status ReadBytes(int fd, uint8_t* cursor, size_t length) { - ssize_t nbytes = 0; - // Termination condition: EOF or read 'length' bytes total. - size_t bytesleft = length; - size_t offset = 0; - while (bytesleft > 0) { - nbytes = read(fd, cursor + offset, bytesleft); - if (nbytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { - continue; - } - return Status::IOError(strerror(errno)); - } else if (0 == nbytes) { - return Status::IOError("Encountered unexpected EOF"); - } - RAY_CHECK(nbytes > 0); - bytesleft -= nbytes; - offset += nbytes; - } - - return Status::OK(); -} - -Status ReadMessage(int fd, MessageType* type, std::vector* buffer) { - int64_t version; - RAY_RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast(&version), sizeof(version)), - *type = MessageType::PlasmaDisconnectClient); - RAY_CHECK(version == kPlasmaProtocolVersion) << "version = " << version; - RAY_RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast(type), sizeof(*type)), - *type = MessageType::PlasmaDisconnectClient); - int64_t length_temp; - RAY_RETURN_NOT_OK_ELSE( - ReadBytes(fd, reinterpret_cast(&length_temp), sizeof(length_temp)), - *type = MessageType::PlasmaDisconnectClient); - // The length must be read as an int64_t, but it should be used as a size_t. - size_t length = static_cast(length_temp); - if (length > buffer->size()) { - buffer->resize(length); - } - RAY_RETURN_NOT_OK_ELSE(ReadBytes(fd, buffer->data(), length), - *type = MessageType::PlasmaDisconnectClient); - return Status::OK(); -} - -int ConnectOrListenIpcSock(const std::string& pathname, bool shall_listen) { - union { - struct sockaddr addr; -#ifndef _WIN32 - struct sockaddr_un un; -#endif - struct sockaddr_in in; - } socket_address; - int addrlen; - memset(&socket_address, 0, sizeof(socket_address)); - if (pathname.find("tcp://") == 0) { - addrlen = sizeof(socket_address.in); - socket_address.in.sin_family = AF_INET; - std::string addr = pathname.substr(pathname.find('/') + 2); - size_t i = addr.rfind(':'), j; - if (i >= addr.size()) { - j = i = addr.size(); - } else { - j = i + 1; - } - socket_address.in.sin_addr.s_addr = inet_addr(addr.substr(0, i).c_str()); - socket_address.in.sin_port = htons(static_cast(atoi(addr.substr(j).c_str()))); - if (socket_address.in.sin_addr.s_addr == INADDR_NONE) { - RAY_LOG(ERROR) << "Socket address is not a valid IPv4 address: " << pathname; - return -1; - } - if (socket_address.in.sin_port == htons(0)) { - RAY_LOG(ERROR) << "Socket address is missing a valid port: " << pathname; - return -1; - } - } else { -#ifdef _WIN32 - RAY_LOG(ERROR) << "UNIX domain sockets not supported on Windows: " << pathname; - return -1; -#else - addrlen = sizeof(socket_address.un); - socket_address.un.sun_family = AF_UNIX; - if (pathname.size() + 1 > sizeof(socket_address.un.sun_path)) { - RAY_LOG(ERROR) << "Socket pathname is too long."; - return -1; - } - strncpy(socket_address.un.sun_path, pathname.c_str(), pathname.size() + 1); -#endif - } - - int socket_fd = socket(socket_address.addr.sa_family, SOCK_STREAM, 0); - if (socket_fd < 0) { - RAY_LOG(ERROR) << "socket() failed for pathname " << pathname; - return -1; - } - if (shall_listen) { - // Tell the system to allow the port to be reused. - int on = 1; - if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&on), - sizeof(on)) < 0) { - RAY_LOG(ERROR) << "setsockopt failed for pathname " << pathname; - close(socket_fd); - return -1; - } - - if (socket_address.addr.sa_family == AF_UNIX) { -#ifdef _WIN32 - _unlink(pathname.c_str()); -#else - unlink(pathname.c_str()); -#endif - } - if (bind(socket_fd, &socket_address.addr, addrlen) != 0) { - RAY_LOG(ERROR) << "Bind failed for pathname " << pathname; - close(socket_fd); - return -1; - } - - if (listen(socket_fd, 128) == -1) { - RAY_LOG(ERROR) << "Could not listen to socket " << pathname; - close(socket_fd); - return -1; - } - } else { - if (connect(socket_fd, &socket_address.addr, addrlen) != 0) { - close(socket_fd); - return -1; - } - } - return socket_fd; -} - -Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries, - int64_t timeout, int* fd) { - // Pick the default values if the user did not specify. - if (num_retries < 0) { - num_retries = kNumConnectAttempts; - } - if (timeout < 0) { - timeout = kConnectTimeoutMs; - } - *fd = ConnectOrListenIpcSock(pathname, false); - while (*fd < 0 && num_retries > 0) { - RAY_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname - << ", retrying " << num_retries << " more times"; - // Sleep for timeout milliseconds. - std::this_thread::sleep_for(std::chrono::milliseconds(timeout)); - *fd = ConnectOrListenIpcSock(pathname, false); - --num_retries; - } - - // If we could not connect to the socket, exit. - if (*fd == -1) { - return Status::IOError("Could not connect to socket " + pathname); - } - - return Status::OK(); -} - -int AcceptClient(int socket_fd) { - int client_fd = accept(socket_fd, NULL, NULL); - if (client_fd < 0) { - RAY_LOG(ERROR) << "Error reading from socket."; - return -1; - } - return client_fd; -} - -std::unique_ptr ReadMessageAsync(int sock) { - int64_t size; - Status s = ReadBytes(sock, reinterpret_cast(&size), sizeof(int64_t)); - if (!s.ok()) { - // The other side has closed the socket. - RAY_LOG(DEBUG) << "Socket has been closed, or some other error has occurred."; - close(sock); - return NULL; - } - auto message = std::unique_ptr(new uint8_t[size]); - s = ReadBytes(sock, message.get(), size); - if (!s.ok()) { - // The other side has closed the socket. - RAY_LOG(DEBUG) << "Socket has been closed, or some other error has occurred."; - close(sock); - return NULL; - } - return message; -} - -} // namespace plasma diff --git a/src/ray/object_manager/plasma/io.h b/src/ray/object_manager/plasma/io.h deleted file mode 100644 index d9d38edb6ab09..0000000000000 --- a/src/ray/object_manager/plasma/io.h +++ /dev/null @@ -1,68 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#ifndef _WIN32 -#include -#include -#endif -#include - -#include -#include -#include - -#include "ray/common/ray_config.h" -#include "ray/common/status.h" -#include "ray/object_manager/plasma/common.h" -#include "ray/object_manager/plasma/compat.h" - -namespace plasma { - -using ray::Status; - -namespace flatbuf { - -// Forward declaration outside the namespace, which is defined in plasma_generated.h. -enum class MessageType : int64_t; - -} // namespace flatbuf - -// TODO(suquark): We temporarily sync this with Ray cookie. This code will -// be removed soon in later PRs. -constexpr int64_t kPlasmaProtocolVersion = 0x5241590000000000; - -Status WriteBytes(int fd, uint8_t* cursor, size_t length); - -Status WriteMessage(int fd, flatbuf::MessageType type, int64_t length, uint8_t* bytes); - -Status ReadBytes(int fd, uint8_t* cursor, size_t length); - -Status ReadMessage(int fd, flatbuf::MessageType* type, std::vector* buffer); - -int ConnectOrListenIpcSock(const std::string& pathname, bool shall_listen); - -Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries, - int64_t timeout, int* fd); - -int AcceptClient(int socket_fd); - -std::unique_ptr ReadMessageAsync(int sock); - -} // namespace plasma diff --git a/src/ray/object_manager/plasma/malloc.cc b/src/ray/object_manager/plasma/malloc.cc index 95fa7fa559eed..6a810ef0a4b06 100644 --- a/src/ray/object_manager/plasma/malloc.cc +++ b/src/ray/object_manager/plasma/malloc.cc @@ -17,15 +17,7 @@ #include "ray/object_manager/plasma/malloc.h" -#include #include -#include -#include -#include - -#include -#include -#include #include "ray/object_manager/plasma/common.h" #include "ray/object_manager/plasma/plasma.h" @@ -40,7 +32,7 @@ static ptrdiff_t pointer_distance(void const* pfrom, void const* pto) { return (unsigned char const*)pto - (unsigned char const*)pfrom; } -void GetMallocMapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset) { +void GetMallocMapinfo(void* addr, MEMFD_TYPE* fd, int64_t* map_size, ptrdiff_t* offset) { // TODO(rshin): Implement a more efficient search through mmap_records. for (const auto& entry : mmap_records) { if (addr >= entry.first && addr < pointer_advance(entry.first, entry.second.size)) { @@ -50,12 +42,12 @@ void GetMallocMapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset) return; } } - *fd = -1; + *fd = INVALID_FD; *map_size = 0; *offset = 0; } -int64_t GetMmapSize(int fd) { +int64_t GetMmapSize(MEMFD_TYPE fd) { for (const auto& entry : mmap_records) { if (entry.second.fd == fd) { return entry.second.size; diff --git a/src/ray/object_manager/plasma/malloc.h b/src/ray/object_manager/plasma/malloc.h index edc0763a5acf3..2d204a69c0e4d 100644 --- a/src/ray/object_manager/plasma/malloc.h +++ b/src/ray/object_manager/plasma/malloc.h @@ -20,6 +20,7 @@ #include #include +#include "ray/object_manager/plasma/compat.h" #include namespace plasma { @@ -30,16 +31,16 @@ namespace plasma { /// (in the client we cannot guarantee that these mmaps are contiguous). constexpr int64_t kMmapRegionsGap = sizeof(size_t); -void GetMallocMapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset); +void GetMallocMapinfo(void* addr, MEMFD_TYPE* fd, int64_t* map_length, ptrdiff_t* offset); /// Get the mmap size corresponding to a specific file descriptor. /// /// \param fd The file descriptor to look up. /// \return The size of the corresponding memory-mapped file. -int64_t GetMmapSize(int fd); +int64_t GetMmapSize(MEMFD_TYPE fd); struct MmapRecord { - int fd; + MEMFD_TYPE fd; int64_t size; }; diff --git a/src/ray/object_manager/plasma/plasma.cc b/src/ray/object_manager/plasma/plasma.cc index fd866dcee9749..0e15d24e1209f 100644 --- a/src/ray/object_manager/plasma/plasma.cc +++ b/src/ray/object_manager/plasma/plasma.cc @@ -17,54 +17,14 @@ #include "ray/object_manager/plasma/plasma.h" -#ifndef _WIN32 -#include -#endif -#include -#include - -#include "ray/object_manager/format/object_manager_generated.h" #include "ray/object_manager/plasma/common.h" -#include "ray/object_manager/plasma/protocol.h" - -namespace fb = ray::object_manager::protocol; namespace plasma { -Client::~Client() { close(fd); } - -std::ostream &operator<<(std::ostream &os, const std::shared_ptr &client) { - os << client->fd; - return os; -} - -StoreConn::~StoreConn() { close(fd); } - -std::ostream &operator<<(std::ostream &os, const std::shared_ptr &store_conn) { - os << store_conn->fd; - return os; -} - ObjectTableEntry::ObjectTableEntry() : pointer(nullptr), ref_count(0) {} ObjectTableEntry::~ObjectTableEntry() { pointer = nullptr; } -int WarnIfSigpipe(int status, int client_sock) { - if (status >= 0) { - return 0; - } - if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) { - RAY_LOG(WARNING) << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " - "sending a message to client on fd " - << client_sock - << ". The client on the other end may " - "have hung up."; - return errno; - } - RAY_LOG(FATAL) << "Failed to write message to client on fd " << client_sock << "."; - return -1; // This is never reached. -} - ObjectTableEntry* GetObjectTableEntry(PlasmaStoreInfo* store_info, const ObjectID& object_id) { auto it = store_info->objects.find(object_id); diff --git a/src/ray/object_manager/plasma/plasma.h b/src/ray/object_manager/plasma/plasma.h index 4f2ff68a0d138..ab369e0d0e5a0 100644 --- a/src/ray/object_manager/plasma/plasma.h +++ b/src/ray/object_manager/plasma/plasma.h @@ -17,27 +17,14 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include - #include #include #include #include #include -#include "ray/common/status.h" -#include "ray/object_manager/plasma/compat.h" - -#include "ray/common/status.h" -#include "ray/object_manager/format/object_manager_generated.h" #include "ray/object_manager/plasma/common.h" -#include "ray/util/logging.h" +#include "ray/object_manager/plasma/compat.h" #ifdef PLASMA_CUDA using arrow::cuda::CudaIpcMemHandle; @@ -45,65 +32,9 @@ using arrow::cuda::CudaIpcMemHandle; namespace plasma { -using ray::Status; -using ray::object_manager::protocol::ObjectInfoT; - -#define HANDLE_SIGPIPE(s, fd_) \ - do { \ - Status _s = (s); \ - if (!_s.ok()) { \ - if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) { \ - RAY_LOG(WARNING) \ - << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " \ - "sending a message to client on fd " \ - << fd_ \ - << ". " \ - "The client on the other end may have hung up."; \ - } else { \ - return _s; \ - } \ - } \ - } while (0); - /// Allocation granularity used in plasma for object allocation. constexpr int64_t kBlockSize = 64; -/// Contains all information that is associated with a Plasma store client. -struct Client { - explicit Client(int fd) : fd(fd) {} - - ~Client(); - - /// The file descriptor used to communicate with the client. - int fd; - - /// Object ids that are used by this client. - std::unordered_set object_ids; - - /// File descriptors that are used by this client. - std::unordered_set used_fds; - - std::string name = "anonymous_client"; - - /// The object notifications for clients. We notify the client about the - /// objects in the order that the objects were sealed or deleted. - std::deque> object_notifications; -}; - -std::ostream &operator<<(std::ostream &os, const std::shared_ptr &client); - -/// Connection to Plasma Store. -struct StoreConn { - explicit StoreConn(int fd) : fd(fd) {} - - ~StoreConn(); - - /// The file descriptor used to communicate with the store. - int fd; -}; - -std::ostream &operator<<(std::ostream &os, const std::shared_ptr &store_conn); - // TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec. struct PlasmaObject { #ifdef PLASMA_CUDA @@ -113,7 +44,7 @@ struct PlasmaObject { /// The file descriptor of the memory mapped file in the store. It is used as /// a unique identifier of the file in the client to look up the corresponding /// file descriptor on the client's side. - int store_fd; + MEMFD_TYPE store_fd; /// The offset in bytes in the memory mapped file of the data. ptrdiff_t data_offset; /// The offset in bytes in the memory mapped file of the metadata. @@ -166,22 +97,6 @@ struct PlasmaStoreInfo { ObjectTableEntry* GetObjectTableEntry(PlasmaStoreInfo* store_info, const ObjectID& object_id); -/// Print a warning if the status is less than zero. This should be used to check -/// the success of messages sent to plasma clients. We print a warning instead of -/// failing because the plasma clients are allowed to die. This is used to handle -/// situations where the store writes to a client file descriptor, and the client -/// may already have disconnected. If we have processed the disconnection and -/// closed the file descriptor, we should get a BAD FILE DESCRIPTOR error. If we -/// have not, then we should get a SIGPIPE. If we write to a TCP socket that -/// isn't connected yet, then we should get an ECONNRESET. -/// -/// \param status The status to check. If it is less less than zero, we will -/// print a warning. -/// \param client_sock The client socket. This is just used to print some extra -/// information. -/// \return The errno set. -int WarnIfSigpipe(int status, int client_sock); - /// Globally accessible reference to plasma store configuration. extern const PlasmaStoreInfo* plasma_config; diff --git a/src/ray/object_manager/plasma/protocol.cc b/src/ray/object_manager/plasma/protocol.cc index cbfe53a801211..9ab1d7b328e51 100644 --- a/src/ray/object_manager/plasma/protocol.cc +++ b/src/ray/object_manager/plasma/protocol.cc @@ -22,7 +22,7 @@ #include "flatbuffers/flatbuffers.h" #include "ray/object_manager/plasma/common.h" -#include "ray/object_manager/plasma/io.h" +#include "ray/object_manager/plasma/connection.h" #include "ray/object_manager/plasma/plasma_generated.h" #ifdef PLASMA_CUDA @@ -73,12 +73,7 @@ Status PlasmaReceive(const std::shared_ptr &store_conn, MessageType m if (!store_conn) { return Status::IOError("Connection is closed."); } - MessageType type; - RAY_RETURN_NOT_OK(ReadMessage(store_conn->fd, &type, buffer)); - RAY_CHECK(type == message_type) - << "type = " << static_cast(type) - << ", message_type = " << static_cast(message_type); - return Status::OK(); + return store_conn->ReadMessage(static_cast(message_type), buffer); } // Helper function to create a vector of elements from Data (Request/Reply struct). @@ -110,7 +105,7 @@ Status PlasmaSend(const std::shared_ptr &store_conn, MessageType mess return Status::IOError("Connection is closed."); } fbb->Finish(message); - return WriteMessage(store_conn->fd, message_type, fbb->GetSize(), fbb->GetBufferPointer()); + return store_conn->WriteMessage(static_cast(message_type), fbb->GetSize(), fbb->GetBufferPointer()); } template @@ -120,7 +115,7 @@ Status PlasmaSend(const std::shared_ptr &client, MessageType message_typ return Status::IOError("Connection is closed."); } fbb->Finish(message); - return WriteMessage(client->fd, message_type, fbb->GetSize(), fbb->GetBufferPointer()); + return client->WriteMessage(static_cast(message_type), fbb->GetSize(), fbb->GetBufferPointer()); } Status PlasmaErrorStatus(fb::PlasmaError plasma_error) { @@ -222,7 +217,7 @@ Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id, Status SendCreateReply(const std::shared_ptr &client, ObjectID object_id, PlasmaObject* object, PlasmaError error_code, int64_t mmap_size) { flatbuffers::FlatBufferBuilder fbb; - PlasmaObjectSpec plasma_object(object->store_fd, object->data_offset, object->data_size, + PlasmaObjectSpec plasma_object(FD2INT(object->store_fd), object->data_offset, object->data_size, object->metadata_offset, object->metadata_size, object->device_num); auto object_string = fbb.CreateString(object_id.Binary()); @@ -239,7 +234,7 @@ Status SendCreateReply(const std::shared_ptr &client, ObjectID object_id crb.add_error(static_cast(error_code)); crb.add_plasma_object(&plasma_object); crb.add_object_id(object_string); - crb.add_store_fd(object->store_fd); + crb.add_store_fd(FD2INT(object->store_fd)); crb.add_mmap_size(mmap_size); if (object->device_num != 0) { #ifdef PLASMA_CUDA @@ -253,18 +248,18 @@ Status SendCreateReply(const std::shared_ptr &client, ObjectID object_id } Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, - PlasmaObject* object, int* store_fd, int64_t* mmap_size) { + PlasmaObject* object, MEMFD_TYPE* store_fd, int64_t* mmap_size) { RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); - object->store_fd = message->plasma_object()->segment_index(); + object->store_fd = INT2FD(message->plasma_object()->segment_index()); object->data_offset = message->plasma_object()->data_offset(); object->data_size = message->plasma_object()->data_size(); object->metadata_offset = message->plasma_object()->metadata_offset(); object->metadata_size = message->plasma_object()->metadata_size(); - *store_fd = message->store_fd(); + *store_fd = INT2FD(message->store_fd()); *mmap_size = message->mmap_size(); object->device_num = message->plasma_object()->device_num(); @@ -536,7 +531,7 @@ Status ReadGetRequest(uint8_t* data, size_t size, std::vector& object_ Status SendGetReply(const std::shared_ptr &client, ObjectID object_ids[], std::unordered_map& plasma_objects, - int64_t num_objects, const std::vector& store_fds, + int64_t num_objects, const std::vector& store_fds, const std::vector& mmap_sizes) { flatbuffers::FlatBufferBuilder fbb; std::vector objects; @@ -544,7 +539,7 @@ Status SendGetReply(const std::shared_ptr &client, ObjectID object_ids[] std::vector> handles; for (int64_t i = 0; i < num_objects; ++i) { const PlasmaObject& object = plasma_objects[object_ids[i]]; - objects.push_back(PlasmaObjectSpec(object.store_fd, object.data_offset, + objects.push_back(PlasmaObjectSpec(FD2INT(object.store_fd), object.data_offset, object.data_size, object.metadata_offset, object.metadata_size, object.device_num)); #ifdef PLASMA_CUDA @@ -556,10 +551,14 @@ Status SendGetReply(const std::shared_ptr &client, ObjectID object_ids[] } #endif } + std::vector store_fds_as_int; + for (MEMFD_TYPE store_fd : store_fds) { + store_fds_as_int.push_back(FD2INT(store_fd)); + } auto message = fb::CreatePlasmaGetReply( fbb, ToFlatbuffer(&fbb, object_ids, num_objects), fbb.CreateVectorOfStructs(arrow::util::MakeNonNull(objects.data()), num_objects), - fbb.CreateVector(arrow::util::MakeNonNull(store_fds.data()), store_fds.size()), + fbb.CreateVector(arrow::util::MakeNonNull(store_fds_as_int.data()), store_fds_as_int.size()), fbb.CreateVector(arrow::util::MakeNonNull(mmap_sizes.data()), mmap_sizes.size()), fbb.CreateVector(arrow::util::MakeNonNull(handles.data()), handles.size())); return PlasmaSend(client, MessageType::PlasmaGetReply, &fbb, message); @@ -567,7 +566,7 @@ Status SendGetReply(const std::shared_ptr &client, ObjectID object_ids[] Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[], PlasmaObject plasma_objects[], int64_t num_objects, - std::vector& store_fds, std::vector& mmap_sizes) { + std::vector& store_fds, std::vector& mmap_sizes) { RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); #ifdef PLASMA_CUDA @@ -579,7 +578,7 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[], } for (uoffset_t i = 0; i < num_objects; ++i) { const PlasmaObjectSpec* object = message->plasma_objects()->Get(i); - plasma_objects[i].store_fd = object->segment_index(); + plasma_objects[i].store_fd = INT2FD(object->segment_index()); plasma_objects[i].data_offset = object->data_offset(); plasma_objects[i].data_size = object->data_size(); plasma_objects[i].metadata_offset = object->metadata_offset(); @@ -596,7 +595,7 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[], } RAY_CHECK(message->store_fds()->size() == message->mmap_sizes()->size()); for (uoffset_t i = 0; i < message->store_fds()->size(); i++) { - store_fds.push_back(message->store_fds()->Get(i)); + store_fds.push_back(INT2FD(message->store_fds()->Get(i))); mmap_sizes.push_back(message->mmap_sizes()->Get(i)); } return Status::OK(); diff --git a/src/ray/object_manager/plasma/protocol.h b/src/ray/object_manager/plasma/protocol.h index f16d38d38b97d..44f7261492413 100644 --- a/src/ray/object_manager/plasma/protocol.h +++ b/src/ray/object_manager/plasma/protocol.h @@ -28,6 +28,9 @@ namespace plasma { +class Client; +class StoreConn; + using ray::Status; using flatbuf::MessageType; @@ -87,7 +90,7 @@ Status SendCreateReply(const std::shared_ptr &client, ObjectID object_id PlasmaError error, int64_t mmap_size); Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, - PlasmaObject* object, int* store_fd, int64_t* mmap_size); + PlasmaObject* object, MEMFD_TYPE* store_fd, int64_t* mmap_size); Status SendAbortRequest(const std::shared_ptr &store_conn, ObjectID object_id); @@ -117,12 +120,12 @@ Status ReadGetRequest(uint8_t* data, size_t size, std::vector& object_ Status SendGetReply(const std::shared_ptr &client, ObjectID object_ids[], std::unordered_map& plasma_objects, - int64_t num_objects, const std::vector& store_fds, + int64_t num_objects, const std::vector& store_fds, const std::vector& mmap_sizes); Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[], PlasmaObject plasma_objects[], int64_t num_objects, - std::vector& store_fds, std::vector& mmap_sizes); + std::vector& store_fds, std::vector& mmap_sizes); /* Plasma Release message functions. */ diff --git a/src/ray/object_manager/plasma/quota_aware_policy.cc b/src/ray/object_manager/plasma/quota_aware_policy.cc index af7acc2faf1ad..64f2d05dd243e 100644 --- a/src/ray/object_manager/plasma/quota_aware_policy.cc +++ b/src/ray/object_manager/plasma/quota_aware_policy.cc @@ -17,6 +17,7 @@ #include "ray/object_manager/plasma/quota_aware_policy.h" #include "ray/object_manager/plasma/common.h" +#include "ray/object_manager/plasma/connection.h" #include "ray/object_manager/plasma/plasma_allocator.h" #include diff --git a/src/ray/object_manager/plasma/shared_memory.cc b/src/ray/object_manager/plasma/shared_memory.cc new file mode 100644 index 0000000000000..bcc9b7ade88ac --- /dev/null +++ b/src/ray/object_manager/plasma/shared_memory.cc @@ -0,0 +1,55 @@ +#include "ray/object_manager/plasma/shared_memory.h" + +#include + +#ifndef _WIN32 +#include +#include +#endif + +#include "ray/object_manager/plasma/malloc.h" +#include "ray/util/logging.h" + +namespace plasma { + +ClientMmapTableEntry::ClientMmapTableEntry(MEMFD_TYPE fd, int64_t map_size) + : fd_(fd), pointer_(nullptr), length_(0) { + // We subtract kMmapRegionsGap from the length that was added + // in fake_mmap in malloc.h, to make map_size page-aligned again. + length_ = map_size - kMmapRegionsGap; +#ifdef _WIN32 + pointer_ = reinterpret_cast(MapViewOfFile(fd, FILE_MAP_ALL_ACCESS, 0, 0, length_)); + // TODO(pcm): Don't fail here, instead return a Status. + if (pointer_ == NULL) { + RAY_LOG(FATAL) << "mmap failed"; + } + CloseHandle(fd); // Closing this fd has an effect on performance. +#else + pointer_ = reinterpret_cast( + mmap(NULL, length_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)); + // TODO(pcm): Don't fail here, instead return a Status. + if (pointer_ == MAP_FAILED) { + RAY_LOG(FATAL) << "mmap failed"; + } + close(fd); // Closing this fd has an effect on performance. +#endif +} + +ClientMmapTableEntry::~ClientMmapTableEntry() { + // At this point it is safe to unmap the memory, as the PlasmaBuffer + // keeps the PlasmaClient (and therefore the ClientMmapTableEntry) + // alive until it is destroyed. + // We don't need to close the associated file, since it has + // already been closed in the constructor. + int r; +#ifdef _WIN32 + r = UnmapViewOfFile(pointer_) ? 0 : -1; +#else + r = munmap(pointer_, length_); +#endif + if (r != 0) { + RAY_LOG(ERROR) << "munmap returned " << r << ", errno = " << errno; + } +} + +} // namespace plasma diff --git a/src/ray/object_manager/plasma/shared_memory.h b/src/ray/object_manager/plasma/shared_memory.h new file mode 100644 index 0000000000000..8ec18374b07cb --- /dev/null +++ b/src/ray/object_manager/plasma/shared_memory.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include + +#include "ray/object_manager/plasma/compat.h" +#include "ray/util/macros.h" + +namespace plasma { + +class ClientMmapTableEntry { + public: + ClientMmapTableEntry(MEMFD_TYPE fd, int64_t map_size); + + ~ClientMmapTableEntry(); + + uint8_t* pointer() { return pointer_; } + + MEMFD_TYPE fd() { return fd_; } + + private: + /// The associated file descriptor on the client. + MEMFD_TYPE fd_; + /// The result of mmap for this file descriptor. + uint8_t* pointer_; + /// The length of the memory-mapped file. + size_t length_; + + RAY_DISALLOW_COPY_AND_ASSIGN(ClientMmapTableEntry); +}; + +} // namespace plasma diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index 0d65ffdb6a52f..e3175f29b2c6a 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -32,6 +32,7 @@ #include #include +#include #include #include #include @@ -41,13 +42,14 @@ #include #include +#include + #include "ray/object_manager/format/object_manager_generated.h" #include "ray/object_manager/plasma/common.h" -#include "ray/object_manager/plasma/fling.h" -#include "ray/object_manager/plasma/io.h" #include "ray/object_manager/plasma/malloc.h" #include "ray/object_manager/plasma/plasma_allocator.h" #include "ray/object_manager/plasma/protocol.h" +#include "ray/util/util.h" #ifdef PLASMA_CUDA #include "arrow/gpu/cuda_api.h" @@ -62,12 +64,9 @@ namespace fb = plasma::flatbuf; namespace plasma { struct GetRequest { - GetRequest(const std::shared_ptr &client, const std::vector& object_ids); + GetRequest(boost::asio::io_service& io_context, const std::shared_ptr &client, const std::vector& object_ids); /// The client that called get. std::shared_ptr client; - /// The ID of the timer that will time out and cause this wait to return to - /// the client if it hasn't already returned. - int64_t timer; /// The object IDs involved in this request. This is used in the reply. std::vector object_ids; /// The object information for the objects in this request. This is used in @@ -78,22 +77,39 @@ struct GetRequest { /// The number of object requests in this wait request that are already /// satisfied. int64_t num_satisfied; + + void AsyncWait(int64_t timeout_ms, + std::function on_timeout) { + // Set an expiry time relative to now. + timer_.expires_from_now(std::chrono::milliseconds(timeout_ms)); + timer_.async_wait(on_timeout); + } + + void CancelTimer() { timer_.cancel(); } + + private: + /// The timer that will time out and cause this wait to return to + /// the client if it hasn't already returned. + boost::asio::steady_timer timer_; }; -GetRequest::GetRequest(const std::shared_ptr &client, const std::vector& object_ids) +GetRequest::GetRequest(boost::asio::io_service& io_context, const std::shared_ptr &client, const std::vector& object_ids) : client(client), - timer(-1), object_ids(object_ids.begin(), object_ids.end()), objects(object_ids.size()), - num_satisfied(0) { + num_satisfied(0), + timer_(io_context) { std::unordered_set unique_ids(object_ids.begin(), object_ids.end()); num_objects_to_wait_for = unique_ids.size(); } -PlasmaStore::PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled, +PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string directory, bool hugepages_enabled, const std::string& socket_name, std::shared_ptr external_store) - : loop_(loop), + : io_context_(main_service), + socket_name_(socket_name), + acceptor_(main_service, ParseUrlEndpoint(socket_name)), + socket_(main_service), eviction_policy_(&store_info_, PlasmaAllocator::GetFootprintLimit()), external_store_(external_store) { store_info_.directory = directory; @@ -108,6 +124,15 @@ PlasmaStore::PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_ // TODO(pcm): Get rid of this destructor by using RAII to clean up data. PlasmaStore::~PlasmaStore() {} +void PlasmaStore::Start() { + // Start listening for clients. + DoAccept(); +} + +void PlasmaStore::Stop() { + acceptor_.close(); +} + const PlasmaStoreInfo* PlasmaStore::GetPlasmaStoreInfo() { return &store_info_; } // If this client is not already using the object, add the client to the @@ -132,7 +157,7 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID& object_id, ObjectTableEnt } // Allocate memory -uint8_t* PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, int* fd, +uint8_t* PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, MEMFD_TYPE* fd, int64_t* map_size, ptrdiff_t* offset, const std::shared_ptr &client, bool is_create) { // First free up space from the client's LRU queue if quota enforcement is on. @@ -176,7 +201,7 @@ uint8_t* PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, int* fd, if (pointer != nullptr) { GetMallocMapinfo(pointer, fd, map_size, offset); - RAY_CHECK(*fd != -1); + RAY_CHECK(*fd != INVALID_FD); } return pointer; } @@ -214,7 +239,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f return PlasmaError::ObjectExists; } - int fd = -1; + MEMFD_TYPE fd = INVALID_FD; int64_t map_size = 0; ptrdiff_t offset = 0; uint8_t* pointer = nullptr; @@ -316,9 +341,7 @@ void PlasmaStore::RemoveGetRequest(GetRequest* get_request) { } } // Remove the get request. - if (get_request->timer != -1) { - RAY_CHECK(loop_->RemoveTimer(get_request->timer) == kEventLoopOk); - } + get_request->CancelTimer(); delete get_request; } @@ -342,35 +365,33 @@ void PlasmaStore::RemoveGetRequestsForClient(const std::shared_ptr &clie void PlasmaStore::ReturnFromGet(GetRequest* get_req) { // Figure out how many file descriptors we need to send. - std::unordered_set fds_to_send; - std::vector store_fds; + std::unordered_set fds_to_send; + std::vector store_fds; std::vector mmap_sizes; for (const auto& object_id : get_req->object_ids) { PlasmaObject& object = get_req->objects[object_id]; - int fd = object.store_fd; - if (object.data_size != -1 && fds_to_send.count(fd) == 0 && fd != -1) { + MEMFD_TYPE fd = object.store_fd; + if (object.data_size != -1 && fds_to_send.count(fd) == 0 && fd != INVALID_FD) { fds_to_send.insert(fd); store_fds.push_back(fd); mmap_sizes.push_back(GetMmapSize(fd)); } } - // Send the get reply to the client. Status s = SendGetReply(get_req->client, &get_req->object_ids[0], get_req->objects, get_req->object_ids.size(), store_fds, mmap_sizes); - WarnIfSigpipe(s.ok() ? 0 : -1, get_req->client->fd); // If we successfully sent the get reply message to the client, then also send // the file descriptors. if (s.ok()) { // Send all of the file descriptors for the present objects. - for (int store_fd : store_fds) { - // Only send the file descriptor if it hasn't been sent (see analogous - // logic in GetStoreFd in client.cc). - if (get_req->client->used_fds.find(store_fd) == get_req->client->used_fds.end()) { - WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd); - get_req->client->used_fds.insert(store_fd); + for (MEMFD_TYPE store_fd : store_fds) { + Status send_fd_status = get_req->client->SendFd(store_fd); + if (!send_fd_status.ok()) { + RAY_LOG(ERROR) << "Failed to send mmap results to client on fd " << get_req->client; } } + } else { + RAY_LOG(ERROR) << "Failed to send Get reply to client on fd " << get_req->client; } // Remove the get request from each of the relevant object_get_requests hash @@ -427,7 +448,7 @@ void PlasmaStore::ProcessGetRequest(const std::shared_ptr &client, const std::vector& object_ids, int64_t timeout_ms) { // Create a get request for this object. - auto get_req = new GetRequest(client, object_ids); + auto get_req = new GetRequest(io_context_, client, object_ids); std::vector evicted_ids; std::vector evicted_entries; for (auto object_id : object_ids) { @@ -503,9 +524,11 @@ void PlasmaStore::ProcessGetRequest(const std::shared_ptr &client, } else if (timeout_ms != -1) { // Set a timer that will cause the get request to return to the client. Note // that a timeout of -1 is used to indicate that no timer should be set. - get_req->timer = loop_->AddTimer(timeout_ms, [this, get_req](int64_t timer_id) { - ReturnFromGet(get_req); - return kEventLoopTimerDone; + get_req->AsyncWait(timeout_ms, [this, get_req](const boost::system::error_code& ec) { + if (ec != boost::asio::error::operation_aborted) { + // Timer was not cancelled, take necessary action. + ReturnFromGet(get_req); + } }); } } @@ -696,24 +719,19 @@ void PlasmaStore::EvictObjects(const std::vector& object_ids) { } } -void PlasmaStore::ConnectClient(int listener_sock) { - int client_fd = AcceptClient(listener_sock); - auto client = std::make_shared(client_fd); - - // Add a callback to handle events on this socket. - // TODO(pcm): Check return value. - loop_->AddFileEvent(client_fd, kEventLoopRead, [this, client](int events) { - Status s = ProcessMessage(client); - if (!s.ok()) { - RAY_LOG(FATAL) << "Failed to process file event: " << s; - } - }); - RAY_LOG(DEBUG) << "New connection with fd " << client; +void PlasmaStore::ConnectClient(const boost::system::error_code &error) { + if (!error) { + // Accept a new local client and dispatch it to the node manager. + auto new_connection = Client::Create(boost::bind( + &PlasmaStore::ProcessMessage, this, _1, _2, _3 + ), std::move(socket_)); + } + // We're ready to accept another client. + DoAccept(); } void PlasmaStore::DisconnectClient(const std::shared_ptr &client) { - int client_fd = client->fd; - RAY_CHECK(client_fd > 0); + client->Close(); RAY_LOG(DEBUG) << "Disconnecting client on fd " << client; // Release all the objects that the client was using. eviction_policy_.ClientDisconnected(client.get()); @@ -746,9 +764,6 @@ void PlasmaStore::DisconnectClient(const std::shared_ptr &client) { // Remove notification for this client from global map. notification_clients_.erase(client); } - - // We lose the last borrower of the Client instance here. - loop_->RemoveFileEvent(client_fd); } /// Send notifications about sealed objects to the subscribers. This is called @@ -757,8 +772,8 @@ void PlasmaStore::DisconnectClient(const std::shared_ptr &client) { /// /// \param client The client to push notifications to. /// \param object_info The notifications. -Status PlasmaStore::SendNotifications( - const std::shared_ptr& client, const std::vector &object_info) { +void PlasmaStore::SendNotifications( + const std::shared_ptr &client, const std::vector &object_info) { namespace protocol = ray::object_manager::protocol; flatbuffers::FlatBufferBuilder fbb; std::vector> info; @@ -769,63 +784,27 @@ Status PlasmaStore::SendNotifications( auto message = protocol::CreatePlasmaNotification(fbb, info_array); fbb.Finish(message); - RAY_LOG(DEBUG) << "Send notifications to fd = " << client->fd; - auto& notifications = client->object_notifications; - auto new_notifications = - std::unique_ptr(new uint8_t[sizeof(int64_t) + fbb.GetSize()]); - *(reinterpret_cast(new_notifications.get())) = fbb.GetSize(); - memcpy(new_notifications.get() + sizeof(int64_t), fbb.GetBufferPointer(), fbb.GetSize()); - notifications.emplace_back(std::move(new_notifications)); - - int num_processed = 0; - bool closed = false; - // Loop over the array of pending notifications and send as many of them as - // possible. - for (size_t i = 0; i < notifications.size(); ++i) { - auto& notification = notifications.at(i); - // Decode the length, which is the first bytes of the message. - int64_t size = *(reinterpret_cast(notification.get())); - - // Attempt to send a notification about this object ID. - ssize_t nbytes = send(client->fd, notification.get(), sizeof(int64_t) + size, 0); - if (nbytes >= 0) { - RAY_CHECK(nbytes == static_cast(sizeof(int64_t)) + size); - } else if (nbytes == -1 && - (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) { - RAY_LOG(DEBUG) << "The socket's send buffer is full, so we are caching this " - "notification and will send it later."; - // Add a callback to the event loop to send queued notifications whenever - // there is room in the socket's send buffer. Callbacks can be added - // more than once here and will be overwritten. The callback is removed - // at the end of the method. - // TODO(pcm): Introduce status codes and check in case the file descriptor - // is added twice. - loop_->AddFileEvent(client->fd, kEventLoopWrite, [this, client](int events) { - Status s = SendNotifications(client, {}); - if (!s.ok()) { - notification_clients_.erase(client); - } - }); - break; - } else { - RAY_LOG(WARNING) << "Failed to send notification to client on fd " << client->fd - << ", errno = " << errno; - if (errno == EPIPE) { - closed = true; - break; + // In C++14, we can use unique_ptr instead. + auto size = new int64_t; + *size = fbb.GetSize(); + auto data = new uint8_t[fbb.GetSize()]; + std::memcpy(data, fbb.GetBufferPointer(), fbb.GetSize()); + + std::vector buffers{ + boost::asio::const_buffer(size, sizeof(*size)), + boost::asio::const_buffer(data, fbb.GetSize()), + }; + client->WriteBufferAsync(buffers, [this, client, size, data](const Status& s) { + if (!s.ok()) { + RAY_LOG(WARNING) << "Failed to send notification to client on fd " << client; + if (s.IsIOError()) { + client->Close(); + notification_clients_.erase(client); } } - num_processed += 1; - } - // Remove the sent notifications from the array. - notifications.erase(notifications.begin(), notifications.begin() + num_processed); - - // If we have sent all notifications, remove the fd from the event loop. - if (notifications.empty()) { - loop_->RemoveFileEvent(client->fd); - } - // Stop sending notifications if the pipe was broken. - return closed ? Status::IOError("Send notifications failed") : Status::OK(); + delete size; + delete[] data; + }); } void PlasmaStore::PushNotification(ObjectInfoT* object_info) { @@ -843,32 +822,17 @@ void PlasmaStore::PushNotifications(const std::vector& object_info) } } - auto it = notification_clients_.begin(); - while (it != notification_clients_.end()) { - Status s = SendNotifications(*it, object_info); - if (s.ok()) { - ++it; - } else { - it = notification_clients_.erase(it); - } + for (const auto& client : notification_clients_) { + SendNotifications(client, object_info); } } // Subscribe to notifications about sealed objects. void PlasmaStore::SubscribeToUpdates(const std::shared_ptr &client) { - RAY_LOG(DEBUG) << "subscribing to updates on fd " << client->fd; // Add this fd to global map, which is needed for this client to receive notifications. notification_clients_.insert(client); - // Make the socket non-blocking. -#ifdef _WINSOCKAPI_ - unsigned long value = 1; - RAY_CHECK(ioctlsocket(client->fd, FIONBIO, &value) == 0); -#else - int flags = fcntl(client->fd, F_GETFL, 0); - RAY_CHECK(fcntl(client->fd, F_SETFL, flags | O_NONBLOCK) == 0); -#endif - + std::vector infos; // Push notifications to the new subscriber about existing sealed objects. for (const auto& entry : store_info_.objects) { if (entry.second->state == ObjectState::PLASMA_SEALED) { @@ -876,21 +840,18 @@ void PlasmaStore::SubscribeToUpdates(const std::shared_ptr &client) { info.object_id = entry.first.Binary(); info.data_size = entry.second->data_size; info.metadata_size = entry.second->metadata_size; - Status s = SendNotifications(client, {info}); - if (!s.ok()) { - notification_clients_.erase(client); - } + infos.push_back(info); } } + SendNotifications(client, infos); } -Status PlasmaStore::ProcessMessage(const std::shared_ptr &client) { - fb::MessageType type; - Status s = ReadMessage(client->fd, &type, &input_buffer_); - RAY_CHECK(s.ok() || s.IsIOError()); - - uint8_t* input = input_buffer_.data(); - size_t input_size = input_buffer_.size(); +Status PlasmaStore::ProcessMessage(const std::shared_ptr &client, + fb::MessageType type, + const std::vector &message) { + // TODO(suquark): We should convert these interfaces to const later. + uint8_t* input = (uint8_t*)message.data(); + size_t input_size = message.size(); ObjectID object_id; PlasmaObject object = {}; @@ -909,15 +870,9 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr &client) { if (error_code == PlasmaError::OK && device_num == 0) { mmap_size = GetMmapSize(object.store_fd); } - HANDLE_SIGPIPE( - SendCreateReply(client, object_id, &object, error_code, mmap_size), - client->fd); - // Only send the file descriptor if it hasn't been sent (see analogous - // logic in GetStoreFd in client.cc). Similar in ReturnFromGet. - if (error_code == PlasmaError::OK && device_num == 0 && - client->used_fds.find(object.store_fd) == client->used_fds.end()) { - WarnIfSigpipe(send_fd(client->fd, object.store_fd), client->fd); - client->used_fds.insert(object.store_fd); + RAY_RETURN_NOT_OK(SendCreateReply(client, object_id, &object, error_code, mmap_size)); + if (error_code == PlasmaError::OK && device_num == 0) { + RAY_RETURN_NOT_OK(client->SendFd(object.store_fd)); } } break; case fb::MessageType::PlasmaAbortRequest: { @@ -925,7 +880,7 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr &client) { RAY_CHECK(AbortObject(object_id, client) == 1) << "To abort an object, the only " "client currently using it " "must be the creator."; - HANDLE_SIGPIPE(SendAbortReply(client, object_id), client->fd); + RAY_RETURN_NOT_OK(SendAbortReply(client, object_id)); } break; case fb::MessageType::PlasmaGetRequest: { std::vector object_ids_to_get; @@ -945,20 +900,20 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr &client) { for (auto& object_id : object_ids) { error_codes.push_back(DeleteObject(object_id)); } - HANDLE_SIGPIPE(SendDeleteReply(client, object_ids, error_codes), client->fd); + RAY_RETURN_NOT_OK(SendDeleteReply(client, object_ids, error_codes)); } break; case fb::MessageType::PlasmaContainsRequest: { RAY_RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id)); if (ContainsObject(object_id) == ObjectStatus::OBJECT_FOUND) { - HANDLE_SIGPIPE(SendContainsReply(client, object_id, 1), client->fd); + RAY_RETURN_NOT_OK(SendContainsReply(client, object_id, 1)); } else { - HANDLE_SIGPIPE(SendContainsReply(client, object_id, 0), client->fd); + RAY_RETURN_NOT_OK(SendContainsReply(client, object_id, 0)); } } break; case fb::MessageType::PlasmaSealRequest: { RAY_RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id)); SealObjects({object_id}); - HANDLE_SIGPIPE(SendSealReply(client, object_id, PlasmaError::OK), client->fd); + RAY_RETURN_NOT_OK(SendSealReply(client, object_id, PlasmaError::OK)); } break; case fb::MessageType::PlasmaEvictRequest: { // This code path should only be used for testing. @@ -968,24 +923,24 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr &client) { int64_t num_bytes_evicted = eviction_policy_.ChooseObjectsToEvict(num_bytes, &objects_to_evict); EvictObjects(objects_to_evict); - HANDLE_SIGPIPE(SendEvictReply(client, num_bytes_evicted), client->fd); + RAY_RETURN_NOT_OK(SendEvictReply(client, num_bytes_evicted)); } break; case fb::MessageType::PlasmaRefreshLRURequest: { std::vector object_ids; RAY_RETURN_NOT_OK(ReadRefreshLRURequest(input, input_size, &object_ids)); eviction_policy_.RefreshObjects(object_ids); - HANDLE_SIGPIPE(SendRefreshLRUReply(client), client->fd); + RAY_RETURN_NOT_OK(SendRefreshLRUReply(client)); } break; case fb::MessageType::PlasmaSubscribeRequest: SubscribeToUpdates(client); break; case fb::MessageType::PlasmaConnectRequest: { - HANDLE_SIGPIPE(SendConnectReply(client, PlasmaAllocator::GetFootprintLimit()), - client->fd); + RAY_RETURN_NOT_OK(SendConnectReply(client, PlasmaAllocator::GetFootprintLimit())); } break; case fb::MessageType::PlasmaDisconnectClient: RAY_LOG(DEBUG) << "Disconnecting client on fd " << client; DisconnectClient(client); + return Status::Disconnected("The Plasma Store client is disconnected."); break; case fb::MessageType::PlasmaSetOptionsRequest: { std::string client_name; @@ -994,13 +949,11 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr &client) { ReadSetOptionsRequest(input, input_size, &client_name, &output_memory_quota)); client->name = client_name; bool success = eviction_policy_.SetClientQuota(client.get(), output_memory_quota); - HANDLE_SIGPIPE(SendSetOptionsReply(client, success ? PlasmaError::OK - : PlasmaError::OutOfMemory), - client->fd); + RAY_RETURN_NOT_OK(SendSetOptionsReply(client, success ? PlasmaError::OK + : PlasmaError::OutOfMemory)); } break; case fb::MessageType::PlasmaGetDebugStringRequest: { - HANDLE_SIGPIPE(SendGetDebugStringReply(client, eviction_policy_.DebugString()), - client->fd); + RAY_RETURN_NOT_OK(SendGetDebugStringReply(client, eviction_policy_.DebugString())); } break; default: // This code should be unreachable. @@ -1009,4 +962,9 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr &client) { return Status::OK(); } +void PlasmaStore::DoAccept() { + acceptor_.async_accept(socket_, boost::bind(&PlasmaStore::ConnectClient, this, + boost::asio::placeholders::error)); +} + } // namespace plasma diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h index ee6dc9bb6f994..968e08b9928f7 100644 --- a/src/ray/object_manager/plasma/store.h +++ b/src/ray/object_manager/plasma/store.h @@ -28,7 +28,7 @@ #include "ray/object_manager/format/object_manager_generated.h" #include "ray/object_manager/notification/object_store_notification_manager.h" #include "ray/object_manager/plasma/common.h" -#include "ray/object_manager/plasma/events.h" +#include "ray/object_manager/plasma/connection.h" #include "ray/object_manager/plasma/external_store.h" #include "ray/object_manager/plasma/plasma.h" #include "ray/object_manager/plasma/protocol.h" @@ -50,12 +50,18 @@ struct GetRequest; class PlasmaStore { public: // TODO: PascalCase PlasmaStore methods. - PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled, + PlasmaStore(boost::asio::io_service &main_service, std::string directory, bool hugepages_enabled, const std::string& socket_name, std::shared_ptr external_store); ~PlasmaStore(); + /// Start this store. + void Start(); + + /// Stop this store. + void Stop(); + /// Get a const pointer to the internal PlasmaStoreInfo object. const PlasmaStoreInfo* GetPlasmaStoreInfo(); @@ -151,18 +157,19 @@ class PlasmaStore { /// Connect a new client to the PlasmaStore. /// - /// \param listener_sock The socket that is listening to incoming connections. - void ConnectClient(int listener_sock); + /// \param error The error code from the acceptor. + void ConnectClient(const boost::system::error_code &error); /// Disconnect a client from the PlasmaStore. /// /// \param client The client that is disconnected. void DisconnectClient(const std::shared_ptr &client); - Status SendNotifications( - const std::shared_ptr& client, const std::vector &object_info); + void SendNotifications( + const std::shared_ptr &client, const std::vector &object_info); - Status ProcessMessage(const std::shared_ptr &client); + Status ProcessMessage(const std::shared_ptr &client, plasma::flatbuf::MessageType type, + const std::vector &message); void SetNotificationListener( const std::shared_ptr ¬ification_listener) { @@ -208,7 +215,7 @@ class PlasmaStore { void EraseFromObjectTable(const ObjectID& object_id); - uint8_t* AllocateMemory(size_t size, bool evict_if_full, int* fd, int64_t* map_size, + uint8_t* AllocateMemory(size_t size, bool evict_if_full, MEMFD_TYPE* fd, int64_t* map_size, ptrdiff_t* offset, const std::shared_ptr &client, bool is_create); #ifdef PLASMA_CUDA Status AllocateCudaMemory(int device_num, int64_t size, uint8_t** out_pointer, @@ -217,16 +224,23 @@ class PlasmaStore { Status FreeCudaMemory(int device_num, int64_t size, uint8_t* out_pointer); #endif - /// Event loop of the plasma store. - EventLoop* loop_; + // Start listening for clients. + void DoAccept(); + + // A reference to the asio io context. + boost::asio::io_service& io_context_; + /// The name of the socket this object store listens on. + std::string socket_name_; + /// An acceptor for new clients. + boost::asio::basic_socket_acceptor acceptor_; + /// The socket to listen on for new clients. + ray::local_stream_socket socket_; + /// The plasma store information, including the object tables, that is exposed /// to the eviction policy. PlasmaStoreInfo store_info_; /// The state that is managed by the eviction policy. QuotaAwarePolicy eviction_policy_; - /// Input buffer. This is allocated only once to avoid mallocs for every - /// call to process_message. - std::vector input_buffer_; /// A hash table mapping object IDs to a vector of the get requests that are /// waiting for the object to arrive. std::unordered_map> object_get_requests_; diff --git a/src/ray/object_manager/plasma/store_runner.cc b/src/ray/object_manager/plasma/store_runner.cc index 41941279c31d6..a617e04dc64b6 100644 --- a/src/ray/object_manager/plasma/store_runner.cc +++ b/src/ray/object_manager/plasma/store_runner.cc @@ -8,7 +8,6 @@ #include #include -#include "ray/object_manager/plasma/io.h" #include "ray/object_manager/plasma/plasma_allocator.h" namespace plasma { @@ -97,9 +96,7 @@ void PlasmaStoreRunner::Start() { } RAY_LOG(DEBUG) << "starting server listening on " << socket_name_; - // Create the event loop. - loop_.reset(new EventLoop); - store_.reset(new PlasmaStore(loop_.get(), plasma_directory_, hugepages_enabled_, + store_.reset(new PlasmaStore(main_service_, plasma_directory_, hugepages_enabled_, socket_name_, external_store)); plasma_config = store_->GetPlasmaStoreInfo(); @@ -115,14 +112,9 @@ void PlasmaStoreRunner::Start() { PlasmaAllocator::Free( pointer, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); - int socket = ConnectOrListenIpcSock(socket_name_, true); - // TODO(pcm): Check return value. - RAY_CHECK(socket >= 0); + store_->Start(); - loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) { - this->store_->ConnectClient(socket); - }); - loop_->Start(); + main_service_.run(); Shutdown(); #ifdef _WINSOCKAPI_ @@ -131,16 +123,12 @@ void PlasmaStoreRunner::Start() { } void PlasmaStoreRunner::Stop() { - if (loop_) { - loop_->Stop(); - } else { - RAY_LOG(ERROR) << "Expected loop_ to be non-NULL; this may be a bug"; - } + store_->Stop(); + main_service_.stop(); } void PlasmaStoreRunner::Shutdown() { - loop_->Shutdown(); - loop_ = nullptr; + store_->Stop(); store_ = nullptr; } diff --git a/src/ray/object_manager/plasma/store_runner.h b/src/ray/object_manager/plasma/store_runner.h index a9b1b07a31099..4b45ed9b9230d 100644 --- a/src/ray/object_manager/plasma/store_runner.h +++ b/src/ray/object_manager/plasma/store_runner.h @@ -2,6 +2,8 @@ #include +#include + #include "ray/object_manager/notification/object_store_notification_manager.h" #include "ray/object_manager/plasma/store.h" @@ -26,7 +28,7 @@ class PlasmaStoreRunner { bool hugepages_enabled_; std::string plasma_directory_; std::string external_store_endpoint_; - std::unique_ptr loop_; + boost::asio::io_service main_service_; std::unique_ptr store_; std::shared_ptr listener_; }; diff --git a/src/ray/thirdparty/ae/ae.c b/src/ray/thirdparty/ae/ae.c deleted file mode 100644 index 883b2b0c6ddfe..0000000000000 --- a/src/ray/thirdparty/ae/ae.c +++ /dev/null @@ -1,465 +0,0 @@ -/* A simple event-driven programming library. Originally I wrote this code - * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated - * it in form of a library for easy reuse. - * - * Copyright (c) 2006-2010, Salvatore Sanfilippo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "ray/thirdparty/ae/ae.h" -#include "ray/thirdparty/ae/zmalloc.h" -#include "ray/thirdparty/ae/config.h" - -/* Include the best multiplexing layer supported by this system. - * The following should be ordered by performances, descending. */ -#ifdef HAVE_EVPORT -#include "ray/thirdparty/ae/ae_evport.c" -#else - #ifdef HAVE_EPOLL - #include "ray/thirdparty/ae/ae_epoll.c" - #else - #ifdef HAVE_KQUEUE - #include "ray/thirdparty/ae/ae_kqueue.c" - #else - #include "ray/thirdparty/ae/ae_select.c" - #endif - #endif -#endif - -aeEventLoop *aeCreateEventLoop(int setsize) { - aeEventLoop *eventLoop; - int i; - - if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; - eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); - eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); - if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; - eventLoop->setsize = setsize; - eventLoop->lastTime = time(NULL); - eventLoop->timeEventHead = NULL; - eventLoop->timeEventNextId = 0; - eventLoop->stop = 0; - eventLoop->maxfd = -1; - eventLoop->beforesleep = NULL; - if (aeApiCreate(eventLoop) == -1) goto err; - /* Events with mask == AE_NONE are not set. So let's initialize the - * vector with it. */ - for (i = 0; i < setsize; i++) - eventLoop->events[i].mask = AE_NONE; - return eventLoop; - -err: - if (eventLoop) { - zfree(eventLoop->events); - zfree(eventLoop->fired); - zfree(eventLoop); - } - return NULL; -} - -/* Return the current set size. */ -int aeGetSetSize(aeEventLoop *eventLoop) { - return eventLoop->setsize; -} - -/* Resize the maximum set size of the event loop. - * If the requested set size is smaller than the current set size, but - * there is already a file descriptor in use that is >= the requested - * set size minus one, AE_ERR is returned and the operation is not - * performed at all. - * - * Otherwise AE_OK is returned and the operation is successful. */ -int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) { - int i; - - if (setsize == eventLoop->setsize) return AE_OK; - if (eventLoop->maxfd >= setsize) return AE_ERR; - if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR; - - eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize); - eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize); - eventLoop->setsize = setsize; - - /* Make sure that if we created new slots, they are initialized with - * an AE_NONE mask. */ - for (i = eventLoop->maxfd+1; i < setsize; i++) - eventLoop->events[i].mask = AE_NONE; - return AE_OK; -} - -void aeDeleteEventLoop(aeEventLoop *eventLoop) { - aeApiFree(eventLoop); - zfree(eventLoop->events); - zfree(eventLoop->fired); - zfree(eventLoop); -} - -void aeStop(aeEventLoop *eventLoop) { - eventLoop->stop = 1; -} - -int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, - aeFileProc *proc, void *clientData) -{ - if (fd >= eventLoop->setsize) { - errno = ERANGE; - return AE_ERR; - } - aeFileEvent *fe = &eventLoop->events[fd]; - - if (aeApiAddEvent(eventLoop, fd, mask) == -1) - return AE_ERR; - fe->mask |= mask; - if (mask & AE_READABLE) fe->rfileProc = proc; - if (mask & AE_WRITABLE) fe->wfileProc = proc; - fe->clientData = clientData; - if (fd > eventLoop->maxfd) - eventLoop->maxfd = fd; - return AE_OK; -} - -void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) -{ - if (fd >= eventLoop->setsize) return; - aeFileEvent *fe = &eventLoop->events[fd]; - if (fe->mask == AE_NONE) return; - - aeApiDelEvent(eventLoop, fd, mask); - fe->mask = fe->mask & (~mask); - if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { - /* Update the max fd */ - int j; - - for (j = eventLoop->maxfd-1; j >= 0; j--) - if (eventLoop->events[j].mask != AE_NONE) break; - eventLoop->maxfd = j; - } -} - -int aeGetFileEvents(aeEventLoop *eventLoop, int fd) { - if (fd >= eventLoop->setsize) return 0; - aeFileEvent *fe = &eventLoop->events[fd]; - - return fe->mask; -} - -static void aeGetTime(long *seconds, long *milliseconds) -{ - struct timeval tv; - - gettimeofday(&tv, NULL); - *seconds = tv.tv_sec; - *milliseconds = tv.tv_usec/1000; -} - -static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) { - long cur_sec, cur_ms, when_sec, when_ms; - - aeGetTime(&cur_sec, &cur_ms); - when_sec = cur_sec + milliseconds/1000; - when_ms = cur_ms + milliseconds%1000; - if (when_ms >= 1000) { - when_sec ++; - when_ms -= 1000; - } - *sec = when_sec; - *ms = when_ms; -} - -long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, - aeTimeProc *proc, void *clientData, - aeEventFinalizerProc *finalizerProc) -{ - long long id = eventLoop->timeEventNextId++; - aeTimeEvent *te; - - te = zmalloc(sizeof(*te)); - if (te == NULL) return AE_ERR; - te->id = id; - aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); - te->timeProc = proc; - te->finalizerProc = finalizerProc; - te->clientData = clientData; - te->next = eventLoop->timeEventHead; - eventLoop->timeEventHead = te; - return id; -} - -int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) -{ - aeTimeEvent *te = eventLoop->timeEventHead; - while(te) { - if (te->id == id) { - te->id = AE_DELETED_EVENT_ID; - return AE_OK; - } - te = te->next; - } - return AE_ERR; /* NO event with the specified ID found */ -} - -/* Search the first timer to fire. - * This operation is useful to know how many time the select can be - * put in sleep without to delay any event. - * If there are no timers NULL is returned. - * - * Note that's O(N) since time events are unsorted. - * Possible optimizations (not needed by Redis so far, but...): - * 1) Insert the event in order, so that the nearest is just the head. - * Much better but still insertion or deletion of timers is O(N). - * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)). - */ -static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) -{ - aeTimeEvent *te = eventLoop->timeEventHead; - aeTimeEvent *nearest = NULL; - - while(te) { - if (!nearest || te->when_sec < nearest->when_sec || - (te->when_sec == nearest->when_sec && - te->when_ms < nearest->when_ms)) - nearest = te; - te = te->next; - } - return nearest; -} - -/* Process time events */ -static int processTimeEvents(aeEventLoop *eventLoop) { - int processed = 0; - aeTimeEvent *te, *prev; - long long maxId; - time_t now = time(NULL); - - /* If the system clock is moved to the future, and then set back to the - * right value, time events may be delayed in a random way. Often this - * means that scheduled operations will not be performed soon enough. - * - * Here we try to detect system clock skews, and force all the time - * events to be processed ASAP when this happens: the idea is that - * processing events earlier is less dangerous than delaying them - * indefinitely, and practice suggests it is. */ - if (now < eventLoop->lastTime) { - te = eventLoop->timeEventHead; - while(te) { - te->when_sec = 0; - te = te->next; - } - } - eventLoop->lastTime = now; - - prev = NULL; - te = eventLoop->timeEventHead; - maxId = eventLoop->timeEventNextId-1; - while(te) { - long now_sec, now_ms; - long long id; - - /* Remove events scheduled for deletion. */ - if (te->id == AE_DELETED_EVENT_ID) { - aeTimeEvent *next = te->next; - if (prev == NULL) - eventLoop->timeEventHead = te->next; - else - prev->next = te->next; - if (te->finalizerProc) - te->finalizerProc(eventLoop, te->clientData); - zfree(te); - te = next; - continue; - } - - /* Make sure we don't process time events created by time events in - * this iteration. Note that this check is currently useless: we always - * add new timers on the head, however if we change the implementation - * detail, this check may be useful again: we keep it here for future - * defense. */ - if (te->id > maxId) { - te = te->next; - continue; - } - aeGetTime(&now_sec, &now_ms); - if (now_sec > te->when_sec || - (now_sec == te->when_sec && now_ms >= te->when_ms)) - { - int retval; - - id = te->id; - retval = te->timeProc(eventLoop, id, te->clientData); - processed++; - if (retval != AE_NOMORE) { - aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); - } else { - te->id = AE_DELETED_EVENT_ID; - } - } - prev = te; - te = te->next; - } - return processed; -} - -/* Process every pending time event, then every pending file event - * (that may be registered by time event callbacks just processed). - * Without special flags the function sleeps until some file event - * fires, or when the next time event occurs (if any). - * - * If flags is 0, the function does nothing and returns. - * if flags has AE_ALL_EVENTS set, all the kind of events are processed. - * if flags has AE_FILE_EVENTS set, file events are processed. - * if flags has AE_TIME_EVENTS set, time events are processed. - * if flags has AE_DONT_WAIT set the function returns ASAP until all - * the events that's possible to process without to wait are processed. - * - * The function returns the number of events processed. */ -int aeProcessEvents(aeEventLoop *eventLoop, int flags) -{ - int processed = 0, numevents; - - /* Nothing to do? return ASAP */ - if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; - - /* Note that we want call select() even if there are no - * file events to process as long as we want to process time - * events, in order to sleep until the next time event is ready - * to fire. */ - if (eventLoop->maxfd != -1 || - ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { - int j; - aeTimeEvent *shortest = NULL; - struct timeval tv, *tvp; - - if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) - shortest = aeSearchNearestTimer(eventLoop); - if (shortest) { - long now_sec, now_ms; - - aeGetTime(&now_sec, &now_ms); - tvp = &tv; - - /* How many milliseconds we need to wait for the next - * time event to fire? */ - long long ms = - (shortest->when_sec - now_sec)*1000 + - shortest->when_ms - now_ms; - - if (ms > 0) { - tvp->tv_sec = ms/1000; - tvp->tv_usec = (ms % 1000)*1000; - } else { - tvp->tv_sec = 0; - tvp->tv_usec = 0; - } - } else { - /* If we have to check for events but need to return - * ASAP because of AE_DONT_WAIT we need to set the timeout - * to zero */ - if (flags & AE_DONT_WAIT) { - tv.tv_sec = tv.tv_usec = 0; - tvp = &tv; - } else { - /* Otherwise we can block */ - tvp = NULL; /* wait forever */ - } - } - - numevents = aeApiPoll(eventLoop, tvp); - for (j = 0; j < numevents; j++) { - aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; - int mask = eventLoop->fired[j].mask; - int fd = eventLoop->fired[j].fd; - int rfired = 0; - - /* note the fe->mask & mask & ... code: maybe an already processed - * event removed an element that fired and we still didn't - * processed, so we check if the event is still valid. */ - if (fe->mask & mask & AE_READABLE) { - rfired = 1; - fe->rfileProc(eventLoop,fd,fe->clientData,mask); - } - if (fe->mask & mask & AE_WRITABLE) { - if (!rfired || fe->wfileProc != fe->rfileProc) - fe->wfileProc(eventLoop,fd,fe->clientData,mask); - } - processed++; - } - } - /* Check time events */ - if (flags & AE_TIME_EVENTS) - processed += processTimeEvents(eventLoop); - - return processed; /* return the number of processed file/time events */ -} - -/* Wait for milliseconds until the given file descriptor becomes - * writable/readable/exception */ -int aeWait(int fd, int mask, long long milliseconds) { - struct pollfd pfd; - int retmask = 0, retval; - - memset(&pfd, 0, sizeof(pfd)); - pfd.fd = fd; - if (mask & AE_READABLE) pfd.events |= POLLIN; - if (mask & AE_WRITABLE) pfd.events |= POLLOUT; - - if ((retval = poll(&pfd, 1, milliseconds))== 1) { - if (pfd.revents & POLLIN) retmask |= AE_READABLE; - if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; - if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; - if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; - return retmask; - } else { - return retval; - } -} - -void aeMain(aeEventLoop *eventLoop) { - eventLoop->stop = 0; - while (!eventLoop->stop) { - if (eventLoop->beforesleep != NULL) - eventLoop->beforesleep(eventLoop); - aeProcessEvents(eventLoop, AE_ALL_EVENTS); - } -} - -char *aeGetApiName(void) { - return aeApiName(); -} - -void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) { - eventLoop->beforesleep = beforesleep; -} diff --git a/src/ray/thirdparty/ae/ae.h b/src/ray/thirdparty/ae/ae.h deleted file mode 100644 index 827c4c9e4e59e..0000000000000 --- a/src/ray/thirdparty/ae/ae.h +++ /dev/null @@ -1,123 +0,0 @@ -/* A simple event-driven programming library. Originally I wrote this code - * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated - * it in form of a library for easy reuse. - * - * Copyright (c) 2006-2012, Salvatore Sanfilippo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef __AE_H__ -#define __AE_H__ - -#include - -#define AE_OK 0 -#define AE_ERR -1 - -#define AE_NONE 0 -#define AE_READABLE 1 -#define AE_WRITABLE 2 - -#define AE_FILE_EVENTS 1 -#define AE_TIME_EVENTS 2 -#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) -#define AE_DONT_WAIT 4 - -#define AE_NOMORE -1 -#define AE_DELETED_EVENT_ID -1 - -/* Macros */ -#define AE_NOTUSED(V) ((void) V) - -struct aeEventLoop; - -/* Types and data structures */ -typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); -typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); -typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); -typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); - -/* File event structure */ -typedef struct aeFileEvent { - int mask; /* one of AE_(READABLE|WRITABLE) */ - aeFileProc *rfileProc; - aeFileProc *wfileProc; - void *clientData; -} aeFileEvent; - -/* Time event structure */ -typedef struct aeTimeEvent { - long long id; /* time event identifier. */ - long when_sec; /* seconds */ - long when_ms; /* milliseconds */ - aeTimeProc *timeProc; - aeEventFinalizerProc *finalizerProc; - void *clientData; - struct aeTimeEvent *next; -} aeTimeEvent; - -/* A fired event */ -typedef struct aeFiredEvent { - int fd; - int mask; -} aeFiredEvent; - -/* State of an event based program */ -typedef struct aeEventLoop { - int maxfd; /* highest file descriptor currently registered */ - int setsize; /* max number of file descriptors tracked */ - long long timeEventNextId; - time_t lastTime; /* Used to detect system clock skew */ - aeFileEvent *events; /* Registered events */ - aeFiredEvent *fired; /* Fired events */ - aeTimeEvent *timeEventHead; - int stop; - void *apidata; /* This is used for polling API specific data */ - aeBeforeSleepProc *beforesleep; -} aeEventLoop; - -/* Prototypes */ -aeEventLoop *aeCreateEventLoop(int setsize); -void aeDeleteEventLoop(aeEventLoop *eventLoop); -void aeStop(aeEventLoop *eventLoop); -int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, - aeFileProc *proc, void *clientData); -void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); -int aeGetFileEvents(aeEventLoop *eventLoop, int fd); -long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, - aeTimeProc *proc, void *clientData, - aeEventFinalizerProc *finalizerProc); -int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id); -int aeProcessEvents(aeEventLoop *eventLoop, int flags); -int aeWait(int fd, int mask, long long milliseconds); -void aeMain(aeEventLoop *eventLoop); -char *aeGetApiName(void); -void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); -int aeGetSetSize(aeEventLoop *eventLoop); -int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); - -#endif diff --git a/src/ray/thirdparty/ae/ae_epoll.c b/src/ray/thirdparty/ae/ae_epoll.c deleted file mode 100644 index 2f70550a9803a..0000000000000 --- a/src/ray/thirdparty/ae/ae_epoll.c +++ /dev/null @@ -1,137 +0,0 @@ -/* Linux epoll(2) based ae.c module - * - * Copyright (c) 2009-2012, Salvatore Sanfilippo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - - -#include - -typedef struct aeApiState { - int epfd; - struct epoll_event *events; -} aeApiState; - -static int aeApiCreate(aeEventLoop *eventLoop) { - aeApiState *state = zmalloc(sizeof(aeApiState)); - - if (!state) return -1; - state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); - if (!state->events) { - zfree(state); - return -1; - } - state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ - if (state->epfd == -1) { - zfree(state->events); - zfree(state); - return -1; - } - eventLoop->apidata = state; - return 0; -} - -static int aeApiResize(aeEventLoop *eventLoop, int setsize) { - aeApiState *state = eventLoop->apidata; - - state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize); - return 0; -} - -static void aeApiFree(aeEventLoop *eventLoop) { - aeApiState *state = eventLoop->apidata; - - close(state->epfd); - zfree(state->events); - zfree(state); -} - -static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { - aeApiState *state = eventLoop->apidata; - struct epoll_event ee; - memset(&ee, 0, sizeof(struct epoll_event)); // avoid valgrind warning - /* If the fd was already monitored for some event, we need a MOD - * operation. Otherwise we need an ADD operation. */ - int op = eventLoop->events[fd].mask == AE_NONE ? - EPOLL_CTL_ADD : EPOLL_CTL_MOD; - - ee.events = 0; - mask |= eventLoop->events[fd].mask; /* Merge old events */ - if (mask & AE_READABLE) ee.events |= EPOLLIN; - if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; - ee.data.fd = fd; - if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; - return 0; -} - -static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) { - aeApiState *state = eventLoop->apidata; - struct epoll_event ee; - memset(&ee, 0, sizeof(struct epoll_event)); // avoid valgrind warning - int mask = eventLoop->events[fd].mask & (~delmask); - - ee.events = 0; - if (mask & AE_READABLE) ee.events |= EPOLLIN; - if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; - ee.data.fd = fd; - if (mask != AE_NONE) { - epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee); - } else { - /* Note, Kernel < 2.6.9 requires a non null event pointer even for - * EPOLL_CTL_DEL. */ - epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee); - } -} - -static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { - aeApiState *state = eventLoop->apidata; - int retval, numevents = 0; - - retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, - tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); - if (retval > 0) { - int j; - - numevents = retval; - for (j = 0; j < numevents; j++) { - int mask = 0; - struct epoll_event *e = state->events+j; - - if (e->events & EPOLLIN) mask |= AE_READABLE; - if (e->events & EPOLLOUT) mask |= AE_WRITABLE; - if (e->events & EPOLLERR) mask |= AE_WRITABLE; - if (e->events & EPOLLHUP) mask |= AE_WRITABLE; - eventLoop->fired[j].fd = e->data.fd; - eventLoop->fired[j].mask = mask; - } - } - return numevents; -} - -static char *aeApiName(void) { - return "epoll"; -} diff --git a/src/ray/thirdparty/ae/ae_evport.c b/src/ray/thirdparty/ae/ae_evport.c deleted file mode 100644 index b79ed9bc73daa..0000000000000 --- a/src/ray/thirdparty/ae/ae_evport.c +++ /dev/null @@ -1,320 +0,0 @@ -/* ae.c module for illumos event ports. - * - * Copyright (c) 2012, Joyent, Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - - -#include -#include -#include -#include - -#include -#include - -#include - -static int evport_debug = 0; - -/* - * This file implements the ae API using event ports, present on Solaris-based - * systems since Solaris 10. Using the event port interface, we associate file - * descriptors with the port. Each association also includes the set of poll(2) - * events that the consumer is interested in (e.g., POLLIN and POLLOUT). - * - * There's one tricky piece to this implementation: when we return events via - * aeApiPoll, the corresponding file descriptors become dissociated from the - * port. This is necessary because poll events are level-triggered, so if the - * fd didn't become dissociated, it would immediately fire another event since - * the underlying state hasn't changed yet. We must re-associate the file - * descriptor, but only after we know that our caller has actually read from it. - * The ae API does not tell us exactly when that happens, but we do know that - * it must happen by the time aeApiPoll is called again. Our solution is to - * keep track of the last fds returned by aeApiPoll and re-associate them next - * time aeApiPoll is invoked. - * - * To summarize, in this module, each fd association is EITHER (a) represented - * only via the in-kernel association OR (b) represented by pending_fds and - * pending_masks. (b) is only true for the last fds we returned from aeApiPoll, - * and only until we enter aeApiPoll again (at which point we restore the - * in-kernel association). - */ -#define MAX_EVENT_BATCHSZ 512 - -typedef struct aeApiState { - int portfd; /* event port */ - int npending; /* # of pending fds */ - int pending_fds[MAX_EVENT_BATCHSZ]; /* pending fds */ - int pending_masks[MAX_EVENT_BATCHSZ]; /* pending fds' masks */ -} aeApiState; - -static int aeApiCreate(aeEventLoop *eventLoop) { - int i; - aeApiState *state = zmalloc(sizeof(aeApiState)); - if (!state) return -1; - - state->portfd = port_create(); - if (state->portfd == -1) { - zfree(state); - return -1; - } - - state->npending = 0; - - for (i = 0; i < MAX_EVENT_BATCHSZ; i++) { - state->pending_fds[i] = -1; - state->pending_masks[i] = AE_NONE; - } - - eventLoop->apidata = state; - return 0; -} - -static int aeApiResize(aeEventLoop *eventLoop, int setsize) { - /* Nothing to resize here. */ - return 0; -} - -static void aeApiFree(aeEventLoop *eventLoop) { - aeApiState *state = eventLoop->apidata; - - close(state->portfd); - zfree(state); -} - -static int aeApiLookupPending(aeApiState *state, int fd) { - int i; - - for (i = 0; i < state->npending; i++) { - if (state->pending_fds[i] == fd) - return (i); - } - - return (-1); -} - -/* - * Helper function to invoke port_associate for the given fd and mask. - */ -static int aeApiAssociate(const char *where, int portfd, int fd, int mask) { - int events = 0; - int rv, err; - - if (mask & AE_READABLE) - events |= POLLIN; - if (mask & AE_WRITABLE) - events |= POLLOUT; - - if (evport_debug) - fprintf(stderr, "%s: port_associate(%d, 0x%x) = ", where, fd, events); - - rv = port_associate(portfd, PORT_SOURCE_FD, fd, events, - (void *)(uintptr_t)mask); - err = errno; - - if (evport_debug) - fprintf(stderr, "%d (%s)\n", rv, rv == 0 ? "no error" : strerror(err)); - - if (rv == -1) { - fprintf(stderr, "%s: port_associate: %s\n", where, strerror(err)); - - if (err == EAGAIN) - fprintf(stderr, "aeApiAssociate: event port limit exceeded."); - } - - return rv; -} - -static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { - aeApiState *state = eventLoop->apidata; - int fullmask, pfd; - - if (evport_debug) - fprintf(stderr, "aeApiAddEvent: fd %d mask 0x%x\n", fd, mask); - - /* - * Since port_associate's "events" argument replaces any existing events, we - * must be sure to include whatever events are already associated when - * we call port_associate() again. - */ - fullmask = mask | eventLoop->events[fd].mask; - pfd = aeApiLookupPending(state, fd); - - if (pfd != -1) { - /* - * This fd was recently returned from aeApiPoll. It should be safe to - * assume that the consumer has processed that poll event, but we play - * it safer by simply updating pending_mask. The fd will be - * re-associated as usual when aeApiPoll is called again. - */ - if (evport_debug) - fprintf(stderr, "aeApiAddEvent: adding to pending fd %d\n", fd); - state->pending_masks[pfd] |= fullmask; - return 0; - } - - return (aeApiAssociate("aeApiAddEvent", state->portfd, fd, fullmask)); -} - -static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) { - aeApiState *state = eventLoop->apidata; - int fullmask, pfd; - - if (evport_debug) - fprintf(stderr, "del fd %d mask 0x%x\n", fd, mask); - - pfd = aeApiLookupPending(state, fd); - - if (pfd != -1) { - if (evport_debug) - fprintf(stderr, "deleting event from pending fd %d\n", fd); - - /* - * This fd was just returned from aeApiPoll, so it's not currently - * associated with the port. All we need to do is update - * pending_mask appropriately. - */ - state->pending_masks[pfd] &= ~mask; - - if (state->pending_masks[pfd] == AE_NONE) - state->pending_fds[pfd] = -1; - - return; - } - - /* - * The fd is currently associated with the port. Like with the add case - * above, we must look at the full mask for the file descriptor before - * updating that association. We don't have a good way of knowing what the - * events are without looking into the eventLoop state directly. We rely on - * the fact that our caller has already updated the mask in the eventLoop. - */ - - fullmask = eventLoop->events[fd].mask; - if (fullmask == AE_NONE) { - /* - * We're removing *all* events, so use port_dissociate to remove the - * association completely. Failure here indicates a bug. - */ - if (evport_debug) - fprintf(stderr, "aeApiDelEvent: port_dissociate(%d)\n", fd); - - if (port_dissociate(state->portfd, PORT_SOURCE_FD, fd) != 0) { - perror("aeApiDelEvent: port_dissociate"); - abort(); /* will not return */ - } - } else if (aeApiAssociate("aeApiDelEvent", state->portfd, fd, - fullmask) != 0) { - /* - * ENOMEM is a potentially transient condition, but the kernel won't - * generally return it unless things are really bad. EAGAIN indicates - * we've reached a resource limit, for which it doesn't make sense to - * retry (counter-intuitively). All other errors indicate a bug. In any - * of these cases, the best we can do is to abort. - */ - abort(); /* will not return */ - } -} - -static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { - aeApiState *state = eventLoop->apidata; - struct timespec timeout, *tsp; - int mask, i; - uint_t nevents; - port_event_t event[MAX_EVENT_BATCHSZ]; - - /* - * If we've returned fd events before, we must re-associate them with the - * port now, before calling port_get(). See the block comment at the top of - * this file for an explanation of why. - */ - for (i = 0; i < state->npending; i++) { - if (state->pending_fds[i] == -1) - /* This fd has since been deleted. */ - continue; - - if (aeApiAssociate("aeApiPoll", state->portfd, - state->pending_fds[i], state->pending_masks[i]) != 0) { - /* See aeApiDelEvent for why this case is fatal. */ - abort(); - } - - state->pending_masks[i] = AE_NONE; - state->pending_fds[i] = -1; - } - - state->npending = 0; - - if (tvp != NULL) { - timeout.tv_sec = tvp->tv_sec; - timeout.tv_nsec = tvp->tv_usec * 1000; - tsp = &timeout; - } else { - tsp = NULL; - } - - /* - * port_getn can return with errno == ETIME having returned some events (!). - * So if we get ETIME, we check nevents, too. - */ - nevents = 1; - if (port_getn(state->portfd, event, MAX_EVENT_BATCHSZ, &nevents, - tsp) == -1 && (errno != ETIME || nevents == 0)) { - if (errno == ETIME || errno == EINTR) - return 0; - - /* Any other error indicates a bug. */ - perror("aeApiPoll: port_get"); - abort(); - } - - state->npending = nevents; - - for (i = 0; i < nevents; i++) { - mask = 0; - if (event[i].portev_events & POLLIN) - mask |= AE_READABLE; - if (event[i].portev_events & POLLOUT) - mask |= AE_WRITABLE; - - eventLoop->fired[i].fd = event[i].portev_object; - eventLoop->fired[i].mask = mask; - - if (evport_debug) - fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n", - (int)event[i].portev_object, mask); - - state->pending_fds[i] = event[i].portev_object; - state->pending_masks[i] = (uintptr_t)event[i].portev_user; - } - - return nevents; -} - -static char *aeApiName(void) { - return "evport"; -} diff --git a/src/ray/thirdparty/ae/ae_kqueue.c b/src/ray/thirdparty/ae/ae_kqueue.c deleted file mode 100644 index 6796f4ceb5939..0000000000000 --- a/src/ray/thirdparty/ae/ae_kqueue.c +++ /dev/null @@ -1,138 +0,0 @@ -/* Kqueue(2)-based ae.c module - * - * Copyright (C) 2009 Harish Mallipeddi - harish.mallipeddi@gmail.com - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - - -#include -#include -#include - -typedef struct aeApiState { - int kqfd; - struct kevent *events; -} aeApiState; - -static int aeApiCreate(aeEventLoop *eventLoop) { - aeApiState *state = zmalloc(sizeof(aeApiState)); - - if (!state) return -1; - state->events = zmalloc(sizeof(struct kevent)*eventLoop->setsize); - if (!state->events) { - zfree(state); - return -1; - } - state->kqfd = kqueue(); - if (state->kqfd == -1) { - zfree(state->events); - zfree(state); - return -1; - } - eventLoop->apidata = state; - return 0; -} - -static int aeApiResize(aeEventLoop *eventLoop, int setsize) { - aeApiState *state = eventLoop->apidata; - - state->events = zrealloc(state->events, sizeof(struct kevent)*setsize); - return 0; -} - -static void aeApiFree(aeEventLoop *eventLoop) { - aeApiState *state = eventLoop->apidata; - - close(state->kqfd); - zfree(state->events); - zfree(state); -} - -static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { - aeApiState *state = eventLoop->apidata; - struct kevent ke; - - if (mask & AE_READABLE) { - EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); - if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1; - } - if (mask & AE_WRITABLE) { - EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); - if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1; - } - return 0; -} - -static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) { - aeApiState *state = eventLoop->apidata; - struct kevent ke; - - if (mask & AE_READABLE) { - EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); - kevent(state->kqfd, &ke, 1, NULL, 0, NULL); - } - if (mask & AE_WRITABLE) { - EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); - kevent(state->kqfd, &ke, 1, NULL, 0, NULL); - } -} - -static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { - aeApiState *state = eventLoop->apidata; - int retval, numevents = 0; - - if (tvp != NULL) { - struct timespec timeout; - timeout.tv_sec = tvp->tv_sec; - timeout.tv_nsec = tvp->tv_usec * 1000; - retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, - &timeout); - } else { - retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, - NULL); - } - - if (retval > 0) { - int j; - - numevents = retval; - for(j = 0; j < numevents; j++) { - int mask = 0; - struct kevent *e = state->events+j; - - if (e->filter == EVFILT_READ) mask |= AE_READABLE; - if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE; - eventLoop->fired[j].fd = e->ident; - eventLoop->fired[j].mask = mask; - } - } - return numevents; -} - -static char *aeApiName(void) { - return "kqueue"; -} diff --git a/src/ray/thirdparty/ae/ae_select.c b/src/ray/thirdparty/ae/ae_select.c deleted file mode 100644 index 0afd7816842d4..0000000000000 --- a/src/ray/thirdparty/ae/ae_select.c +++ /dev/null @@ -1,110 +0,0 @@ -/* Select()-based ae.c module. - * - * Copyright (c) 2009-2012, Salvatore Sanfilippo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - - -#ifdef _WIN32 -#include -#else -#include -#endif -#include - -typedef struct aeApiState { - fd_set rfds, wfds; - /* We need to have a copy of the fd sets as it's not safe to reuse - * FD sets after select(). */ - fd_set _rfds, _wfds; -} aeApiState; - -static int aeApiCreate(aeEventLoop *eventLoop) { - aeApiState *state = zmalloc(sizeof(aeApiState)); - - if (!state) return -1; - FD_ZERO(&state->rfds); - FD_ZERO(&state->wfds); - eventLoop->apidata = state; - return 0; -} - -static int aeApiResize(aeEventLoop *eventLoop, int setsize) { - /* Just ensure we have enough room in the fd_set type. */ - if (setsize >= FD_SETSIZE) return -1; - return 0; -} - -static void aeApiFree(aeEventLoop *eventLoop) { - zfree(eventLoop->apidata); -} - -static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { - aeApiState *state = eventLoop->apidata; - - if (mask & AE_READABLE) FD_SET(fd,&state->rfds); - if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds); - return 0; -} - -static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) { - aeApiState *state = eventLoop->apidata; - - if (mask & AE_READABLE) FD_CLR(fd,&state->rfds); - if (mask & AE_WRITABLE) FD_CLR(fd,&state->wfds); -} - -static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { - aeApiState *state = eventLoop->apidata; - int retval, j, numevents = 0; - - memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); - memcpy(&state->_wfds,&state->wfds,sizeof(fd_set)); - - retval = select(eventLoop->maxfd+1, - &state->_rfds,&state->_wfds,NULL,tvp); - if (retval > 0) { - for (j = 0; j <= eventLoop->maxfd; j++) { - int mask = 0; - aeFileEvent *fe = &eventLoop->events[j]; - - if (fe->mask == AE_NONE) continue; - if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds)) - mask |= AE_READABLE; - if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds)) - mask |= AE_WRITABLE; - eventLoop->fired[numevents].fd = j; - eventLoop->fired[numevents].mask = mask; - numevents++; - } - } - return numevents; -} - -static char *aeApiName(void) { - return "select"; -} diff --git a/src/ray/thirdparty/ae/config.h b/src/ray/thirdparty/ae/config.h deleted file mode 100644 index 4f8e1ea1bc38c..0000000000000 --- a/src/ray/thirdparty/ae/config.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2009-2012, Salvatore Sanfilippo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef __CONFIG_H -#define __CONFIG_H - -#ifdef __APPLE__ -#include -#endif - -/* Test for polling API */ -#ifdef __linux__ -#define HAVE_EPOLL 1 -#endif - -#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__) -#define HAVE_KQUEUE 1 -#endif - -#ifdef __sun -#include -#ifdef _DTRACE_VERSION -#define HAVE_EVPORT 1 -#endif -#endif - - -#endif diff --git a/src/ray/thirdparty/ae/zmalloc.h b/src/ray/thirdparty/ae/zmalloc.h deleted file mode 100644 index 6c27dd4e5c3d3..0000000000000 --- a/src/ray/thirdparty/ae/zmalloc.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2009-2012, Salvatore Sanfilippo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _ZMALLOC_H -#define _ZMALLOC_H - -#ifndef zmalloc -#define zmalloc malloc -#endif - -#ifndef zfree -#define zfree free -#endif - -#ifndef zrealloc -#define zrealloc realloc -#endif - -#endif /* _ZMALLOC_H */