diff --git a/bazel/BUILD.plasma b/bazel/BUILD.plasma index d3a9c86f4f93c..93bb6598ea857 100644 --- a/bazel/BUILD.plasma +++ b/bazel/BUILD.plasma @@ -9,19 +9,28 @@ cc_library( "cpp/src/arrow/io/interfaces.cc", "cpp/src/arrow/memory_pool.cc", "cpp/src/arrow/status.cc", - "cpp/src/arrow/util/io-util.cc", + "cpp/src/arrow/util/io_util.cc", "cpp/src/arrow/util/logging.cc", "cpp/src/arrow/util/memory.cc", + "cpp/src/arrow/util/string.cc", "cpp/src/arrow/util/string_builder.cc", - "cpp/src/arrow/util/thread-pool.cc", + "cpp/src/arrow/util/thread_pool.cc", ], hdrs = [ "cpp/src/arrow/buffer.h", + "cpp/src/arrow/io/concurrency.h", "cpp/src/arrow/io/interfaces.h", + "cpp/src/arrow/io/util_internal.h", "cpp/src/arrow/memory_pool.h", + "cpp/src/arrow/result.h", "cpp/src/arrow/status.h", - "cpp/src/arrow/util/bit-util.h", - "cpp/src/arrow/util/io-util.h", + "cpp/src/arrow/type_fwd.h", + "cpp/src/arrow/util/bit_util.h", + "cpp/src/arrow/util/checked_cast.h", + "cpp/src/arrow/util/compare.h", + "cpp/src/arrow/util/functional.h", + "cpp/src/arrow/util/io_util.h", + "cpp/src/arrow/util/iterator.h", "cpp/src/arrow/util/logging.h", "cpp/src/arrow/util/macros.h", "cpp/src/arrow/util/memory.h", @@ -29,12 +38,16 @@ cc_library( "cpp/src/arrow/util/string.h", "cpp/src/arrow/util/string_builder.h", "cpp/src/arrow/util/string_view.h", - "cpp/src/arrow/util/thread-pool.h", + "cpp/src/arrow/util/thread_pool.h", "cpp/src/arrow/util/type_traits.h", "cpp/src/arrow/util/ubsan.h", + "cpp/src/arrow/util/variant.h", "cpp/src/arrow/util/visibility.h", "cpp/src/arrow/util/windows_compatibility.h", "cpp/src/arrow/vendored/string_view.hpp", + "cpp/src/arrow/vendored/variant.hpp", + "cpp/src/arrow/vendored/xxhash.h", + "cpp/src/arrow/vendored/xxhash/xxh3.h", "cpp/src/arrow/vendored/xxhash/xxhash.c", "cpp/src/arrow/vendored/xxhash/xxhash.h", ], @@ -176,17 +189,17 @@ FLATC_ARGS = [ flatbuffer_cc_library( name = "common_fbs", - srcs = ["cpp/src/plasma/format/common.fbs"], + srcs = ["cpp/src/plasma/common.fbs"], flatc_args = FLATC_ARGS, out_prefix = "cpp/src/plasma/", ) flatbuffer_cc_library( name = "plasma_fbs", - srcs = ["cpp/src/plasma/format/plasma.fbs"], + srcs = ["cpp/src/plasma/plasma.fbs"], flatc_args = FLATC_ARGS, - includes = ["cpp/src/plasma/format/common.fbs"], + includes = ["cpp/src/plasma/common.fbs"], out_prefix = "cpp/src/plasma/", ) -exports_files(["cpp/src/plasma/format/common.fbs"]) +exports_files(["cpp/src/plasma/common.fbs"]) diff --git a/bazel/ray_deps_build_all.bzl b/bazel/ray_deps_build_all.bzl index eda88bece7d22..541661a3f22f2 100644 --- a/bazel/ray_deps_build_all.bzl +++ b/bazel/ray_deps_build_all.bzl @@ -17,4 +17,3 @@ def ray_deps_build_all(): grpc_deps() java_proto_compile() python_proto_compile() - diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index 4127e977e191a..42bb097dd9ba5 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -74,7 +74,7 @@ def ray_deps_setup(): new_git_repository( name = "plasma", build_file = "@//bazel:BUILD.plasma", - commit = "141a213a54f4979ab0b94b94928739359a2ee9ad", + commit = "86f34aa07e611787d9cc98c6a33b0a0a536dce57", remote = "https://github.com/apache/arrow", ) diff --git a/build.sh b/build.sh index 392482ed86506..89c6893c62e42 100755 --- a/build.sh +++ b/build.sh @@ -99,7 +99,7 @@ pushd "$BUILD_DIR" if [ -z "$SKIP_PYARROW_INSTALL" ]; then "$PYTHON_EXECUTABLE" -m pip install -q \ --target="$ROOT_DIR/python/ray/pyarrow_files" pyarrow==0.14.0.RAY \ - --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/516e15028091b5e287200b5df77d77f72d9a6c9a/index.html + --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/3a11193d9530fe8ec7fdb98057f853b708f6f6ae/index.html fi export PYTHON_BIN_PATH="$PYTHON_EXECUTABLE" diff --git a/python/ray/experimental/async_api.py b/python/ray/experimental/async_api.py index 18b511190a975..8f1eb7246e13e 100644 --- a/python/ray/experimental/async_api.py +++ b/python/ray/experimental/async_api.py @@ -72,7 +72,7 @@ async def _async_init(): if handler is None: worker = ray.worker.global_worker plasma_client = thread_safe_client( - plasma.connect(worker.node.plasma_store_socket_name, None, 0, 300)) + plasma.connect(worker.node.plasma_store_socket_name, 300)) loop = asyncio.get_event_loop() plasma_client.subscribe() rsock = plasma_client.get_notification_socket() diff --git a/python/ray/experimental/async_plasma.py b/python/ray/experimental/async_plasma.py index 674c6089a4607..53e240afcdaf9 100644 --- a/python/ray/experimental/async_plasma.py +++ b/python/ray/experimental/async_plasma.py @@ -39,7 +39,12 @@ def data_received(self, data): i += INT64_SIZE segment = self._buffer[i:i + msg_len] i += msg_len - messages.append(self.plasma_client.decode_notification(segment)) + (object_ids, object_sizes, + metadata_sizes) = self.plasma_client.decode_notifications(segment) + assert len(object_ids) == len(object_sizes) == len(metadata_sizes) + for j in range(len(object_ids)): + messages.append((object_ids[j], object_sizes[j], + metadata_sizes[j])) self._buffer = self._buffer[i:] self.plasma_event_handler.process_notifications(messages) diff --git a/src/ray/object_manager/format/object_manager.fbs b/src/ray/object_manager/format/object_manager.fbs index c928d69e672d8..1388ee9970075 100644 --- a/src/ray/object_manager/format/object_manager.fbs +++ b/src/ray/object_manager/format/object_manager.fbs @@ -3,7 +3,7 @@ namespace ray.object_manager.protocol; // Object information data structure. // NOTE(pcm): This structure is replicated in -// https://github.com/apache/arrow/blob/master/cpp/src/plasma/format/common.fbs, +// https://github.com/apache/arrow/blob/master/cpp/src/plasma/common.fbs, // so if you modify it, you should also modify that one. table ObjectInfo { // Object ID of this object. @@ -24,3 +24,10 @@ table ObjectInfo { // Specifies if this object was deleted or added. is_deletion: bool; } + +// NOTE(pcm): This structure is replicated in +// https://github.com/apache/arrow/blob/master/cpp/src/plasma/plasma.fbs +// so if you modify it, you should also modify that one. +table PlasmaNotification { + object_info: [ObjectInfo]; +} diff --git a/src/ray/object_manager/object_store_notification_manager.cc b/src/ray/object_manager/object_store_notification_manager.cc index 7bef7785e00c8..3b29ab2d5569d 100644 --- a/src/ray/object_manager/object_store_notification_manager.cc +++ b/src/ray/object_manager/object_store_notification_manager.cc @@ -61,16 +61,20 @@ void ObjectStoreNotificationManager::ProcessStoreNotification( << "dmesg for previous errors: " << boost_to_ray_status(error).ToString(); } - const auto &object_info = - flatbuffers::GetRoot(notification_.data()); - const ObjectID object_id = - ObjectID::FromPlasmaIdBinary(object_info->object_id()->str()); - if (object_info->is_deletion()) { - ProcessStoreRemove(object_id); - } else { - object_manager::protocol::ObjectInfoT result; - object_info->UnPackTo(&result); - ProcessStoreAdd(result); + const auto &object_notification = + flatbuffers::GetRoot( + notification_.data()); + for (size_t i = 0; i < object_notification->object_info()->size(); ++i) { + auto object_info = object_notification->object_info()->Get(i); + const ObjectID object_id = + ObjectID::FromPlasmaIdBinary(object_info->object_id()->str()); + if (object_info->is_deletion()) { + ProcessStoreRemove(object_id); + } else { + object_manager::protocol::ObjectInfoT result; + object_info->UnPackTo(&result); + ProcessStoreAdd(result); + } } NotificationWait(); }