Skip to content

Commit

Permalink
Use Plasma with LRU refreshing integrated (ray-project#6050)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz committed Nov 4, 2019
1 parent 8948855 commit 1c54468
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 25 deletions.
31 changes: 22 additions & 9 deletions bazel/BUILD.plasma
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,45 @@ cc_library(
"cpp/src/arrow/io/interfaces.cc",
"cpp/src/arrow/memory_pool.cc",
"cpp/src/arrow/status.cc",
"cpp/src/arrow/util/io-util.cc",
"cpp/src/arrow/util/io_util.cc",
"cpp/src/arrow/util/logging.cc",
"cpp/src/arrow/util/memory.cc",
"cpp/src/arrow/util/string.cc",
"cpp/src/arrow/util/string_builder.cc",
"cpp/src/arrow/util/thread-pool.cc",
"cpp/src/arrow/util/thread_pool.cc",
],
hdrs = [
"cpp/src/arrow/buffer.h",
"cpp/src/arrow/io/concurrency.h",
"cpp/src/arrow/io/interfaces.h",
"cpp/src/arrow/io/util_internal.h",
"cpp/src/arrow/memory_pool.h",
"cpp/src/arrow/result.h",
"cpp/src/arrow/status.h",
"cpp/src/arrow/util/bit-util.h",
"cpp/src/arrow/util/io-util.h",
"cpp/src/arrow/type_fwd.h",
"cpp/src/arrow/util/bit_util.h",
"cpp/src/arrow/util/checked_cast.h",
"cpp/src/arrow/util/compare.h",
"cpp/src/arrow/util/functional.h",
"cpp/src/arrow/util/io_util.h",
"cpp/src/arrow/util/iterator.h",
"cpp/src/arrow/util/logging.h",
"cpp/src/arrow/util/macros.h",
"cpp/src/arrow/util/memory.h",
"cpp/src/arrow/util/stl.h",
"cpp/src/arrow/util/string.h",
"cpp/src/arrow/util/string_builder.h",
"cpp/src/arrow/util/string_view.h",
"cpp/src/arrow/util/thread-pool.h",
"cpp/src/arrow/util/thread_pool.h",
"cpp/src/arrow/util/type_traits.h",
"cpp/src/arrow/util/ubsan.h",
"cpp/src/arrow/util/variant.h",
"cpp/src/arrow/util/visibility.h",
"cpp/src/arrow/util/windows_compatibility.h",
"cpp/src/arrow/vendored/string_view.hpp",
"cpp/src/arrow/vendored/variant.hpp",
"cpp/src/arrow/vendored/xxhash.h",
"cpp/src/arrow/vendored/xxhash/xxh3.h",
"cpp/src/arrow/vendored/xxhash/xxhash.c",
"cpp/src/arrow/vendored/xxhash/xxhash.h",
],
Expand Down Expand Up @@ -176,17 +189,17 @@ FLATC_ARGS = [

flatbuffer_cc_library(
name = "common_fbs",
srcs = ["cpp/src/plasma/format/common.fbs"],
srcs = ["cpp/src/plasma/common.fbs"],
flatc_args = FLATC_ARGS,
out_prefix = "cpp/src/plasma/",
)

flatbuffer_cc_library(
name = "plasma_fbs",
srcs = ["cpp/src/plasma/format/plasma.fbs"],
srcs = ["cpp/src/plasma/plasma.fbs"],
flatc_args = FLATC_ARGS,
includes = ["cpp/src/plasma/format/common.fbs"],
includes = ["cpp/src/plasma/common.fbs"],
out_prefix = "cpp/src/plasma/",
)

exports_files(["cpp/src/plasma/format/common.fbs"])
exports_files(["cpp/src/plasma/common.fbs"])
1 change: 0 additions & 1 deletion bazel/ray_deps_build_all.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ def ray_deps_build_all():
grpc_deps()
java_proto_compile()
python_proto_compile()

2 changes: 1 addition & 1 deletion bazel/ray_deps_setup.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def ray_deps_setup():
new_git_repository(
name = "plasma",
build_file = "@//bazel:BUILD.plasma",
commit = "141a213a54f4979ab0b94b94928739359a2ee9ad",
commit = "86f34aa07e611787d9cc98c6a33b0a0a536dce57",
remote = "https://github.com/apache/arrow",
)

Expand Down
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pushd "$BUILD_DIR"
if [ -z "$SKIP_PYARROW_INSTALL" ]; then
"$PYTHON_EXECUTABLE" -m pip install -q \
--target="$ROOT_DIR/python/ray/pyarrow_files" pyarrow==0.14.0.RAY \
--find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/516e15028091b5e287200b5df77d77f72d9a6c9a/index.html
--find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/3a11193d9530fe8ec7fdb98057f853b708f6f6ae/index.html
fi
export PYTHON_BIN_PATH="$PYTHON_EXECUTABLE"

Expand Down
2 changes: 1 addition & 1 deletion python/ray/experimental/async_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async def _async_init():
if handler is None:
worker = ray.worker.global_worker
plasma_client = thread_safe_client(
plasma.connect(worker.node.plasma_store_socket_name, None, 0, 300))
plasma.connect(worker.node.plasma_store_socket_name, 300))
loop = asyncio.get_event_loop()
plasma_client.subscribe()
rsock = plasma_client.get_notification_socket()
Expand Down
7 changes: 6 additions & 1 deletion python/ray/experimental/async_plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ def data_received(self, data):
i += INT64_SIZE
segment = self._buffer[i:i + msg_len]
i += msg_len
messages.append(self.plasma_client.decode_notification(segment))
(object_ids, object_sizes,
metadata_sizes) = self.plasma_client.decode_notifications(segment)
assert len(object_ids) == len(object_sizes) == len(metadata_sizes)
for j in range(len(object_ids)):
messages.append((object_ids[j], object_sizes[j],
metadata_sizes[j]))

self._buffer = self._buffer[i:]
self.plasma_event_handler.process_notifications(messages)
Expand Down
9 changes: 8 additions & 1 deletion src/ray/object_manager/format/object_manager.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ namespace ray.object_manager.protocol;

// Object information data structure.
// NOTE(pcm): This structure is replicated in
// https://github.com/apache/arrow/blob/master/cpp/src/plasma/format/common.fbs,
// https://github.com/apache/arrow/blob/master/cpp/src/plasma/common.fbs,
// so if you modify it, you should also modify that one.
table ObjectInfo {
// Object ID of this object.
Expand All @@ -24,3 +24,10 @@ table ObjectInfo {
// Specifies if this object was deleted or added.
is_deletion: bool;
}

// NOTE(pcm): This structure is replicated in
// https://github.com/apache/arrow/blob/master/cpp/src/plasma/plasma.fbs
// so if you modify it, you should also modify that one.
table PlasmaNotification {
object_info: [ObjectInfo];
}
24 changes: 14 additions & 10 deletions src/ray/object_manager/object_store_notification_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,20 @@ void ObjectStoreNotificationManager::ProcessStoreNotification(
<< "dmesg for previous errors: " << boost_to_ray_status(error).ToString();
}

const auto &object_info =
flatbuffers::GetRoot<object_manager::protocol::ObjectInfo>(notification_.data());
const ObjectID object_id =
ObjectID::FromPlasmaIdBinary(object_info->object_id()->str());
if (object_info->is_deletion()) {
ProcessStoreRemove(object_id);
} else {
object_manager::protocol::ObjectInfoT result;
object_info->UnPackTo(&result);
ProcessStoreAdd(result);
const auto &object_notification =
flatbuffers::GetRoot<object_manager::protocol::PlasmaNotification>(
notification_.data());
for (size_t i = 0; i < object_notification->object_info()->size(); ++i) {
auto object_info = object_notification->object_info()->Get(i);
const ObjectID object_id =
ObjectID::FromPlasmaIdBinary(object_info->object_id()->str());
if (object_info->is_deletion()) {
ProcessStoreRemove(object_id);
} else {
object_manager::protocol::ObjectInfoT result;
object_info->UnPackTo(&result);
ProcessStoreAdd(result);
}
}
NotificationWait();
}
Expand Down

0 comments on commit 1c54468

Please sign in to comment.