Skip to content

Commit

Permalink
GCS adapts to node table pub sub (ray-project#8209)
Browse files Browse the repository at this point in the history
  • Loading branch information
ffbin authored May 5, 2020
1 parent ee0eb44 commit 97430b2
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 46 deletions.
1 change: 0 additions & 1 deletion python/ray/tests/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ def f(self):
# Check that the rest of the processes are workers, 1 for each CPU.
assert len(reply.workers_stats) == num_cpus + 1
views = [view.view_name for view in reply.view_data]
assert "redis_latency" in views
assert "local_available_resource" in views
# Check that all processes are Python.
pids = [worker.pid for worker in reply.workers_stats]
Expand Down
3 changes: 2 additions & 1 deletion src/ray/gcs/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,8 @@ class NodeInfoAccessor {
/// Subscribe to node addition and removal events from GCS and cache that information.
///
/// \param subscribe Callback that will be called if a node is
/// added or a node is removed.
/// added or a node is removed. The callback needs to be idempotent because it will also
/// be called for existing nodes.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribeToNodeChange(
Expand Down
85 changes: 72 additions & 13 deletions src/ray/gcs/gcs_client/service_based_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,33 +399,53 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToNodeChange(
const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing node change.";
RAY_CHECK(subscribe != nullptr);
ClientTable &client_table = client_impl_->GetRedisGcsClient().client_table();
auto status = client_table.SubscribeToNodeChange(subscribe, done);
RAY_CHECK(node_change_callback_ == nullptr);
node_change_callback_ = subscribe;

auto on_subscribe = [this](const std::string &id, const std::string &data) {
GcsNodeInfo node_info;
node_info.ParseFromString(data);
HandleNotification(node_info);
};

auto on_done = [this, subscribe, done](const Status &status) {
// Get nodes from GCS Service.
auto callback = [this, subscribe, done](
const Status &status,
const std::vector<GcsNodeInfo> &node_info_list) {
for (auto &node_info : node_info_list) {
HandleNotification(node_info);
}
if (done) {
done(status);
}
};
RAY_CHECK_OK(AsyncGetAll(callback));
};

auto status =
client_impl_->GetGcsPubSub().SubscribeAll(NODE_CHANNEL, on_subscribe, on_done);
RAY_LOG(DEBUG) << "Finished subscribing node change.";
return status;
}

boost::optional<GcsNodeInfo> ServiceBasedNodeInfoAccessor::Get(
const ClientID &node_id) const {
GcsNodeInfo node_info;
ClientTable &client_table = client_impl_->GetRedisGcsClient().client_table();
bool found = client_table.GetClient(node_id, &node_info);
boost::optional<GcsNodeInfo> optional_node;
if (found) {
optional_node = std::move(node_info);
RAY_CHECK(!node_id.IsNil());
auto entry = node_cache_.find(node_id);
if (entry != node_cache_.end()) {
return entry->second;
}
return optional_node;
return boost::none;
}

const std::unordered_map<ClientID, GcsNodeInfo> &ServiceBasedNodeInfoAccessor::GetAll()
const {
ClientTable &client_table = client_impl_->GetRedisGcsClient().client_table();
return client_table.GetAllClients();
return node_cache_;
}

bool ServiceBasedNodeInfoAccessor::IsRemoved(const ClientID &node_id) const {
ClientTable &client_table = client_impl_->GetRedisGcsClient().client_table();
return client_table.IsRemoved(node_id);
return removed_nodes_.count(node_id) == 1;
}

Status ServiceBasedNodeInfoAccessor::AsyncGetResources(
Expand Down Expand Up @@ -567,6 +587,45 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchHeartbeat(
return status;
}

void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_info) {
ClientID node_id = ClientID::FromBinary(node_info.node_id());
bool is_alive = (node_info.state() == GcsNodeInfo::ALIVE);
auto entry = node_cache_.find(node_id);
bool is_notif_new;
if (entry == node_cache_.end()) {
// If the entry is not in the cache, then the notification is new.
is_notif_new = true;
} else {
// If the entry is in the cache, then the notification is new if the node
// was alive and is now dead or resources have been updated.
bool was_alive = (entry->second.state() == GcsNodeInfo::ALIVE);
is_notif_new = was_alive && !is_alive;
// Once a node with a given ID has been removed, it should never be added
// again. If the entry was in the cache and the node was deleted, check
// that this new notification is not an insertion.
if (!was_alive) {
RAY_CHECK(!is_alive)
<< "Notification for addition of a node that was already removed:" << node_id;
}
}

// Add the notification to our cache.
RAY_LOG(INFO) << "Received notification for node id = " << node_id
<< ", IsAlive = " << is_alive;
node_cache_[node_id] = node_info;

// If the notification is new, call registered callback.
if (is_notif_new) {
if (is_alive) {
RAY_CHECK(removed_nodes_.find(node_id) == removed_nodes_.end());
} else {
removed_nodes_.insert(node_id);
}
GcsNodeInfo &cache_data = node_cache_[node_id];
node_change_callback_(node_id, cache_data);
}
}

ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl) {}
Expand Down
13 changes: 13 additions & 0 deletions src/ray/gcs/gcs_client/service_based_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,13 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
const StatusCallback &done) override;

private:
void HandleNotification(const GcsNodeInfo &node_info);

ServiceBasedGcsClient *client_impl_;

using NodeChangeCallback =
std::function<void(const ClientID &id, const GcsNodeInfo &node_info)>;

typedef SubscriptionExecutor<ClientID, ResourceChangeNotification, DynamicResourceTable>
DynamicResourceSubscriptionExecutor;
DynamicResourceSubscriptionExecutor resource_sub_executor_;
Expand All @@ -188,6 +193,14 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
ClientID local_node_id_;

Sequencer<ClientID> sequencer_;

/// The callback to call when a new node is added or a node is removed.
NodeChangeCallback node_change_callback_{nullptr};

/// A cache for information about all nodes.
std::unordered_map<ClientID, GcsNodeInfo> node_cache_;
/// The set of removed nodes.
std::unordered_set<ClientID> removed_nodes_;
};

/// \class ServiceBasedTaskInfoAccessor
Expand Down
33 changes: 23 additions & 10 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,25 @@ void GcsNodeManager::NodeFailureDetector::ScheduleTick() {
//////////////////////////////////////////////////////////////////////////////////////////
GcsNodeManager::GcsNodeManager(boost::asio::io_service &io_service,
gcs::NodeInfoAccessor &node_info_accessor,
gcs::ErrorInfoAccessor &error_info_accessor)
gcs::ErrorInfoAccessor &error_info_accessor,
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub)
: node_info_accessor_(node_info_accessor),
error_info_accessor_(error_info_accessor),
node_failure_detector_(new NodeFailureDetector(
io_service, node_info_accessor, [this](const ClientID &node_id) {
io_service, node_info_accessor,
[this](const ClientID &node_id) {
if (auto node = RemoveNode(node_id, /* is_intended = */ false)) {
node->set_state(rpc::GcsNodeInfo::DEAD);
RAY_CHECK(dead_nodes_.emplace(node_id, node).second);
RAY_CHECK_OK(node_info_accessor_.AsyncUnregister(node_id, nullptr));
auto on_done = [this, node_id, node](const Status &status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(),
node->SerializeAsString(), nullptr));
};
RAY_CHECK_OK(node_info_accessor_.AsyncUnregister(node_id, on_done));
// TODO(Shanly): Remove node resources from resource table.
}
})) {
})),
gcs_pub_sub_(gcs_pub_sub) {
// TODO(Shanly): Load node info list from storage synchronously.
// TODO(Shanly): Load cluster resources from storage synchronously.
}
Expand All @@ -122,9 +129,12 @@ void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request,
ClientID node_id = ClientID::FromBinary(request.node_info().node_id());
RAY_LOG(INFO) << "Registering node info, node id = " << node_id;
AddNode(std::make_shared<rpc::GcsNodeInfo>(request.node_info()));
auto on_done = [node_id, reply, send_reply_callback](Status status) {
auto on_done = [this, node_id, request, reply,
send_reply_callback](const Status &status) {
RAY_CHECK_OK(status);
RAY_LOG(INFO) << "Finished registering node info, node id = " << node_id;
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(),
request.node_info().SerializeAsString(), nullptr));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
RAY_CHECK_OK(node_info_accessor_.AsyncRegister(request.node_info(), on_done));
Expand All @@ -135,14 +145,17 @@ void GcsNodeManager::HandleUnregisterNode(const rpc::UnregisterNodeRequest &requ
rpc::SendReplyCallback send_reply_callback) {
ClientID node_id = ClientID::FromBinary(request.node_id());
RAY_LOG(INFO) << "Unregistering node info, node id = " << node_id;
auto on_done = [node_id, request, reply, send_reply_callback](Status status) {
RAY_CHECK_OK(status);
RAY_LOG(INFO) << "Finished unregistering node info, node id = " << node_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
if (auto node = RemoveNode(node_id, /* is_intended = */ true)) {
node->set_state(rpc::GcsNodeInfo::DEAD);
RAY_CHECK(dead_nodes_.emplace(node_id, node).second);

auto on_done = [this, node_id, node, reply, send_reply_callback](Status status) {
RAY_CHECK_OK(status);
RAY_LOG(INFO) << "Finished unregistering node info, node id = " << node_id;
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(),
node->SerializeAsString(), nullptr));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
RAY_CHECK_OK(node_info_accessor_.AsyncUnregister(node_id, on_done));
// TODO(Shanly): Remove node resources from resource table.
}
Expand Down
6 changes: 5 additions & 1 deletion src/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <ray/rpc/gcs_server/gcs_rpc_server.h>
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"

namespace ray {
namespace gcs {
Expand All @@ -39,7 +40,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// when detecting the death of nodes.
explicit GcsNodeManager(boost::asio::io_service &io_service,
gcs::NodeInfoAccessor &node_info_accessor,
gcs::ErrorInfoAccessor &error_info_accessor);
gcs::ErrorInfoAccessor &error_info_accessor,
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub);

/// Handle a register RPC request coming from a raylet.
void HandleRegisterNode(const rpc::RegisterNodeRequest &request,
Expand Down Expand Up @@ -196,6 +198,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// Listeners which monitor the removal of nodes.
std::vector<std::function<void(std::shared_ptr<rpc::GcsNodeInfo>)>>
node_removed_listeners_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
};

} // namespace gcs
Expand Down
11 changes: 6 additions & 5 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ void GcsServer::Start() {
// Init backend client.
InitBackendClient();

// Init gcs node_manager.
InitGcsNodeManager();

// Init gcs pub sub instance.
gcs_pub_sub_ = std::make_shared<gcs::GcsPubSub>(redis_gcs_client_->GetRedisClient());

// Init gcs node_manager.
InitGcsNodeManager();

// Init gcs detector.
gcs_redis_failure_detector_ = std::make_shared<GcsRedisFailureDetector>(
main_service_, redis_gcs_client_->primary_context(), [this]() { Stop(); });
Expand Down Expand Up @@ -128,8 +128,9 @@ void GcsServer::InitBackendClient() {

void GcsServer::InitGcsNodeManager() {
RAY_CHECK(redis_gcs_client_ != nullptr);
gcs_node_manager_ = std::make_shared<GcsNodeManager>(
main_service_, redis_gcs_client_->Nodes(), redis_gcs_client_->Errors());
gcs_node_manager_ =
std::make_shared<GcsNodeManager>(main_service_, redis_gcs_client_->Nodes(),
redis_gcs_client_->Errors(), gcs_pub_sub_);
}

void GcsServer::InitGcsActorManager() {
Expand Down
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class GcsActorSchedulerTest : public ::testing::Test {
raylet_client_ = std::make_shared<GcsServerMocker::MockRayletClient>();
worker_client_ = std::make_shared<GcsServerMocker::MockWorkerClient>();
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, node_info_accessor_, error_info_accessor_);
io_service_, node_info_accessor_, error_info_accessor_, gcs_pub_sub_);
gcs_actor_scheduler_ = std::make_shared<GcsServerMocker::MockedGcsActorScheduler>(
io_service_, actor_info_accessor_, *gcs_node_manager_,
/*schedule_failure_handler=*/
Expand Down Expand Up @@ -55,6 +55,7 @@ class GcsActorSchedulerTest : public ::testing::Test {
std::shared_ptr<GcsServerMocker::MockedGcsActorScheduler> gcs_actor_scheduler_;
std::vector<std::shared_ptr<gcs::GcsActor>> success_actors_;
std::vector<std::shared_ptr<gcs::GcsActor>> failure_actors_;
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
};

TEST_F(GcsActorSchedulerTest, TestScheduleFailedWithZeroNode) {
Expand Down
11 changes: 8 additions & 3 deletions src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
#include "gtest/gtest.h"

namespace ray {
class GcsNodeManagerTest : public ::testing::Test {};
class GcsNodeManagerTest : public ::testing::Test {
protected:
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
};

TEST_F(GcsNodeManagerTest, TestManagement) {
boost::asio::io_service io_service;
auto node_info_accessor = GcsServerMocker::MockedNodeInfoAccessor();
auto error_info_accessor = GcsServerMocker::MockedErrorInfoAccessor();
gcs::GcsNodeManager node_manager(io_service, node_info_accessor, error_info_accessor);
gcs::GcsNodeManager node_manager(io_service, node_info_accessor, error_info_accessor,
gcs_pub_sub_);
// Test Add/Get/Remove functionality.
auto node = Mocker::GenNodeInfo();
auto node_id = ClientID::FromBinary(node->node_id());
Expand All @@ -41,7 +45,8 @@ TEST_F(GcsNodeManagerTest, TestListener) {
boost::asio::io_service io_service;
auto node_info_accessor = GcsServerMocker::MockedNodeInfoAccessor();
auto error_info_accessor = GcsServerMocker::MockedErrorInfoAccessor();
gcs::GcsNodeManager node_manager(io_service, node_info_accessor, error_info_accessor);
gcs::GcsNodeManager node_manager(io_service, node_info_accessor, error_info_accessor,
gcs_pub_sub_);
// Test AddNodeAddedListener.
int node_count = 1000;
std::vector<std::shared_ptr<rpc::GcsNodeInfo>> added_nodes;
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/pubsub/gcs_pub_sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace ray {
namespace gcs {

#define JOB_CHANNEL "JOB"
#define NODE_CHANNEL "NODE"
#define WORKER_FAILURE_CHANNEL "WORKER_FAILURE"
#define OBJECT_CHANNEL "OBJECT"
#define TASK_CHANNEL "TASK"
Expand Down
22 changes: 11 additions & 11 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -514,20 +514,19 @@ void NodeManager::NodeAdded(const GcsNodeInfo &node_info) {
const ClientID node_id = ClientID::FromBinary(node_info.node_id());

RAY_LOG(DEBUG) << "[NodeAdded] Received callback from client id " << node_id;
if (1 == cluster_resource_map_.count(node_id)) {
RAY_LOG(DEBUG) << "Received notification of a new node that already exists: "
<< node_id;
return;
}

if (node_id == self_node_id_) {
// We got a notification for ourselves, so we are connected to the GCS now.
// Save this NodeManager's resource information in the cluster resource map.
cluster_resource_map_[node_id] = initial_config_.resource_config;
return;
}

auto entry = remote_node_manager_clients_.find(node_id);
if (entry != remote_node_manager_clients_.end()) {
RAY_LOG(DEBUG) << "Received notification of a new client that already exists: "
<< node_id;
return;
}

// Initialize a rpc client to the new node manager.
std::unique_ptr<rpc::NodeManagerClient> client(
new rpc::NodeManagerClient(node_info.node_manager_address(),
Expand Down Expand Up @@ -565,15 +564,16 @@ void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) {
// not be necessary.

// Remove the client from the resource map.
cluster_resource_map_.erase(node_id);
if (0 == cluster_resource_map_.erase(node_id)) {
RAY_LOG(DEBUG) << "Received NodeRemoved callback for an unknown node: " << node_id
<< ".";
return;
}

// Remove the node manager client.
const auto client_entry = remote_node_manager_clients_.find(node_id);
if (client_entry != remote_node_manager_clients_.end()) {
remote_node_manager_clients_.erase(client_entry);
} else {
RAY_LOG(WARNING) << "Received NodeRemoved callback for an unknown client " << node_id
<< ".";
}

// For any live actors that were on the dead node, broadcast a notification
Expand Down

0 comments on commit 97430b2

Please sign in to comment.