Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dashboard][2/2] Add endpoints to dashboard and dashboard_agent for liveness check of raylet and gcs #26408

Merged
merged 21 commits into from
Jul 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add test
  • Loading branch information
fishbone committed Jul 8, 2022
commit 40586b2e23f7d3ef4f684ac7162bb57b00c31b3e
Empty file.
3 changes: 2 additions & 1 deletion python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ def __init__(
self._raylet_ip_address,
redis_password=self.redis_password,
)
self._ray_params.node_manager_port = node_info.node_manager_port
if self._ray_params.node_manager_port == 0:
self._ray_params.node_manager_port = node_info.node_manager_port

# Makes sure the Node object has valid addresses after setup.
self.validate_ip_port(self.address)
Expand Down
8 changes: 8 additions & 0 deletions python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1366,3 +1366,11 @@ def job_hook(**kwargs):
cmd = " ".join(kwargs["entrypoint"])
print(f"hook intercepted: {cmd}")
sys.exit(0)


def find_free_port():
    """Return a port number that was unused at the time of the call.

    A throwaway socket is bound to port 0 so that the operating system
    picks an unused port; that port number is read back and the socket is
    released again.

    Returns:
        int: The port number the operating system assigned.

    Note:
        The socket is closed before returning, so nothing reserves the
        port afterwards; another process may claim it before the caller
        binds to it.
    """
    # The context manager closes the socket even when bind() or
    # getsockname() raises, so a failure cannot leak the descriptor.
    with socket.socket() as sock:
        sock.bind(("", 0))
        return sock.getsockname()[1]
47 changes: 45 additions & 2 deletions python/ray/tests/test_gcs_utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import contextlib
import os
import time
import signal
import sys

import grpc
import pytest

import ray
from ray._private.gcs_utils import GcsClient
import ray._private.gcs_utils as gcs_utils
from ray._private.test_utils import enable_external_redis
from ray._private.test_utils import (
enable_external_redis,
find_free_port,
async_wait_for_condition_async_predicate,
)
import ray._private.ray_constants as ray_constants


@contextlib.contextmanager
Expand Down Expand Up @@ -147,6 +152,44 @@ def test_external_storage_namespace_isolation(shutdown_only):
assert gcs_client.internal_kv_get(b"ABC", None) == b"DEF"


@pytest.mark.asyncio
async def test_check_liveness(monkeypatch, ray_start_cluster):
    """Check that GcsAioClient.check_alive tracks raylet liveness.

    Three nodes are started on explicitly chosen node-manager ports. The
    test then removes one node through the cluster helper and kills the
    raylet process of another, checking the reported liveness list after
    each step.
    """
    # Presumably shortens how long GCS waits before declaring a node dead;
    # the exact semantics live in the Ray config, not in this test.
    monkeypatch.setenv("RAY_num_heartbeats_timeout", "2")
    cluster = ray_start_cluster

    # Nodes are created one after another, each with a freshly probed port.
    nodes = [cluster.add_node(node_manager_port=find_free_port()) for _ in range(3)]
    _head, removed_node, killed_node = nodes

    gcs_client = gcs_utils.GcsAioClient(address=cluster.address)
    node_manager_addresses = [
        f"{node.raylet_ip_address}:{node.node_manager_port}" for node in nodes
    ]

    async def liveness_matches(expect_liveness):
        reported = await gcs_client.check_alive(node_manager_addresses)
        return reported == expect_liveness

    # All three raylets are up right after start.
    assert await gcs_client.check_alive(node_manager_addresses) == [
        True,
        True,
        True,
    ]

    # Remove the second node and wait until it is reported as dead.
    cluster.remove_node(removed_node)
    await async_wait_for_condition_async_predicate(
        liveness_matches, expect_liveness=[True, False, True]
    )

    # Kill the third node's raylet process directly.
    raylet_handle = killed_node.all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
    raylet_handle.process.kill()

    # GCS hasn't marked it as dead yet.
    assert await gcs_client.check_alive(node_manager_addresses) == [
        True,
        False,
        True,
    ]

    # GCS will notice the dead node soon.
    await async_wait_for_condition_async_predicate(
        liveness_matches, expect_liveness=[True, False, False]
    )


if __name__ == "__main__":
import sys

Expand Down
37 changes: 37 additions & 0 deletions src/ray/gcs/gcs_client/test/gcs_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,43 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {

INSTANTIATE_TEST_SUITE_P(RedisMigration, GcsClientTest, testing::Bool());

// Exercises the CheckAlive RPC of the GCS heartbeat service directly over
// gRPC: before any node is registered both queried addresses are reported as
// not alive; after registering the first node only that address is reported
// as alive.
TEST_P(GcsClientTest, TestCheckAlive) {
  auto first_node = Mocker::GenNodeInfo();
  first_node->set_node_manager_address("172.1.2.3");
  first_node->set_node_manager_port(31292);

  // The second node is prepared but never registered in this test.
  auto second_node = Mocker::GenNodeInfo();
  second_node->set_node_manager_address("172.1.2.4");
  second_node->set_node_manager_port(31293);

  const auto gcs_address = absl::StrCat("127.0.0.1:", gcs_server_->GetPort());
  auto stub = rpc::HeartbeatInfoGcsService::NewStub(
      grpc::CreateChannel(gcs_address, grpc::InsecureChannelCredentials()));

  // The same request (both addresses, in this order) is sent twice.
  rpc::CheckAliveRequest request;
  request.add_raylet_address("172.1.2.3:31292");
  request.add_raylet_address("172.1.2.4:31293");

  {
    // Nothing registered yet: neither address is alive.
    grpc::ClientContext context;
    context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(1));
    rpc::CheckAliveReply reply;
    ASSERT_TRUE(stub->CheckAlive(&context, request, &reply).ok());
    ASSERT_EQ(2, reply.raylet_alive_size());
    ASSERT_FALSE(reply.raylet_alive(0));
    ASSERT_FALSE(reply.raylet_alive(1));
  }

  ASSERT_TRUE(RegisterNode(*first_node));

  {
    // Only the first address belongs to a registered node now.
    grpc::ClientContext context;
    context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(1));
    rpc::CheckAliveReply reply;
    ASSERT_TRUE(stub->CheckAlive(&context, request, &reply).ok());
    ASSERT_EQ(2, reply.raylet_alive_size());
    ASSERT_TRUE(reply.raylet_alive(0));
    ASSERT_FALSE(reply.raylet_alive(1));
  }
}

TEST_P(GcsClientTest, TestJobInfo) {
// Create job table data.
JobID add_job_id = JobID::FromInt(1);
Expand Down
37 changes: 23 additions & 14 deletions src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,22 @@ void GcsHeartbeatManager::Stop() {
}
}

// Stops tracking `node_id`: drops its entry from the NodeID <-> address map
// and from the heartbeat countdown table. The erase work is handed to
// io_service_ via dispatch() rather than performed inline by the caller.
void GcsHeartbeatManager::RemoveNode(const NodeID &node_id) {
  // node_id is captured by value so the handler does not depend on the
  // caller's reference staying valid.
  auto forget_node = [this, node_id] {
    node_map_.left.erase(node_id);
    heartbeats_.erase(node_id);
  };
  io_service_.dispatch(forget_node, "GcsHeartbeatManager::RemoveNode");
}

void GcsHeartbeatManager::AddNode(const rpc::GcsNodeInfo &node_info) {
auto node_id = NodeID::FromBinary(node_info.node_id());
auto node_addr = node_info.node_manager_address() + ":" +
std::to_string(node_info.node_manager_port());
node_map_.insert(NodeIDAddrBiMap::value_type(node_info.node_id(), node_addr));
io_service_.post(
[this, node_id] { heartbeats_.emplace(node_id, num_heartbeats_timeout_); },
[this, node_id, node_addr] {
node_map_.insert(NodeIDAddrBiMap::value_type(node_id, node_addr));
heartbeats_.emplace(node_id, num_heartbeats_timeout_);
},
"GcsHeartbeatManager.AddNode");
}

Expand All @@ -99,24 +108,24 @@ void GcsHeartbeatManager::HandleCheckAlive(const rpc::CheckAliveRequest &request
rpc::SendReplyCallback send_reply_callback) {
reply->set_ray_version(kRayVersion);
for (const auto &addr : request.raylet_address()) {
*reply->mutable_raylet_alive()->Add() = node_map_.right.count(addr) != 0;
reply->mutable_raylet_alive()->Add(node_map_.right.count(addr) != 0);
}

GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

void GcsHeartbeatManager::DetectDeadNodes() {
for (auto it = heartbeats_.begin(); it != heartbeats_.end();) {
auto current = it++;
current->second = current->second - 1;
if (current->second == 0) {
auto node_id = current->first;
RAY_LOG(WARNING) << "Node timed out: " << node_id;
heartbeats_.erase(current);
node_map_.left.erase(node_id.Binary());
if (on_node_death_callback_) {
on_node_death_callback_(node_id);
}
std::vector<NodeID> dead_nodes;
for (auto& current : heartbeats_) {
current.second = current.second - 1;
if (current.second == 0) {
dead_nodes.push_back(current.first);
}
}
for(const auto& node_id : dead_nodes) {
RemoveNode(node_id);
if (on_node_death_callback_) {
on_node_death_callback_(node_id);
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/ray/gcs/gcs_server/gcs_heartbeat_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ class GcsHeartbeatManager : public rpc::HeartbeatInfoHandler {
/// \param node_info The node to be registered.
void AddNode(const rpc::GcsNodeInfo &node_info);

/// Remove a node from this detector.
///
/// \param node_id The node to be removed.
void RemoveNode(const NodeID& node_id);

protected:
/// Check whether any raylet has been inactive (no heartbeat) for a period of time.
/// If any is found, mark it as dead.
Expand All @@ -93,7 +98,7 @@ class GcsHeartbeatManager : public rpc::HeartbeatInfoHandler {
/// Whether the detector has been started.
bool is_started_ = false;
/// A map of NodeId <-> ip:port of raylet
using NodeIDAddrBiMap = boost::bimap<boost::bimaps::unordered_set_of<std::string>,
using NodeIDAddrBiMap = boost::bimap<boost::bimaps::unordered_set_of<NodeID, std::hash<NodeID>>,
boost::bimaps::unordered_set_of<std::string>>;
NodeIDAddrBiMap node_map_;
};
Expand Down
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,8 @@ void GcsServer::InstallEventListeners() {
gcs_resource_manager_->OnNodeDead(node_id);
gcs_placement_group_manager_->OnNodeDead(node_id);
gcs_actor_manager_->OnNodeDead(node_id, node_ip_address);
raylet_client_pool_->Disconnect(NodeID::FromBinary(node->node_id()));
raylet_client_pool_->Disconnect(node_id);
gcs_heartbeat_manager_->RemoveNode(node_id);
if (RayConfig::instance().use_ray_syncer()) {
ray_syncer_->Disconnect(node_id.Binary());
} else {
Expand Down