diff --git a/bazel/BUILD.plasma b/bazel/BUILD.plasma index 3f4a0e421669f..a6309449a7ce5 100644 --- a/bazel/BUILD.plasma +++ b/bazel/BUILD.plasma @@ -162,6 +162,28 @@ genrule( output_to_bindir = 1, ) +cc_library( + name = "ae", + srcs = [ + "cpp/src/plasma/thirdparty/ae/ae.c", + ], + hdrs = [ + "cpp/src/plasma/thirdparty/ae/ae.h", + "cpp/src/plasma/thirdparty/ae/ae_epoll.c", + "cpp/src/plasma/thirdparty/ae/ae_evport.c", + "cpp/src/plasma/thirdparty/ae/ae_kqueue.c", + "cpp/src/plasma/thirdparty/ae/ae_select.c", + "cpp/src/plasma/thirdparty/ae/config.h", + "cpp/src/plasma/thirdparty/ae/zmalloc.h", + ], + copts = COPTS, + includes = [ + "cpp/src/plasma/thirdparty/ae", + ], + strip_include_prefix = "cpp/src", + visibility = ["//visibility:public"], +) + cc_library( name = "plasma_lib", srcs = [ @@ -171,7 +193,6 @@ cc_library( "cpp/src/plasma/external_store.cc", "cpp/src/plasma/plasma_allocator.cc", "cpp/src/plasma/quota_aware_policy.cc", - "cpp/src/plasma/thirdparty/ae/ae.c", ], hdrs = [ "cpp/src/plasma/events.h", @@ -180,19 +201,13 @@ cc_library( "cpp/src/plasma/plasma_allocator.h", "cpp/src/plasma/quota_aware_policy.h", "cpp/src/plasma/store.h", - "cpp/src/plasma/thirdparty/ae/ae.h", - "cpp/src/plasma/thirdparty/ae/ae_epoll.c", - "cpp/src/plasma/thirdparty/ae/ae_evport.c", - "cpp/src/plasma/thirdparty/ae/ae_kqueue.c", - "cpp/src/plasma/thirdparty/ae/ae_select.c", - "cpp/src/plasma/thirdparty/ae/config.h", - "cpp/src/plasma/thirdparty/ae/zmalloc.h", "cpp/src/plasma/thirdparty/dlmalloc.c", ], copts = COPTS, linkopts = LINKOPTS, strip_include_prefix = "cpp/src", deps = [ + ":ae", ":plasma_client", "@com_github_google_glog//:glog", ], diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index 38e661f0b1a0c..4a5fb69c0a265 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -166,6 +166,13 @@ def ray_deps_setup(): commit = "86f34aa07e611787d9cc98c6a33b0a0a536dce57", remote = "https://github.com/apache/arrow", sha256 = "4f1956e74188fa15078c8ad560bbc298624320d2aafd21fe7a2511afee7ea841", + patches = [ + "//thirdparty/patches:arrow-headers-unused.patch", + "//thirdparty/patches:arrow-windows-export.patch", + "//thirdparty/patches:arrow-windows-poll.patch", + "//thirdparty/patches:arrow-windows-sigpipe.patch", + "//thirdparty/patches:arrow-windows-socket.patch", + ], ) github_repository( diff --git a/thirdparty/patches/arrow-headers-unused.patch b/thirdparty/patches/arrow-headers-unused.patch new file mode 100644 index 0000000000000..57add794ef60f --- /dev/null +++ b/thirdparty/patches/arrow-headers-unused.patch @@ -0,0 +1,28 @@ +diff --git cpp/src/plasma/client.cc cpp/src/plasma/client.cc +index 5142ee435..5266e3e66 100644 +--- cpp/src/plasma/client.cc ++++ cpp/src/plasma/client.cc +@@ -19,10 +19,6 @@ + + #include "plasma/client.h" + +-#ifdef _WIN32 +-#include +-#endif +- + #include + #include + #include +diff --git cpp/src/plasma/plasma.h cpp/src/plasma/plasma.h +index 79e33c2f0..c8241b2fe 100644 +--- cpp/src/plasma/plasma.h ++++ cpp/src/plasma/plasma.h +@@ -25,7 +25,6 @@ + #include + #include + #include +-#include // pid_t + + #include + #include +-- diff --git a/thirdparty/patches/arrow-windows-export.patch b/thirdparty/patches/arrow-windows-export.patch new file mode 100644 index 0000000000000..8a16c73c156b0 --- /dev/null +++ b/thirdparty/patches/arrow-windows-export.patch @@ -0,0 +1,15 @@ +diff --git cpp/src/arrow/util/logging.cc cpp/src/arrow/util/logging.cc +index e54a10e52..3dbfc7a12 100644 +--- cpp/src/arrow/util/logging.cc ++++ cpp/src/arrow/util/logging.cc +@@ -84,7 +84,9 @@ typedef google::LogMessage LoggingProvider; + typedef CerrLog LoggingProvider; + #endif + ++#if !defined(_WIN32) || defined(ARROW_STATIC) || defined(ARROW_EXPORTING) || !defined(ARROW_EXPORT) + ArrowLogLevel ArrowLog::severity_threshold_ = ArrowLogLevel::ARROW_INFO; ++#endif + // Keep the log directory. + static std::unique_ptr log_dir_; + +-- diff --git a/thirdparty/patches/arrow-windows-poll.patch b/thirdparty/patches/arrow-windows-poll.patch new file mode 100644 index 0000000000000..e2833f3dae44b --- /dev/null +++ b/thirdparty/patches/arrow-windows-poll.patch @@ -0,0 +1,49 @@ +diff --git cpp/src/plasma/thirdparty/ae/ae.c cpp/src/plasma/thirdparty/ae/ae.c +index dfb722444..96d9e537a 100644 +--- cpp/src/plasma/thirdparty/ae/ae.c ++++ cpp/src/plasma/thirdparty/ae/ae.c +@@ -428,19 +428,33 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) + /* Wait for milliseconds until the given file descriptor becomes + * writable/readable/exception */ + int aeWait(int fd, int mask, long long milliseconds) { +- struct pollfd pfd; ++ short revents = 0; ++ struct timeval tv = { milliseconds / 1000, (milliseconds % 1000) * 1000 }; + int retmask = 0, retval; + +- memset(&pfd, 0, sizeof(pfd)); +- pfd.fd = fd; +- if (mask & AE_READABLE) pfd.events |= POLLIN; +- if (mask & AE_WRITABLE) pfd.events |= POLLOUT; ++ fd_set rset, wset; ++ FD_ZERO(&rset); ++ FD_ZERO(&wset); ++ if (mask & AE_READABLE) { ++ FD_SET(fd, &rset); ++ } else if (mask & AE_WRITABLE) { ++ FD_SET(fd, &wset); ++ } ++ ++ if ((retval = select(fd + 1, &rset, &wset, NULL, &tv)) > 0) { ++ if (FD_ISSET(fd, &rset)) { ++ revents |= POLLIN; ++ } ++ if (FD_ISSET(fd, &wset)) { ++ revents |= POLLOUT; ++ } ++ } + +- if ((retval = poll(&pfd, 1, milliseconds))== 1) { +- if (pfd.revents & POLLIN) retmask |= AE_READABLE; +- if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; +- if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; +- if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; ++ if (retval== 1) { ++ if (revents & POLLIN) retmask |= AE_READABLE; ++ if (revents & POLLOUT) retmask |= AE_WRITABLE; ++ if (revents & POLLERR) retmask |= AE_WRITABLE; ++ if (revents & POLLHUP) retmask |= AE_WRITABLE; + return retmask; + } else { + return retval; +-- diff --git a/thirdparty/patches/arrow-windows-sigpipe.patch b/thirdparty/patches/arrow-windows-sigpipe.patch new file mode 100644 index 0000000000000..67c3889ef3d37 --- /dev/null +++ b/thirdparty/patches/arrow-windows-sigpipe.patch @@ -0,0 +1,17 @@ +diff --git cpp/src/plasma/store.cc cpp/src/plasma/store.cc +index 01aabfc7c..876cb59a6 100644 +--- cpp/src/plasma/store.cc ++++ cpp/src/plasma/store.cc +@@ -1182,9 +1182,11 @@ void HandleSignal(int signal) { + + void StartServer(char* socket_name, std::string plasma_directory, bool hugepages_enabled, + std::shared_ptr external_store) { ++#ifndef _WIN32 // TODO(mehrdadn): Is there an equivalent of this we need for Windows? + // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write + // to a client that has already died, the store could die. + signal(SIGPIPE, SIG_IGN); ++#endif + + g_runner.reset(new PlasmaStoreRunner()); + signal(SIGTERM, HandleSignal); +-- diff --git a/thirdparty/patches/arrow-windows-socket.patch b/thirdparty/patches/arrow-windows-socket.patch new file mode 100644 index 0000000000000..09385ff0e829f --- /dev/null +++ b/thirdparty/patches/arrow-windows-socket.patch @@ -0,0 +1,34 @@ +diff --git cpp/src/plasma/client.cc cpp/src/plasma/client.cc +index 0cb1d81a2..5142ee435 100644 +--- cpp/src/plasma/client.cc ++++ cpp/src/plasma/client.cc +@@ -983,8 +983,13 @@ Status PlasmaClient::Impl::Subscribe(int* fd) { + // notifications from the Plasma store to the client. + socketpair(AF_UNIX, SOCK_STREAM, 0, sock); + // Make the socket non-blocking. ++#ifdef _WINSOCKAPI_ ++ unsigned long value = 1; ++ ARROW_CHECK(ioctlsocket(sock[1], FIONBIO, &value) == 0); ++#else + int flags = fcntl(sock[1], F_GETFL, 0); + ARROW_CHECK(fcntl(sock[1], F_SETFL, flags | O_NONBLOCK) == 0); ++#endif + // Tell the Plasma store about the subscription. + RETURN_NOT_OK(SendSubscribeRequest(store_conn_)); + // Send the file descriptor that the Plasma store should use to push +diff --git cpp/src/plasma/fling.cc cpp/src/plasma/fling.cc +index f0960aab6..2f3997534 100644 +--- cpp/src/plasma/fling.cc ++++ cpp/src/plasma/fling.cc +@@ -18,6 +18,10 @@ + + #include "arrow/util/logging.h" + ++#ifdef _WIN32 ++#include // socklen_t ++#endif ++ + void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len) { + iov->iov_base = buf; + iov->iov_len = 1; +--