Skip to content

Commit

Permalink
[core] Deflakey ray syncer test due to race condition. (ray-project#4…
Browse files Browse the repository at this point in the history
…1541)

The test is flakey because of the race condition. In the code we break the transaction into several parts for performance but it causes some issues.

- Disconnect is requested by the user => RPC terminates async
- Read/Write arrived => Read next is requested async
- 1 and 2 has race condition. The ordering matters. If Read next happens after RPC terminates, it'll crash.
  • Loading branch information
fishbone committed Dec 1, 2023
1 parent 091531c commit 959f508
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 13 deletions.
30 changes: 20 additions & 10 deletions src/ray/common/ray_syncer/ray_syncer-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,20 @@ class RaySyncerBidiReactor {
/// Disconnect will terminate the communication between local and remote node.
/// It also needs to do proper cleanup.
void Disconnect() {
if (!IsDisconnected()) {
disconnected_ = true;
if (!*disconnected_) {
*disconnected_ = true;
DoDisconnect();
}
};

/// Return true if it's disconnected.
bool IsDisconnected() const { return disconnected_; }
std::shared_ptr<bool> IsDisconnected() const { return disconnected_; }

std::string remote_node_id_;

private:
virtual void DoDisconnect() = 0;
std::string remote_node_id_;
bool disconnected_ = false;
std::shared_ptr<bool> disconnected_ = std::make_shared<bool>(false);

FRIEND_TEST(SyncerReactorTest, TestReactorFailure);
};
Expand Down Expand Up @@ -181,7 +182,7 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T {
message_processor_(std::move(message_processor)) {}

bool PushToSendingQueue(std::shared_ptr<const RaySyncMessage> message) override {
if (IsDisconnected()) {
if (*IsDisconnected()) {
return false;
}

Expand All @@ -205,7 +206,7 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T {
return false;
}

virtual ~RaySyncerBidiReactorBase() {}
virtual ~RaySyncerBidiReactorBase() = default;

void StartPull() {
receiving_message_ = std::make_shared<RaySyncMessage>();
Expand All @@ -218,7 +219,7 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T {
instrumented_io_context &io_context_;

private:
/// Handle the udpates sent from the remote node.
/// Handle the updates sent from the remote node.
///
/// \param messages The message received.
void ReceiveUpdate(std::shared_ptr<const RaySyncMessage> message) {
Expand Down Expand Up @@ -285,7 +286,10 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T {

void OnWriteDone(bool ok) override {
io_context_.dispatch(
[this, ok]() {
[this, disconnected = IsDisconnected(), ok]() {
if (*disconnected) {
return;
}
if (ok) {
SendNext();
} else {
Expand All @@ -299,7 +303,13 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T {

void OnReadDone(bool ok) override {
io_context_.dispatch(
[this, ok, msg = std::move(receiving_message_)]() mutable {
[this,
ok,
disconnected = IsDisconnected(),
msg = std::move(receiving_message_)]() mutable {
if (*disconnected) {
return;
}
if (ok) {
RAY_CHECK(!msg->node_id().empty());
ReceiveUpdate(std::move(msg));
Expand Down
4 changes: 2 additions & 2 deletions src/ray/common/ray_syncer/ray_syncer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ void RayServerBidiReactor::OnCancel() {

void RayServerBidiReactor::OnDone() {
io_context_.dispatch(
[this]() {
cleanup_cb_(GetRemoteNodeID(), false);
[this, cleanup_cb = cleanup_cb_, remote_node_id = GetRemoteNodeID()]() {
cleanup_cb(remote_node_id, false);
delete this;
},
"");
Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/test/ray_syncer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ TEST_F(SyncerReactorTest, TestReactorFailure) {
auto [node_s, node_c] = GetNodeID();
ASSERT_TRUE(s != nullptr);
ASSERT_TRUE(c != nullptr);
s->disconnected_ = true;
*s->disconnected_ = true;
s->Finish(grpc::Status::CANCELLED);
auto c_cleanup = client_cleanup.get_future().get();
ASSERT_EQ(node_s, c_cleanup.first);
Expand Down

0 comments on commit 959f508

Please sign in to comment.