From b999fda7ee02dccb28e9972e7f7ba21d70db5ed4 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 22 Feb 2023 04:31:25 +0000 Subject: [PATCH 1/9] reset --- src/ray/common/ray_config_def.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 5dbcc88584ae6..a592aa3fca08e 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -440,7 +440,7 @@ RAY_CONFIG(uint64_t, gcs_grpc_max_request_queued_max_bytes, 1024UL * 1024 * 1024 RAY_CONFIG(int32_t, gcs_client_check_connection_status_interval_milliseconds, 1000) /// Feature flag to use the ray syncer for resource synchronization -RAY_CONFIG(bool, use_ray_syncer, false) +RAY_CONFIG(bool, use_ray_syncer, true) /// Due to the protocol drawback, raylet needs to refresh the message if /// no message is received for a while. /// Refer to https://tinyurl.com/n6kvsp87 for more details From 98ea7ab208bc76bae6e3089d811814905d0a459e Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 22 Feb 2023 21:20:08 +0000 Subject: [PATCH 2/9] fix the crash --- src/ray/common/ray_syncer/ray_syncer-inl.h | 22 +++++++++++++++++----- src/ray/common/ray_syncer/ray_syncer.cc | 18 ++++++------------ 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/src/ray/common/ray_syncer/ray_syncer-inl.h b/src/ray/common/ray_syncer/ray_syncer-inl.h index ebb65f890a10f..fd59f75f39c9f 100644 --- a/src/ray/common/ray_syncer/ray_syncer-inl.h +++ b/src/ray/common/ray_syncer/ray_syncer-inl.h @@ -137,10 +137,20 @@ class RaySyncerBidiReactor { /// Disconnect will terminate the communication between local and remote node. /// It also needs to do proper cleanup. - virtual void Disconnect() = 0; + void Disconnect() { + if(!IsDisconnected()) { + DoDisconnect(); + disconnected_ = true; + } + }; + + /// Return true if it's disconnected. + bool IsDisconnected() const { return disconnected_; } private: + virtual void DoDisconnect() = 0; std::string remote_node_id_; + bool disconnected_ = false; }; /// This class implements the communication between two nodes except the initialization @@ -166,6 +176,10 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T { message_processor_(std::move(message_processor)) {} bool PushToSendingQueue(std::shared_ptr message) override { + if(IsDisconnected()) { + return false; + } + // Try to filter out the messages the target node already has. // Usually it'll be the case when the message is generated from the // target node or it's sent from the target node. @@ -339,9 +353,9 @@ class RayServerBidiReactor : public RaySyncerBidiReactorBase ~RayServerBidiReactor() override = default; - void Disconnect() override; private: + void DoDisconnect() override; void OnCancel() override; void OnDone() override; @@ -350,7 +364,6 @@ class RayServerBidiReactor : public RaySyncerBidiReactorBase /// grpc callback context grpc::CallbackServerContext *server_context_; - bool disconnected_ = false; FRIEND_TEST(SyncerReactorTest, TestReactorFailure); }; @@ -368,9 +381,9 @@ class RayClientBidiReactor : public RaySyncerBidiReactorBase ~RayClientBidiReactor() override = default; - void Disconnect() override; private: + void DoDisconnect() override; /// Callback from gRPC void OnDone(const grpc::Status &status) override; @@ -381,7 +394,6 @@ class RayClientBidiReactor : public RaySyncerBidiReactorBase grpc::ClientContext client_context_; std::unique_ptr stub_; - bool disconnected_ = false; }; } // namespace syncer diff --git a/src/ray/common/ray_syncer/ray_syncer.cc b/src/ray/common/ray_syncer/ray_syncer.cc index 2bb2f0bab311e..8b5bf9803251e 100644 --- a/src/ray/common/ray_syncer/ray_syncer.cc +++ b/src/ray/common/ray_syncer/ray_syncer.cc @@ -104,13 +104,10 @@ RayServerBidiReactor::RayServerBidiReactor( StartPull(); } -void RayServerBidiReactor::Disconnect() { +void RayServerBidiReactor::DoDisconnect() { io_context_.dispatch( [this]() { - if (!disconnected_) { - disconnected_ = true; - Finish(grpc::Status::OK); - } + Finish(grpc::Status::OK); }, ""); } @@ -155,15 +152,12 @@ void RayClientBidiReactor::OnDone(const grpc::Status &status) { ""); } -void RayClientBidiReactor::Disconnect() { +void RayClientBidiReactor::DoDisconnect() { io_context_.dispatch( [this]() { - if (!disconnected_) { - disconnected_ = true; - StartWritesDone(); - // Free the hold to allow OnDone being called. - RemoveHold(); - } + StartWritesDone(); + // Free the hold to allow OnDone being called. + RemoveHold(); }, ""); } From d398ff4d4523ea5d9ab5c0a24be0b3cbae5a3834 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 22 Feb 2023 21:34:57 +0000 Subject: [PATCH 3/9] fix --- src/ray/common/ray_syncer/ray_syncer-inl.h | 6 ++---- src/ray/common/ray_syncer/ray_syncer.cc | 6 +----- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/src/ray/common/ray_syncer/ray_syncer-inl.h b/src/ray/common/ray_syncer/ray_syncer-inl.h index fd59f75f39c9f..5043832d7f379 100644 --- a/src/ray/common/ray_syncer/ray_syncer-inl.h +++ b/src/ray/common/ray_syncer/ray_syncer-inl.h @@ -138,7 +138,7 @@ class RaySyncerBidiReactor { /// Disconnect will terminate the communication between local and remote node. /// It also needs to do proper cleanup. void Disconnect() { - if(!IsDisconnected()) { + if (!IsDisconnected()) { DoDisconnect(); disconnected_ = true; } @@ -176,7 +176,7 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T { message_processor_(std::move(message_processor)) {} bool PushToSendingQueue(std::shared_ptr message) override { - if(IsDisconnected()) { + if (IsDisconnected()) { return false; } @@ -353,7 +353,6 @@ class RayServerBidiReactor : public RaySyncerBidiReactorBase ~RayServerBidiReactor() override = default; - private: void DoDisconnect() override; void OnCancel() override; @@ -381,7 +380,6 @@ class RayClientBidiReactor : public RaySyncerBidiReactorBase ~RayClientBidiReactor() override = default; - private: void DoDisconnect() override; /// Callback from gRPC diff --git a/src/ray/common/ray_syncer/ray_syncer.cc b/src/ray/common/ray_syncer/ray_syncer.cc index 8b5bf9803251e..253a73a77da64 100644 --- a/src/ray/common/ray_syncer/ray_syncer.cc +++ b/src/ray/common/ray_syncer/ray_syncer.cc @@ -105,11 +105,7 @@ RayServerBidiReactor::RayServerBidiReactor( } void RayServerBidiReactor::DoDisconnect() { - io_context_.dispatch( - [this]() { - Finish(grpc::Status::OK); - }, - ""); + io_context_.dispatch([this]() { Finish(grpc::Status::OK); }, ""); } void RayServerBidiReactor::OnCancel() { Disconnect(); } From 2c7669f59ea865f4affc7ab29dcdf210c034d642 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 22 Feb 2023 23:56:45 +0000 Subject: [PATCH 4/9] fix --- src/mock/ray/common/ray_syncer/ray_syncer.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mock/ray/common/ray_syncer/ray_syncer.h b/src/mock/ray/common/ray_syncer/ray_syncer.h index 2ef4304206979..4c5fbf5535bd5 100644 --- a/src/mock/ray/common/ray_syncer/ray_syncer.h +++ b/src/mock/ray/common/ray_syncer/ray_syncer.h @@ -47,7 +47,7 @@ class MockRaySyncerBidiReactor : public RaySyncerBidiReactor { public: using RaySyncerBidiReactor::RaySyncerBidiReactor; - MOCK_METHOD(void, Disconnect, (), (override)); + MOCK_METHOD(void, DoDisconnect, (), (override)); MOCK_METHOD(bool, PushToSendingQueue, @@ -60,7 +60,7 @@ class MockRaySyncerBidiReactorBase : public RaySyncerBidiReactorBase { public: using RaySyncerBidiReactorBase::RaySyncerBidiReactorBase; - MOCK_METHOD(void, Disconnect, (), (override)); + MOCK_METHOD(void, DoDisconnect, (), (override)); }; } // namespace syncer From 9ac7f052fc51a0f943d828cd60b8a7184f2381da Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 23 Feb 2023 00:57:37 +0000 Subject: [PATCH 5/9] fix crash --- src/ray/common/ray_syncer/ray_syncer-inl.h | 43 ++++++++++++---------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/src/ray/common/ray_syncer/ray_syncer-inl.h b/src/ray/common/ray_syncer/ray_syncer-inl.h index 5043832d7f379..b29bc976d265f 100644 --- a/src/ray/common/ray_syncer/ray_syncer-inl.h +++ b/src/ray/common/ray_syncer/ray_syncer-inl.h @@ -139,8 +139,8 @@ class RaySyncerBidiReactor { /// It also needs to do proper cleanup. void Disconnect() { if (!IsDisconnected()) { - DoDisconnect(); disconnected_ = true; + DoDisconnect(); } }; @@ -268,7 +268,8 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T { } RAY_LOG(DEBUG) << "[BidiReactor] Sending message to " << NodeID::FromBinary(GetRemoteNodeID()) << " about node " - << NodeID::FromBinary(sending_message_->node_id()); + << NodeID::FromBinary(sending_message_->node_id()) << " with flush " + << flush; StartWrite(sending_message_.get(), opts); } @@ -278,29 +279,33 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T { using T::StartWrite; void OnWriteDone(bool ok) override { - if (ok) { - io_context_.dispatch([this]() { SendNext(); }, ""); - } else { - RAY_LOG_EVERY_N(ERROR, 100) - << "Failed to send the message to: " << NodeID::FromBinary(GetRemoteNodeID()); - Disconnect(); - } + io_context_.dispatch( + [this, ok]() { + if (ok) { + SendNext(); + } else { + RAY_LOG_EVERY_N(ERROR, 100) << "Failed to send the message to: " + << NodeID::FromBinary(GetRemoteNodeID()); + Disconnect(); + } + }, + ""); } void OnReadDone(bool ok) override { - if (ok) { - io_context_.dispatch( - [this, msg = std::move(receiving_message_)]() mutable { + io_context_.dispatch( + [this, ok, msg = std::move(receiving_message_)]() mutable { + if (ok) { RAY_CHECK(!msg->node_id().empty()); ReceiveUpdate(std::move(msg)); StartPull(); - }, - ""); - } else { - RAY_LOG_EVERY_N(ERROR, 100) - << "Failed to read the message from: " << NodeID::FromBinary(GetRemoteNodeID()); - Disconnect(); - } + } else { + RAY_LOG_EVERY_N(ERROR, 100) << "Failed to read the message from: " + << NodeID::FromBinary(GetRemoteNodeID()); + Disconnect(); + } + }, + ""); } /// grpc requests for sending and receiving From 542047126e043fe1455c5a5ac491639470401277 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 23 Feb 2023 01:35:11 +0000 Subject: [PATCH 6/9] fix test --- src/ray/common/ray_syncer/ray_syncer-inl.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ray/common/ray_syncer/ray_syncer-inl.h b/src/ray/common/ray_syncer/ray_syncer-inl.h index b29bc976d265f..a6e64f7b1b45b 100644 --- a/src/ray/common/ray_syncer/ray_syncer-inl.h +++ b/src/ray/common/ray_syncer/ray_syncer-inl.h @@ -151,6 +151,8 @@ class RaySyncerBidiReactor { virtual void DoDisconnect() = 0; std::string remote_node_id_; bool disconnected_ = false; + + FRIEND_TEST(SyncerReactorTest, TestReactorFailure); }; /// This class implements the communication between two nodes except the initialization From d9f9605c18136848f60672d76f11707c99f284a1 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 23 Feb 2023 02:54:04 +0000 Subject: [PATCH 7/9] fix --- release/release_tests.yaml | 3 ++- src/ray/common/ray_syncer/ray_syncer.cc | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 2d557c8816bc9..18571f2fe3cf6 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3881,6 +3881,7 @@ run: timeout: 3600 script: python distributed/test_many_actors.py + type: sdk_command wait_for_nodes: num_nodes: 65 @@ -4624,7 +4625,7 @@ script: RAY_DATASET_PUSH_BASED_SHUFFLE=1 python dataset/sort.py --num-partitions=1000 --partition-size=1e9 --shuffle wait_for_nodes: num_nodes: 20 - + type: sdk_command ##################### diff --git a/src/ray/common/ray_syncer/ray_syncer.cc b/src/ray/common/ray_syncer/ray_syncer.cc index 253a73a77da64..eb4d51740f88f 100644 --- a/src/ray/common/ray_syncer/ray_syncer.cc +++ b/src/ray/common/ray_syncer/ray_syncer.cc @@ -108,7 +108,11 @@ void RayServerBidiReactor::DoDisconnect() { io_context_.dispatch([this]() { Finish(grpc::Status::OK); }, ""); } -void RayServerBidiReactor::OnCancel() { Disconnect(); } +void RayServerBidiReactor::OnCancel() { + io_context_.dispatch([this]() { + Disconnect(); + }, ""); +} void RayServerBidiReactor::OnDone() { io_context_.dispatch( From 0ce41dd73300b8fb6ed397cedc147ef49232e653 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 23 Feb 2023 02:54:58 +0000 Subject: [PATCH 8/9] fix --- release/release_tests.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 18571f2fe3cf6..ac68e195cf65c 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3881,7 +3881,6 @@ run: timeout: 3600 script: python distributed/test_many_actors.py - type: sdk_command wait_for_nodes: num_nodes: 65 @@ -4625,7 +4624,6 @@ script: RAY_DATASET_PUSH_BASED_SHUFFLE=1 python dataset/sort.py --num-partitions=1000 --partition-size=1e9 --shuffle wait_for_nodes: num_nodes: 20 - type: sdk_command ##################### From 2cc3fa5bf5c28af2a05796af34ab3163ed318a7e Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 23 Feb 2023 21:41:53 +0000 Subject: [PATCH 9/9] fix --- src/ray/common/ray_syncer/ray_syncer.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/ray/common/ray_syncer/ray_syncer.cc b/src/ray/common/ray_syncer/ray_syncer.cc index eb4d51740f88f..3f56ffc945509 100644 --- a/src/ray/common/ray_syncer/ray_syncer.cc +++ b/src/ray/common/ray_syncer/ray_syncer.cc @@ -109,9 +109,7 @@ void RayServerBidiReactor::DoDisconnect() { } void RayServerBidiReactor::OnCancel() { - io_context_.dispatch([this]() { - Disconnect(); - }, ""); + io_context_.dispatch([this]() { Disconnect(); }, ""); } void RayServerBidiReactor::OnDone() {