Skip to content

Commit

Permalink
[core][1/2] Improve liveness check in GCS (ray-project#26405)
Browse files Browse the repository at this point in the history
CheckAlive in GCS is only for checking GCS's liveness. But we also need to check the liveness for raylet.

In KubeRay, we can check the liveness directly by monitoring the raylet's liveness. But it's not good enough given that raylet's process liveness is not directly related to raylet's liveness.

For example, during a network partition, raylet is not able to connect to GCS. GCS mark raylet as dead. So for the cluster, although raylet process is still alive, it can't be treated as alive because GCS has told all the nodes that it's dead.

So for KubeRay, it also needs to talk with GCS to check whether it's alive or not.

This PR extends the CheckAlive API to include raylet address so that we can query GCS to get the cluster status directly.
  • Loading branch information
fishbone committed Jul 9, 2022
1 parent 0c469e4 commit 39cb1e5
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 20 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ cc_library(
":raylet_client_lib",
":scheduler",
":worker_rpc",
"@boost//:bimap",
"@com_google_absl//absl/container:btree",
],
)
Expand Down
23 changes: 23 additions & 0 deletions python/ray/_private/gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ def __init__(self, gcs_address: Optional[str] = None, aio: bool = False):
self._gcs_address = gcs_address
self._aio = aio

@property
def address(self):
return self._gcs_address

def connect(self):
# GCS server uses a cached port, so it should use the same port after
# restarting. This means GCS address should stay the same for the
Expand Down Expand Up @@ -332,11 +336,30 @@ def __init__(
self._channel = channel
self._connect()

@property
def channel(self):
return self._channel

def _connect(self):
self._channel.connect()
self._kv_stub = gcs_service_pb2_grpc.InternalKVGcsServiceStub(
self._channel.channel()
)
self._heartbeat_info_stub = gcs_service_pb2_grpc.HeartbeatInfoGcsServiceStub(
self._channel.channel()
)

async def check_alive(
self, node_ips: List[bytes], timeout: Optional[float] = None
) -> List[bool]:
req = gcs_service_pb2.CheckAliveRequest(raylet_address=node_ips)
reply = await self._heartbeat_info_stub.CheckAlive(req, timeout=timeout)

if reply.status.code != GcsCode.OK:
raise RuntimeError(
f"GCS running at {self._channel.address} is unhealthy: {reply.status}"
)
return list(reply.raylet_alive)

async def internal_kv_get(
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
Expand Down
3 changes: 2 additions & 1 deletion python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ def __init__(
self._raylet_ip_address,
redis_password=self.redis_password,
)
self._ray_params.node_manager_port = node_info.node_manager_port
if self._ray_params.node_manager_port == 0:
self._ray_params.node_manager_port = node_info.node_manager_port

# Makes sure the Node object has valid addresses after setup.
self.validate_ip_port(self.address)
Expand Down
8 changes: 8 additions & 0 deletions python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,14 @@ def job_hook(**kwargs):
sys.exit(0)


def find_free_port():
sock = socket.socket()
sock.bind(("", 0))
port = sock.getsockname()[1]
sock.close()
return port


@dataclasses.dataclass
class TestRayActivityResponse:
"""
Expand Down
46 changes: 44 additions & 2 deletions python/ray/tests/test_gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@

import grpc
import pytest

import ray
from ray._private.gcs_utils import GcsClient
import ray._private.gcs_utils as gcs_utils
from ray._private.test_utils import enable_external_redis
from ray._private.test_utils import (
enable_external_redis,
find_free_port,
async_wait_for_condition_async_predicate,
)
import ray._private.ray_constants as ray_constants


@contextlib.contextmanager
Expand Down Expand Up @@ -147,6 +151,44 @@ def test_external_storage_namespace_isolation(shutdown_only):
assert gcs_client.internal_kv_get(b"ABC", None) == b"DEF"


@pytest.mark.asyncio
async def test_check_liveness(monkeypatch, ray_start_cluster):
monkeypatch.setenv("RAY_num_heartbeats_timeout", "2")
cluster = ray_start_cluster
h = cluster.add_node(node_manager_port=find_free_port())
n1 = cluster.add_node(node_manager_port=find_free_port())
n2 = cluster.add_node(node_manager_port=find_free_port())
gcs_client = gcs_utils.GcsAioClient(address=cluster.address)
node_manager_addresses = [
f"{n.raylet_ip_address}:{n.node_manager_port}" for n in [h, n1, n2]
]

ret = await gcs_client.check_alive(node_manager_addresses)
assert ret == [True, True, True]

cluster.remove_node(n1)

async def check(expect_liveness):
ret = await gcs_client.check_alive(node_manager_addresses)
return ret == expect_liveness

await async_wait_for_condition_async_predicate(
check, expect_liveness=[True, False, True]
)

n2_raylet_process = n2.all_processes[ray_constants.PROCESS_TYPE_RAYLET][0].process
n2_raylet_process.kill()

# GCS hasn't marked it as dead yet.
ret = await gcs_client.check_alive(node_manager_addresses)
assert ret == [True, False, True]

# GCS will notice node dead soon
await async_wait_for_condition_async_predicate(
check, expect_liveness=[True, False, False]
)


if __name__ == "__main__":
import sys

Expand Down
37 changes: 37 additions & 0 deletions src/ray/gcs/gcs_client/test/gcs_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,43 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {

INSTANTIATE_TEST_SUITE_P(RedisMigration, GcsClientTest, testing::Bool());

TEST_P(GcsClientTest, TestCheckAlive) {
auto node_info1 = Mocker::GenNodeInfo();
node_info1->set_node_manager_address("172.1.2.3");
node_info1->set_node_manager_port(31292);

auto node_info2 = Mocker::GenNodeInfo();
node_info2->set_node_manager_address("172.1.2.4");
node_info2->set_node_manager_port(31293);

auto channel = grpc::CreateChannel(absl::StrCat("127.0.0.1:", gcs_server_->GetPort()),
grpc::InsecureChannelCredentials());
auto stub = rpc::HeartbeatInfoGcsService::NewStub(std::move(channel));
rpc::CheckAliveRequest request;
*(request.mutable_raylet_address()->Add()) = "172.1.2.3:31292";
*(request.mutable_raylet_address()->Add()) = "172.1.2.4:31293";
{
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() + 1s);
rpc::CheckAliveReply reply;
ASSERT_TRUE(stub->CheckAlive(&context, request, &reply).ok());
ASSERT_EQ(2, reply.raylet_alive_size());
ASSERT_FALSE(reply.raylet_alive().at(0));
ASSERT_FALSE(reply.raylet_alive().at(1));
}

ASSERT_TRUE(RegisterNode(*node_info1));
{
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() + 1s);
rpc::CheckAliveReply reply;
ASSERT_TRUE(stub->CheckAlive(&context, request, &reply).ok());
ASSERT_EQ(2, reply.raylet_alive_size());
ASSERT_TRUE(reply.raylet_alive().at(0));
ASSERT_FALSE(reply.raylet_alive().at(1));
}
}

TEST_P(GcsClientTest, TestJobInfo) {
// Create job table data.
JobID add_job_id = JobID::FromInt(1);
Expand Down
47 changes: 34 additions & 13 deletions src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ GcsHeartbeatManager::GcsHeartbeatManager(
void GcsHeartbeatManager::Initialize(const GcsInitData &gcs_init_data) {
for (const auto &item : gcs_init_data.Nodes()) {
if (item.second.state() == rpc::GcsNodeInfo::ALIVE) {
heartbeats_.emplace(item.first, num_heartbeats_timeout_);
AddNode(item.second);
}
}
}
Expand All @@ -67,9 +67,24 @@ void GcsHeartbeatManager::Stop() {
}
}

void GcsHeartbeatManager::AddNode(const NodeID &node_id) {
void GcsHeartbeatManager::RemoveNode(const NodeID &node_id) {
io_service_.dispatch(
[this, node_id] {
node_map_.left.erase(node_id);
heartbeats_.erase(node_id);
},
"GcsHeartbeatManager::RemoveNode");
}

void GcsHeartbeatManager::AddNode(const rpc::GcsNodeInfo &node_info) {
auto node_id = NodeID::FromBinary(node_info.node_id());
auto node_addr = node_info.node_manager_address() + ":" +
std::to_string(node_info.node_manager_port());
io_service_.post(
[this, node_id] { heartbeats_.emplace(node_id, num_heartbeats_timeout_); },
[this, node_id, node_addr] {
node_map_.insert(NodeIDAddrBiMap::value_type(node_id, node_addr));
heartbeats_.emplace(node_id, num_heartbeats_timeout_);
},
"GcsHeartbeatManager.AddNode");
}

Expand All @@ -94,20 +109,26 @@ void GcsHeartbeatManager::HandleCheckAlive(const rpc::CheckAliveRequest &request
rpc::CheckAliveReply *reply,
rpc::SendReplyCallback send_reply_callback) {
reply->set_ray_version(kRayVersion);
for (const auto &addr : request.raylet_address()) {
reply->mutable_raylet_alive()->Add(node_map_.right.count(addr) != 0);
}

GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

void GcsHeartbeatManager::DetectDeadNodes() {
for (auto it = heartbeats_.begin(); it != heartbeats_.end();) {
auto current = it++;
current->second = current->second - 1;
if (current->second == 0) {
auto node_id = current->first;
RAY_LOG(WARNING) << "Node timed out: " << node_id;
heartbeats_.erase(current);
if (on_node_death_callback_) {
on_node_death_callback_(node_id);
}
std::vector<NodeID> dead_nodes;
for (auto &current : heartbeats_) {
current.second = current.second - 1;
if (current.second == 0) {
RAY_LOG(WARNING) << "Node timed out: " << current.first;
dead_nodes.push_back(current.first);
}
}
for (const auto &node_id : dead_nodes) {
RemoveNode(node_id);
if (on_node_death_callback_) {
on_node_death_callback_(node_id);
}
}
}
Expand Down
17 changes: 15 additions & 2 deletions src/ray/gcs/gcs_server/gcs_heartbeat_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

#pragma once

#include <boost/bimap.hpp>
#include <boost/bimap/unordered_set_of.hpp>

#include "absl/container/flat_hash_map.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/asio/periodical_runner.h"
Expand Down Expand Up @@ -66,8 +69,13 @@ class GcsHeartbeatManager : public rpc::HeartbeatInfoHandler {
/// Register node to this detector.
/// Only if the node has registered, its heartbeat data will be accepted.
///
/// \param node_id ID of the node to be registered.
void AddNode(const NodeID &node_id);
/// \param node_info The node to be registered.
void AddNode(const rpc::GcsNodeInfo &node_info);

/// Remove a node from this detector.
///
/// \param node_id The node to be removed.
void RemoveNode(const NodeID &node_id);

protected:
/// Check that if any raylet is inactive due to no heartbeat for a period of time.
Expand All @@ -89,6 +97,11 @@ class GcsHeartbeatManager : public rpc::HeartbeatInfoHandler {
absl::flat_hash_map<NodeID, int64_t> heartbeats_;
/// Is the detect started.
bool is_started_ = false;
/// A map of NodeId <-> ip:port of raylet
using NodeIDAddrBiMap =
boost::bimap<boost::bimaps::unordered_set_of<NodeID, std::hash<NodeID>>,
boost::bimaps::unordered_set_of<std::string>>;
NodeIDAddrBiMap node_map_;
};

} // namespace gcs
Expand Down
5 changes: 3 additions & 2 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ void GcsServer::InstallEventListeners() {
gcs_resource_manager_->OnNodeAdd(*node);
gcs_placement_group_manager_->OnNodeAdd(node_id);
gcs_actor_manager_->SchedulePendingActors();
gcs_heartbeat_manager_->AddNode(NodeID::FromBinary(node->node_id()));
gcs_heartbeat_manager_->AddNode(*node);
cluster_task_manager_->ScheduleAndDispatchTasks();
if (RayConfig::instance().use_ray_syncer()) {
rpc::Address address;
Expand All @@ -623,7 +623,8 @@ void GcsServer::InstallEventListeners() {
gcs_resource_manager_->OnNodeDead(node_id);
gcs_placement_group_manager_->OnNodeDead(node_id);
gcs_actor_manager_->OnNodeDead(node_id, node_ip_address);
raylet_client_pool_->Disconnect(NodeID::FromBinary(node->node_id()));
raylet_client_pool_->Disconnect(node_id);
gcs_heartbeat_manager_->RemoveNode(node_id);
if (RayConfig::instance().use_ray_syncer()) {
ray_syncer_->Disconnect(node_id.Binary());
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/ray/protobuf/gcs_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,13 @@ message ReportHeartbeatReply {
}

message CheckAliveRequest {
repeated string raylet_address = 1;
}

message CheckAliveReply {
GcsStatus status = 1;
string ray_version = 2;
repeated bool raylet_alive = 3;
}

message GetInternalConfigRequest {
Expand Down

0 comments on commit 39cb1e5

Please sign in to comment.