From 39cb1e5f97633b485d881ba10dd28de3682267de Mon Sep 17 00:00:00 2001
From: Yi Cheng <74173148+iycheng@users.noreply.github.com>
Date: Sat, 9 Jul 2022 16:32:31 +0000
Subject: [PATCH] [core][1/2] Improve liveness check in GCS (#26405)

CheckAlive in GCS currently only verifies GCS's own liveness, but we
also need a way to check the liveness of raylets. In KubeRay we can
monitor the raylet process directly, but that is not good enough:
process liveness is not the same as the raylet's liveness in the
cluster. For example, during a network partition a raylet cannot
connect to GCS, so GCS marks it as dead and announces that to every
node. Although the raylet process is still alive, the cluster can no
longer treat it as alive. KubeRay therefore also needs to ask GCS
whether a raylet is alive. This PR extends the CheckAlive API to accept
raylet addresses, so the cluster status can be queried from GCS
directly.
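
As a minimal sketch of the intended usage (the client and method names
follow the test added below; `gcs_address` and the raylet address are
placeholders):

    import asyncio
    import ray._private.gcs_utils as gcs_utils

    async def main():
        gcs_client = gcs_utils.GcsAioClient(address=gcs_address)
        # One boolean per queried "ip:port" raylet address.
        alive = await gcs_client.check_alive([b"10.0.0.2:45001"])
        print(alive)

    asyncio.run(main())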
---
 BUILD.bazel                                     |  1 +
 python/ray/_private/gcs_utils.py                | 27 +++++++++++
 python/ray/_private/node.py                     |  3 +-
 python/ray/_private/test_utils.py               |  8 ++++
 python/ray/tests/test_gcs_utils.py              | 47 ++++++++++++++++++-
 .../gcs/gcs_client/test/gcs_client_test.cc      | 37 ++++++++++++++
 .../gcs/gcs_server/gcs_heartbeat_manager.cc     | 48 ++++++++++++++-----
 .../gcs/gcs_server/gcs_heartbeat_manager.h      | 17 ++++++-
 src/ray/gcs/gcs_server/gcs_server.cc            |  5 +-
 src/ray/protobuf/gcs_service.proto              |  2 +
 10 files changed, 175 insertions(+), 20 deletions(-)

diff --git a/BUILD.bazel b/BUILD.bazel
index b7be2e5d20285..32cbcb2e88bfa 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -545,6 +545,7 @@ cc_library(
         ":raylet_client_lib",
         ":scheduler",
         ":worker_rpc",
+        "@boost//:bimap",
         "@com_google_absl//absl/container:btree",
     ],
 )
diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py
index 7f67c2d40c60a..edb560e60615d 100644
--- a/python/ray/_private/gcs_utils.py
+++ b/python/ray/_private/gcs_utils.py
@@ -159,6 +159,10 @@ def __init__(self, gcs_address: Optional[str] = None, aio: bool = False):
         self._gcs_address = gcs_address
         self._aio = aio

+    @property
+    def address(self):
+        return self._gcs_address
+
     def connect(self):
         # GCS server uses a cached port, so it should use the same port after
         # restarting. This means GCS address should stay the same for the
@@ -332,11 +336,34 @@ def __init__(
         self._channel = channel
         self._connect()

+    @property
+    def channel(self):
+        return self._channel
+
     def _connect(self):
         self._channel.connect()
         self._kv_stub = gcs_service_pb2_grpc.InternalKVGcsServiceStub(
             self._channel.channel()
         )
+        self._heartbeat_info_stub = gcs_service_pb2_grpc.HeartbeatInfoGcsServiceStub(
+            self._channel.channel()
+        )
+
+    async def check_alive(
+        self, node_ips: List[bytes], timeout: Optional[float] = None
+    ) -> List[bool]:
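+        """Check whether the given raylet addresses are alive.
+
+        Returns one boolean per address in ``node_ips``.
+        """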
+        req = gcs_service_pb2.CheckAliveRequest(raylet_address=node_ips)
+        reply = await self._heartbeat_info_stub.CheckAlive(req, timeout=timeout)
+
+        if reply.status.code != GcsCode.OK:
+            raise RuntimeError(
+                f"GCS running at {self._channel.address} is unhealthy: {reply.status}"
+            )
+        return list(reply.raylet_alive)

     async def internal_kv_get(
         self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py
index 9a5f566105b06..3c9a1f43acd99 100644
--- a/python/ray/_private/node.py
+++ b/python/ray/_private/node.py
@@ -331,7 +331,8 @@ def __init__(
             self._raylet_ip_address,
             redis_password=self.redis_password,
         )
-        self._ray_params.node_manager_port = node_info.node_manager_port
+        if self._ray_params.node_manager_port == 0:
+            self._ray_params.node_manager_port = node_info.node_manager_port

         # Makes sure the Node object has valid addresses after setup.
         self.validate_ip_port(self.address)
diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py
index a780165eaf9f3..ed0bf3ae01575 100644
--- a/python/ray/_private/test_utils.py
+++ b/python/ray/_private/test_utils.py
@@ -1369,6 +1369,14 @@ def job_hook(**kwargs):
     sys.exit(0)


+def find_free_port():
+    sock = socket.socket()
+    sock.bind(("", 0))
+    port = sock.getsockname()[1]
+    sock.close()
+    return port
+
+
 @dataclasses.dataclass
 class TestRayActivityResponse:
     """
diff --git a/python/ray/tests/test_gcs_utils.py b/python/ray/tests/test_gcs_utils.py
index 667185b4c1934..1e5b30819216c 100644
--- a/python/ray/tests/test_gcs_utils.py
+++ b/python/ray/tests/test_gcs_utils.py
@@ -5,11 +5,15 @@
 import grpc
 import pytest
-
 import ray
 from ray._private.gcs_utils import GcsClient
 import ray._private.gcs_utils as gcs_utils
-from ray._private.test_utils import enable_external_redis
+from ray._private.test_utils import (
+    enable_external_redis,
+    find_free_port,
+    async_wait_for_condition_async_predicate,
+)
+import ray._private.ray_constants as ray_constants


 @contextlib.contextmanager
@@ -147,6 +151,45 @@ def test_external_storage_namespace_isolation(shutdown_only):
     assert gcs_client.internal_kv_get(b"ABC", None) == b"DEF"


+@pytest.mark.asyncio
+async def test_check_liveness(monkeypatch, ray_start_cluster):
+    monkeypatch.setenv("RAY_num_heartbeats_timeout", "2")
+    cluster = ray_start_cluster
+    h = cluster.add_node(node_manager_port=find_free_port())
+    n1 = cluster.add_node(node_manager_port=find_free_port())
+    n2 = cluster.add_node(node_manager_port=find_free_port())
+    gcs_client = gcs_utils.GcsAioClient(address=cluster.address)
+    node_manager_addresses = [
+        f"{n.raylet_ip_address}:{n.node_manager_port}" for n in [h, n1, n2]
+    ]
+
+    ret = await gcs_client.check_alive(node_manager_addresses)
+    assert ret == [True, True, True]
+
+    cluster.remove_node(n1)
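+    # GCS should mark n1 dead after RAY_num_heartbeats_timeout missed beats.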
+
+    async def check(expect_liveness):
+        ret = await gcs_client.check_alive(node_manager_addresses)
+        return ret == expect_liveness
+
+    await async_wait_for_condition_async_predicate(
+        check, expect_liveness=[True, False, True]
+    )
+
+    n2_raylet_process = n2.all_processes[ray_constants.PROCESS_TYPE_RAYLET][0].process
+    n2_raylet_process.kill()
+
+    # GCS hasn't marked it as dead yet.
+    ret = await gcs_client.check_alive(node_manager_addresses)
+    assert ret == [True, False, True]
+
+    # GCS will soon notice the node is dead.
+    await async_wait_for_condition_async_predicate(
+        check, expect_liveness=[True, False, False]
+    )
+
+
 if __name__ == "__main__":
     import sys
diff --git a/src/ray/gcs/gcs_client/test/gcs_client_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_test.cc
index d3c06d73eaaf9..a4b91183b4eb0 100644
--- a/src/ray/gcs/gcs_client/test/gcs_client_test.cc
+++ b/src/ray/gcs/gcs_client/test/gcs_client_test.cc
@@ -459,6 +459,43 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {

 INSTANTIATE_TEST_SUITE_P(RedisMigration, GcsClientTest, testing::Bool());

+TEST_P(GcsClientTest, TestCheckAlive) {
+  auto node_info1 = Mocker::GenNodeInfo();
+  node_info1->set_node_manager_address("172.1.2.3");
+  node_info1->set_node_manager_port(31292);
+
+  auto node_info2 = Mocker::GenNodeInfo();
+  node_info2->set_node_manager_address("172.1.2.4");
+  node_info2->set_node_manager_port(31293);
+
+  auto channel = grpc::CreateChannel(absl::StrCat("127.0.0.1:", gcs_server_->GetPort()),
+                                     grpc::InsecureChannelCredentials());
+  auto stub = rpc::HeartbeatInfoGcsService::NewStub(std::move(channel));
+  rpc::CheckAliveRequest request;
+  *(request.mutable_raylet_address()->Add()) = "172.1.2.3:31292";
+  *(request.mutable_raylet_address()->Add()) = "172.1.2.4:31293";
+  {
+    grpc::ClientContext context;
+    context.set_deadline(std::chrono::system_clock::now() + 1s);
+    rpc::CheckAliveReply reply;
+    ASSERT_TRUE(stub->CheckAlive(&context, request, &reply).ok());
+    ASSERT_EQ(2, reply.raylet_alive_size());
+    ASSERT_FALSE(reply.raylet_alive().at(0));
+    ASSERT_FALSE(reply.raylet_alive().at(1));
+  }
+
+  ASSERT_TRUE(RegisterNode(*node_info1));
+  {
+    grpc::ClientContext context;
+    context.set_deadline(std::chrono::system_clock::now() + 1s);
+    rpc::CheckAliveReply reply;
+    ASSERT_TRUE(stub->CheckAlive(&context, request, &reply).ok());
+    ASSERT_EQ(2, reply.raylet_alive_size());
+    ASSERT_TRUE(reply.raylet_alive().at(0));
+    ASSERT_FALSE(reply.raylet_alive().at(1));
+  }
+}
+
 TEST_P(GcsClientTest, TestJobInfo) {
   // Create job table data.
   JobID add_job_id = JobID::FromInt(1);
diff --git a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc
index 54d0c14d5cb86..19e68d1b126fe 100644
--- a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc
@@ -41,7 +41,7 @@ GcsHeartbeatManager::GcsHeartbeatManager(
 void GcsHeartbeatManager::Initialize(const GcsInitData &gcs_init_data) {
   for (const auto &item : gcs_init_data.Nodes()) {
     if (item.second.state() == rpc::GcsNodeInfo::ALIVE) {
-      heartbeats_.emplace(item.first, num_heartbeats_timeout_);
+      AddNode(item.second);
     }
   }
 }
@@ -67,9 +67,24 @@ void GcsHeartbeatManager::Stop() {
   }
 }

-void GcsHeartbeatManager::AddNode(const NodeID &node_id) {
+void GcsHeartbeatManager::RemoveNode(const NodeID &node_id) {
+  io_service_.dispatch(
+      [this, node_id] {
+        node_map_.left.erase(node_id);
+        heartbeats_.erase(node_id);
+      },
+      "GcsHeartbeatManager::RemoveNode");
+}
+
+void GcsHeartbeatManager::AddNode(const rpc::GcsNodeInfo &node_info) {
+  auto node_id = NodeID::FromBinary(node_info.node_id());
+  auto node_addr = node_info.node_manager_address() + ":" +
+                   std::to_string(node_info.node_manager_port());
   io_service_.post(
-      [this, node_id] { heartbeats_.emplace(node_id, num_heartbeats_timeout_); },
+      [this, node_id, node_addr] {
+        node_map_.insert(NodeIDAddrBiMap::value_type(node_id, node_addr));
+        heartbeats_.emplace(node_id, num_heartbeats_timeout_);
+      },
       "GcsHeartbeatManager.AddNode");
 }

@@ -94,20 +109,27 @@ void GcsHeartbeatManager::HandleCheckAlive(const rpc::CheckAliveRequest &request
                                            rpc::CheckAliveReply *reply,
                                            rpc::SendReplyCallback send_reply_callback) {
   reply->set_ray_version(kRayVersion);
+  for (const auto &addr : request.raylet_address()) {
+    reply->mutable_raylet_alive()->Add(node_map_.right.count(addr) != 0);
+  }
+
   GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
 }

 void GcsHeartbeatManager::DetectDeadNodes() {
-  for (auto it = heartbeats_.begin(); it != heartbeats_.end();) {
-    auto current = it++;
-    current->second = current->second - 1;
-    if (current->second == 0) {
-      auto node_id = current->first;
-      RAY_LOG(WARNING) << "Node timed out: " << node_id;
-      heartbeats_.erase(current);
-      if (on_node_death_callback_) {
-        on_node_death_callback_(node_id);
-      }
+  std::vector<NodeID> dead_nodes;
+  for (auto &current : heartbeats_) {
+    current.second = current.second - 1;
+    if (current.second == 0) {
+      RAY_LOG(WARNING) << "Node timed out: " << current.first;
+      dead_nodes.push_back(current.first);
+    }
+  }
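+  // Remove only after the scan: RemoveNode() erases entries from heartbeats_.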
+  for (const auto &node_id : dead_nodes) {
+    RemoveNode(node_id);
+    if (on_node_death_callback_) {
+      on_node_death_callback_(node_id);
     }
   }
 }
diff --git a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h
index 706c46e2fbe00..69e00ac1b756c 100644
--- a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h
+++ b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h
@@ -15,6 +15,9 @@

 #pragma once

+#include <boost/bimap.hpp>
+#include <boost/bimap/unordered_set_of.hpp>
+
 #include "absl/container/flat_hash_map.h"
 #include "ray/common/asio/instrumented_io_context.h"
 #include "ray/common/asio/periodical_runner.h"
@@ -66,8 +69,13 @@ class GcsHeartbeatManager : public rpc::HeartbeatInfoHandler {
   /// Register node to this detector.
   /// Only if the node has registered, its heartbeat data will be accepted.
   ///
-  /// \param node_id ID of the node to be registered.
-  void AddNode(const NodeID &node_id);
+  /// \param node_info The node to be registered.
+  void AddNode(const rpc::GcsNodeInfo &node_info);
+
+  /// Remove a node from this detector.
+  ///
+  /// \param node_id The node to be removed.
+  void RemoveNode(const NodeID &node_id);

 protected:
   /// Check that if any raylet is inactive due to no heartbeat for a period of time.
@@ -89,6 +97,11 @@ class GcsHeartbeatManager : public rpc::HeartbeatInfoHandler {
   absl::flat_hash_map<NodeID, int64_t> heartbeats_;
   /// Is the detect started.
   bool is_started_ = false;
+  /// A bidirectional map of NodeID <-> "ip:port" address of the raylet.
+  using NodeIDAddrBiMap =
+      boost::bimap<boost::bimaps::unordered_set_of<NodeID, std::hash<NodeID>>,
+                   boost::bimaps::unordered_set_of<std::string>>;
+  NodeIDAddrBiMap node_map_;
 };

 }  // namespace gcs
diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc
index e817badec2455..a3ff396d668ff 100644
--- a/src/ray/gcs/gcs_server/gcs_server.cc
+++ b/src/ray/gcs/gcs_server/gcs_server.cc
@@ -600,7 +600,7 @@ void GcsServer::InstallEventListeners() {
       gcs_resource_manager_->OnNodeAdd(*node);
       gcs_placement_group_manager_->OnNodeAdd(node_id);
       gcs_actor_manager_->SchedulePendingActors();
-      gcs_heartbeat_manager_->AddNode(NodeID::FromBinary(node->node_id()));
+      gcs_heartbeat_manager_->AddNode(*node);
       cluster_task_manager_->ScheduleAndDispatchTasks();
       if (RayConfig::instance().use_ray_syncer()) {
         rpc::Address address;
@@ -623,7 +623,8 @@ void GcsServer::InstallEventListeners() {
       gcs_resource_manager_->OnNodeDead(node_id);
       gcs_placement_group_manager_->OnNodeDead(node_id);
       gcs_actor_manager_->OnNodeDead(node_id, node_ip_address);
-      raylet_client_pool_->Disconnect(NodeID::FromBinary(node->node_id()));
+      raylet_client_pool_->Disconnect(node_id);
+      gcs_heartbeat_manager_->RemoveNode(node_id);
       if (RayConfig::instance().use_ray_syncer()) {
         ray_syncer_->Disconnect(node_id.Binary());
       } else {
diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto
index e97eebc1a4e03..3a2022cacce7d 100644
--- a/src/ray/protobuf/gcs_service.proto
+++ b/src/ray/protobuf/gcs_service.proto
@@ -189,11 +189,13 @@ message ReportHeartbeatReply {
 }

 message CheckAliveRequest {
+  repeated string raylet_address = 1;
 }

 message CheckAliveReply {
   GcsStatus status = 1;
   string ray_version = 2;
+  repeated bool raylet_alive = 3;
 }

 message GetInternalConfigRequest {