From 7bd88865aef4008407ce8ff60edd66e63405979e Mon Sep 17 00:00:00 2001 From: Ricky Xu Date: Thu, 1 Jun 2023 06:26:35 +0800 Subject: [PATCH] [core][autoscaler] GCS Autoscaler V2: Node states and resource requests [2/x] (#35596) Why are these changes needed? This PR adds part of the logic for HandleGetClusterResourceState in a newly introduced GCS manager GcsAutoscalerStateManager It populates the node_states field of the response: a collection of node's available/total resources + node status. It populates the pending_resource_requests field of the response: information of resource requests by count (it's shapes + number of pending requests) It's part of the issue: #35595 --- BUILD.bazel | 17 ++ .../gcs_autoscaler_state_manager.cc | 135 ++++++++++ .../gcs_server/gcs_autoscaler_state_manager.h | 109 +++++++++ src/ray/gcs/gcs_server/gcs_node_manager.h | 8 + .../gcs/gcs_server/gcs_resource_manager.cc | 18 +- src/ray/gcs/gcs_server/gcs_resource_manager.h | 6 +- src/ray/gcs/gcs_server/gcs_server.cc | 16 ++ src/ray/gcs/gcs_server/gcs_server.h | 8 + .../test/gcs_autoscaler_state_manager_test.cc | 230 ++++++++++++++++++ src/ray/gcs/test/gcs_test_util.h | 50 ++++ .../protobuf/experimental/autoscaler.proto | 2 +- 11 files changed, 596 insertions(+), 3 deletions(-) create mode 100644 src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc create mode 100644 src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h create mode 100644 src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc diff --git a/BUILD.bazel b/BUILD.bazel index 1ef6e23822e7f..e60f49df74dcf 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2176,6 +2176,23 @@ cc_test( ], ) +cc_test( + name = "gcs_autoscaler_state_manager_test", + size = "small", + srcs = [ + "src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc", + ], + copts = COPTS, + tags = ["team:core"], + deps = [ + ":gcs_server_lib", + ":gcs_server_test_util", + ":gcs_test_util_lib", + ":ray_mock", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "gcs_resource_manager_test", size = "small", diff --git a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc new file mode 100644 index 0000000000000..f6e812a83a167 --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc @@ -0,0 +1,135 @@ +// Copyright 2023 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/gcs/gcs_server/gcs_autoscaler_state_manager.h" + +#include "ray/gcs/gcs_server/gcs_node_manager.h" +#include "ray/gcs/gcs_server/gcs_resource_manager.h" +#include "ray/raylet/scheduling/cluster_resource_manager.h" + +namespace ray { +namespace gcs { + +GcsAutoscalerStateManager::GcsAutoscalerStateManager( + const ClusterResourceManager &cluster_resource_manager, + const GcsResourceManager &gcs_resource_manager, + const GcsNodeManager &gcs_node_manager) + : cluster_resource_manager_(cluster_resource_manager), + gcs_node_manager_(gcs_node_manager), + gcs_resource_manager_(gcs_resource_manager), + last_cluster_resource_state_version_(0), + last_seen_autoscaler_state_version_(0) {} + +void GcsAutoscalerStateManager::HandleGetClusterResourceState( + rpc::autoscaler::GetClusterResourceStateRequest request, + rpc::autoscaler::GetClusterResourceStateReply *reply, + rpc::SendReplyCallback send_reply_callback) { + RAY_CHECK(request.last_seen_cluster_resource_state_version() <= + last_cluster_resource_state_version_); + reply->set_last_seen_autoscaler_state_version(last_seen_autoscaler_state_version_); + reply->set_cluster_resource_state_version( + IncrementAndGetNextClusterResourceStateVersion()); + + GetNodeStates(reply); + GetPendingResourceRequests(reply); + // GetPendingGangResourceRequests(reply); + // GetClusterResourceConstraints(reply); + + // We are not using GCS_RPC_SEND_REPLY like other GCS managers to avoid the client + // having to parse the gcs status code embedded. + send_reply_callback(ray::Status::OK(), nullptr, nullptr); +} + +void GcsAutoscalerStateManager::HandleReportAutoscalingState( + rpc::autoscaler::ReportAutoscalingStateRequest request, + rpc::autoscaler::ReportAutoscalingStateReply *reply, + rpc::SendReplyCallback send_reply_callback) { + // Unimplemented. + throw std::runtime_error("Unimplemented"); +} + +void GcsAutoscalerStateManager::GetPendingGangResourceRequests( + rpc::autoscaler::GetClusterResourceStateReply *reply) { + throw std::runtime_error("Unimplemented"); +} + +void GcsAutoscalerStateManager::GetClusterResourceConstraints( + rpc::autoscaler::GetClusterResourceStateReply *reply) { + throw std::runtime_error("Unimplemented"); +} + +void GcsAutoscalerStateManager::GetPendingResourceRequests( + rpc::autoscaler::GetClusterResourceStateReply *reply) { + // TODO(rickyx): We could actually get the load of each node from the cluster resource + // manager. Need refactoring on the GcsResourceManager. + // We could then do cluster_resource_manager_GetResourceLoad(), and decouple it + // from gcs_resource_manager_. + auto aggregate_load = gcs_resource_manager_.GetAggregatedResourceLoad(); + for (const auto &[shape, demand] : aggregate_load) { + auto num_pending = demand.num_infeasible_requests_queued() + demand.backlog_size() + + demand.num_ready_requests_queued(); + if (num_pending > 0) { + auto pending_req = reply->add_pending_resource_requests(); + pending_req->set_count(num_pending); + auto req = pending_req->mutable_request(); + req->mutable_resources_bundle()->insert(shape.begin(), shape.end()); + } + } +} + +void GcsAutoscalerStateManager::GetNodeStates( + rpc::autoscaler::GetClusterResourceStateReply *reply) { + auto populate_node_state = [&](const rpc::GcsNodeInfo &gcs_node_info, + rpc::autoscaler::NodeState::NodeStatus status) { + auto node_state_proto = reply->add_node_states(); + node_state_proto->set_node_id(gcs_node_info.node_id()); + node_state_proto->set_instance_id(gcs_node_info.instance_id()); + node_state_proto->set_node_state_version(last_cluster_resource_state_version_); + node_state_proto->set_status(status); + + if (status == rpc::autoscaler::NodeState::ALIVE) { + auto const &node_resource_data = cluster_resource_manager_.GetNodeResources( + scheduling::NodeID(node_state_proto->node_id())); + + // Copy resource available + const auto &available = node_resource_data.available.ToResourceMap(); + node_state_proto->mutable_available_resources()->insert(available.begin(), + available.end()); + + // Copy total resources + const auto &total = node_resource_data.total.ToResourceMap(); + node_state_proto->mutable_total_resources()->insert(total.begin(), total.end()); + + // TODO(rickyx): support dynamic labels + } + }; + + const auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes(); + std::for_each(alive_nodes.begin(), alive_nodes.end(), [&](const auto &gcs_node_info) { + populate_node_state(*gcs_node_info.second, rpc::autoscaler::NodeState::ALIVE); + }); + + // This might be large if there are many nodes for a long-running cluster. + // However, since we don't report resources for a dead node, the data size being + // reported by dead node should be small. + // TODO(rickyx): We will need to GC the head nodes in the future. + // https://github.com/ray-project/ray/issues/35874 + const auto &dead_nodes = gcs_node_manager_.GetAllDeadNodes(); + std::for_each(dead_nodes.begin(), dead_nodes.end(), [&](const auto &gcs_node_info) { + populate_node_state(*gcs_node_info.second, rpc::autoscaler::NodeState::DEAD); + }); +} + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h new file mode 100644 index 0000000000000..1c925211d7eaf --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h @@ -0,0 +1,109 @@ +// Copyright 2023 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "ray/rpc/gcs_server/gcs_rpc_server.h" +#include "src/ray/protobuf/gcs.pb.h" + +namespace ray { +class ClusterResourceManager; +namespace gcs { + +class GcsResourceManager; +class GcsNodeManager; + +class GcsAutoscalerStateManager : public rpc::AutoscalerStateHandler { + public: + GcsAutoscalerStateManager(const ClusterResourceManager &cluster_resource_manager, + const GcsResourceManager &gcs_resource_manager, + const GcsNodeManager &gcs_node_manager); + + void HandleGetClusterResourceState( + rpc::autoscaler::GetClusterResourceStateRequest request, + rpc::autoscaler::GetClusterResourceStateReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + + void HandleReportAutoscalingState( + rpc::autoscaler::ReportAutoscalingStateRequest request, + rpc::autoscaler::ReportAutoscalingStateReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + + void RecordMetrics() const { throw std::runtime_error("Unimplemented"); } + + std::string DebugString() const { throw std::runtime_error("Unimplemented"); } + + private: + /// \brief Increment and get the next cluster resource state version. + /// \return The incremented cluster resource state version. + int64_t IncrementAndGetNextClusterResourceStateVersion() { + return ++last_cluster_resource_state_version_; + } + + /// \brief Get the current cluster resource state. + /// \param reply The reply to be filled. + /// + /// See rpc::autoscaler::GetClusterResourceStateReply::node_states for more details. + void GetNodeStates(rpc::autoscaler::GetClusterResourceStateReply *reply); + + /// \brief Get the resource requests state. + /// \param reply The reply to be filled. + /// + /// See rpc::autoscaler::GetClusterResourceStateReply::pending_resource_requests for + /// more details. + void GetPendingResourceRequests(rpc::autoscaler::GetClusterResourceStateReply *reply); + + /// \brief Get the gang resource requests (e.g. from placement group) state. + /// \param reply The reply to be filled. + /// + /// See rpc::autoscaler::GetClusterResourceStateReply::pending_gang_resource_requests + /// for more + void GetPendingGangResourceRequests( + rpc::autoscaler::GetClusterResourceStateReply *reply); + + /// \brief Get the cluster resource constraints state. + /// \param reply The reply to be filled. + /// + /// See rpc::autoscaler::GetClusterResourceStateReply::cluster_resource_constraints for + /// more details. This is requested through autoscaler SDK for request_resources(). + void GetClusterResourceConstraints( + rpc::autoscaler::GetClusterResourceStateReply *reply); + + /// Cluster resources manager that provides cluster resources information. + const ClusterResourceManager &cluster_resource_manager_; + + /// Gcs node manager that provides node status information. + const GcsNodeManager &gcs_node_manager_; + + /// GCS resource manager that provides resource demand/load information. + const GcsResourceManager &gcs_resource_manager_; + + // The default value of the last seen version for the request is 0, which indicates + // no version has been reported. So the first reported version should be 1. + // We currently provide two guarantees for this version: + // 1. It will increase monotonically. + // 2. If a state is updated, the version will be higher. + // Ideally we would want to have a guarantee where consecutive versions will always + // be different, but it's currently hard to do. + // TODO(rickyx): https://github.com/ray-project/ray/issues/35873 + // We will need to make the version correct when GCS fails over. + int64_t last_cluster_resource_state_version_ = 0; + + /// The last seen autoscaler state version. Use 0 as the default value to indicate + /// no previous autoscaler state has been seen. + int64_t last_seen_autoscaler_state_version_ = 0; +}; + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index d76e94fbd8eae..6767f1bd6ef33 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -36,6 +36,7 @@ namespace ray { namespace gcs { +class GcsAutoscalerStateManagerTest; /// GcsNodeManager is responsible for managing and monitoring nodes as well as handing /// node and resource related rpc requests. /// This class is not thread-safe. @@ -104,6 +105,12 @@ class GcsNodeManager : public rpc::NodeInfoHandler { return alive_nodes_; } + /// Get all dead nodes. + const absl::flat_hash_map> &GetAllDeadNodes() + const { + return dead_nodes_; + } + /// Add listener to monitor the remove action of nodes. /// /// \param listener The handler which process the remove of nodes. @@ -178,6 +185,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler { NodeIDAddrBiMap node_map_; friend GcsMonitorServerTest; + friend GcsAutoscalerStateManagerTest; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index 07e969a990695..96b458d164eeb 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -157,10 +157,26 @@ void GcsResourceManager::HandleReportResourceUsage( ++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST]; } +// TODO(rickyx): We could update the cluster resource manager when we update the load +// so that we will no longer need node_resource_usages_. +std::unordered_map, rpc::ResourceDemand> +GcsResourceManager::GetAggregatedResourceLoad() const { + std::unordered_map, rpc::ResourceDemand> + aggregate_load; + if (node_resource_usages_.empty()) { + return aggregate_load; + } + for (const auto &usage : node_resource_usages_) { + // Aggregate the load reported by each raylet. + FillAggregateLoad(usage.second, &aggregate_load); + } + return aggregate_load; +} + void GcsResourceManager::FillAggregateLoad( const rpc::ResourcesData &resources_data, std::unordered_map, rpc::ResourceDemand> - *aggregate_load) { + *aggregate_load) const { auto load = resources_data.resource_load_by_shape(); for (const auto &demand : load.resource_demands()) { auto &aggregate_demand = (*aggregate_load)[demand.shape()]; diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h index c145d0ed4e544..fef829923bb5a 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -143,6 +143,10 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler, /// \returns The mapping from node id to latest resource report. const absl::flat_hash_map &NodeResourceReportView() const; + /// Get aggregated resource load of all nodes. + std::unordered_map, rpc::ResourceDemand> + GetAggregatedResourceLoad() const; + private: /// Aggregate nodes' pending task info. /// @@ -150,7 +154,7 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler, /// \param aggregate_load[out] The aggregate pending task info (across the cluster). void FillAggregateLoad(const rpc::ResourcesData &resources_data, std::unordered_map, - rpc::ResourceDemand> *aggregate_load); + rpc::ResourceDemand> *aggregate_load) const; /// io context. This is to ensure thread safety. Ideally, all public /// funciton needs to post job to this io_context. diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 1c5b3c40df8b4..f965a1d2c5cdb 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -22,6 +22,7 @@ #include "ray/common/ray_config.h" #include "ray/gcs/gcs_client/gcs_client.h" #include "ray/gcs/gcs_server/gcs_actor_manager.h" +#include "ray/gcs/gcs_server/gcs_autoscaler_state_manager.h" #include "ray/gcs/gcs_server/gcs_job_manager.h" #include "ray/gcs/gcs_server/gcs_node_manager.h" #include "ray/gcs/gcs_server/gcs_placement_group_manager.h" @@ -173,6 +174,9 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) { // Install event listeners. InstallEventListeners(); + // Init autoscaling manager + InitGcsAutoscalerStateManager(); + // Start RPC server when all tables have finished loading initial // data. rpc_server_.Run(); @@ -575,6 +579,18 @@ void GcsServer::InitGcsWorkerManager() { rpc_server_.RegisterService(*worker_info_service_); } +void GcsServer::InitGcsAutoscalerStateManager() { + gcs_autoscaler_state_manager_ = std::make_unique( + cluster_resource_scheduler_->GetClusterResourceManager(), + *gcs_resource_manager_, + *gcs_node_manager_); + + autoscaler_state_service_.reset( + new rpc::AutoscalerStateGrpcService(main_service_, *gcs_autoscaler_state_manager_)); + + rpc_server_.RegisterService(*autoscaler_state_service_); +} + void GcsServer::InitGcsTaskManager() { gcs_task_manager_ = std::make_unique(); // Register service. diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index dadc1f81e6e73..ea5b85f8f6c10 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -67,6 +67,7 @@ class GcsWorkerManager; class GcsPlacementGroupScheduler; class GcsPlacementGroupManager; class GcsTaskManager; +class GcsAutoscalerStateManager; /// The GcsServer will take over all requests from GcsClient and transparent /// transmit the command to the backend reliable storage for the time being. @@ -134,6 +135,9 @@ class GcsServer { /// Initialize gcs task manager. void InitGcsTaskManager(); + /// Initialize gcs autoscaling manager. + void InitGcsAutoscalerStateManager(); + /// Initialize usage stats client. void InitUsageStatsClient(); @@ -196,6 +200,8 @@ class GcsServer { std::shared_ptr cluster_resource_scheduler_; /// The cluster task manager. std::shared_ptr cluster_task_manager_; + /// The autoscaler state manager. + std::unique_ptr gcs_autoscaler_state_manager_; /// The gcs node manager. std::shared_ptr gcs_node_manager_; /// The health check manager. @@ -259,6 +265,8 @@ class GcsServer { std::unique_ptr gcs_task_manager_; /// Independent task info service from the main grpc service. std::unique_ptr task_info_service_; + /// Gcs Autoscaler state manager. + std::unique_ptr autoscaler_state_service_; /// Backend client. std::shared_ptr redis_client_; /// A publisher for publishing gcs messages. diff --git a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc new file mode 100644 index 0000000000000..3cbc1693bac4f --- /dev/null +++ b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc @@ -0,0 +1,230 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// clang-format off +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "ray/common/asio/instrumented_io_context.h" +#include "ray/gcs/test/gcs_test_util.h" +#include "src/ray/gcs/gcs_server/gcs_node_manager.h" +#include "ray/raylet/scheduling/cluster_resource_manager.h" +#include "mock/ray/gcs/gcs_server/gcs_node_manager.h" + +#include "ray/gcs/gcs_server/gcs_autoscaler_state_manager.h" +// clang-format on + +namespace ray { + +namespace gcs { +using ::testing::_; + +// Test suite for AutoscalerState related functionality. +class GcsAutoscalerStateManagerTest : public ::testing::Test { + public: + GcsAutoscalerStateManagerTest() : cluster_resource_manager_(io_service_) { + gcs_resource_manager_ = std::make_shared( + io_service_, cluster_resource_manager_, NodeID::FromRandom()); + gcs_node_manager_ = std::make_shared(); + + gcs_autoscaler_state_manager_.reset(new GcsAutoscalerStateManager( + cluster_resource_manager_, *gcs_resource_manager_, *gcs_node_manager_)); + } + + protected: + instrumented_io_context io_service_; + ClusterResourceManager cluster_resource_manager_; + std::shared_ptr gcs_resource_manager_; + std::shared_ptr gcs_node_manager_; + std::unique_ptr gcs_autoscaler_state_manager_; + + public: + void AddNode(const std::shared_ptr &node) { + gcs_node_manager_->alive_nodes_[NodeID::FromBinary(node->node_id())] = node; + gcs_resource_manager_->OnNodeAdd(*node); + } + + void RemoveNode(const NodeID &node_id) { + gcs_node_manager_->alive_nodes_.erase(node_id); + gcs_resource_manager_->OnNodeDead(node_id); + } + + void CheckNodeResources( + const rpc::autoscaler::NodeState &node_state, + const absl::flat_hash_map &total_resources, + const absl::flat_hash_map &available_resources) { + ASSERT_EQ(node_state.total_resources_size(), total_resources.size()); + ASSERT_EQ(node_state.available_resources_size(), available_resources.size()); + for (const auto &resource : total_resources) { + ASSERT_EQ(node_state.total_resources().at(resource.first), resource.second); + } + for (const auto &resource : available_resources) { + ASSERT_EQ(node_state.available_resources().at(resource.first), resource.second); + } + } + + rpc::autoscaler::GetClusterResourceStateReply GetClusterResourceStateSync() { + rpc::autoscaler::GetClusterResourceStateRequest request; + rpc::autoscaler::GetClusterResourceStateReply reply; + auto send_reply_callback = + [](ray::Status status, std::function f1, std::function f2) {}; + gcs_autoscaler_state_manager_->HandleGetClusterResourceState( + request, &reply, send_reply_callback); + return reply; + } + + void UpdateFromResourceReportSync( + const NodeID &node_id, + const absl::flat_hash_map &available_resources, + const absl::flat_hash_map &total_resources, + bool available_resources_changed) { + rpc::ResourcesData resources_data; + Mocker::FillResourcesData(resources_data, + node_id, + available_resources, + total_resources, + available_resources_changed); + gcs_resource_manager_->UpdateFromResourceReport(resources_data); + } + + void UpdateResourceLoads(const std::string &node_id, + std::vector demands, + bool resource_load_changed = true) { + rpc::ResourcesData data; + Mocker::FillResourcesData(data, node_id, demands, resource_load_changed); + gcs_resource_manager_->UpdateResourceLoads(data); + } + + std::string ShapeToString(const rpc::autoscaler::ResourceRequest &request) { + // Ordered map with bundle name as the key + std::map m; + for (const auto &resource : request.resources_bundle()) { + m[resource.first] = resource.second; + } + std::stringstream ss; + for (const auto &resource : m) { + ss << resource.first << ":" << resource.second << ","; + } + auto s = ss.str(); + // Remove last "," + return s.empty() ? "" : s.substr(0, s.size() - 1); + } + + void CheckPendingRequests( + const rpc::autoscaler::GetClusterResourceStateReply &reply, + const std::unordered_map &expect_requests_by_count) { + auto pending_reqs = reply.pending_resource_requests(); + ASSERT_EQ(pending_reqs.size(), expect_requests_by_count.size()); + std::unordered_map actual_requests_by_count; + for (int i = 0; i < pending_reqs.size(); i++) { + auto req_by_count = pending_reqs[i]; + auto req_str = ShapeToString(req_by_count.request()); + actual_requests_by_count[req_str] = req_by_count.count(); + } + + ASSERT_EQ(actual_requests_by_count.size(), expect_requests_by_count.size()); + for (const auto &req : expect_requests_by_count) { + ASSERT_EQ(actual_requests_by_count[req.first], req.second) + << "Request: " << req.first; + } + } +}; + +TEST_F(GcsAutoscalerStateManagerTest, TestNodeAddUpdateRemove) { + auto node = Mocker::GenNodeInfo(); + + // Adding a node. + { + node->mutable_resources_total()->insert({"CPU", 2}); + node->mutable_resources_total()->insert({"GPU", 1}); + node->set_instance_id("instance_1"); + AddNode(node); + + auto reply = GetClusterResourceStateSync(); + ASSERT_EQ(reply.node_states_size(), 1); + CheckNodeResources(reply.node_states(0), + /* available */ {{"CPU", 2}, {"GPU", 1}}, + /* total */ {{"CPU", 2}, {"GPU", 1}}); + } + + // Update available resources. + { + UpdateFromResourceReportSync(NodeID::FromBinary(node->node_id()), + {/* available */ {"CPU", 1.75}}, + /* total*/ {{"CPU", 2}, {"GPU", 1}}, + /* available_changed*/ true); + + auto reply = GetClusterResourceStateSync(); + ASSERT_EQ(reply.node_states_size(), 1); + CheckNodeResources(reply.node_states(0), + /*total*/ {{"CPU", 2}, {"GPU", 1}}, + /*available*/ {{"CPU", 1.75}}); + } + + // Remove a node - test node states correct. + { + RemoveNode(NodeID::FromBinary(node->node_id())); + gcs_resource_manager_->OnNodeDead(NodeID::FromBinary(node->node_id())); + auto reply = GetClusterResourceStateSync(); + ASSERT_EQ(reply.node_states_size(), 0); + } +} + +TEST_F(GcsAutoscalerStateManagerTest, TestBasicResourceRequests) { + auto node = Mocker::GenNodeInfo(); + node->mutable_resources_total()->insert({"CPU", 2}); + node->mutable_resources_total()->insert({"GPU", 1}); + node->set_instance_id("instance_1"); + // Adding a node. + AddNode(node); + + // Get empty requests + { + auto reply = GetClusterResourceStateSync(); + ASSERT_EQ(reply.pending_resource_requests_size(), 0); + } + + // Update resource usages. + { + UpdateResourceLoads(node->node_id(), + {Mocker::GenResourceDemand({{"CPU", 1}}, + /* nun_ready_queued */ 1, + /* nun_infeasible */ 1, + /* num_backlog */ 0), + Mocker::GenResourceDemand({{"CPU", 4}, {"GPU", 2}}, + /* num_ready_queued */ 0, + /* num_infeasible */ 1, + /* num_backlog */ 1)}); + + auto reply = GetClusterResourceStateSync(); + // Expect each pending resources shape to be num_infeasible + num_backlog. + CheckPendingRequests(reply, {{"CPU:1", 1 + 1}, {"CPU:4,GPU:2", 1 + 1}}); + } + + // Remove node should clear it. + { + RemoveNode(NodeID::FromBinary(node->node_id())); + auto reply = GetClusterResourceStateSync(); + ASSERT_EQ(reply.pending_resource_requests_size(), 0); + } +} + +} // namespace gcs +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 07acdc20237d5..c778d38824b99 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -201,6 +201,7 @@ struct Mocker { node->set_node_manager_port(port); node->set_node_manager_address(address); node->set_node_name(node_name); + node->set_instance_id("instance_x"); return node; } @@ -272,6 +273,55 @@ struct Mocker { return data; } + + static rpc::ResourceDemand GenResourceDemand( + const absl::flat_hash_map &resource_demands, + int64_t num_ready_queued, + int64_t num_infeasible, + int64_t num_backlog) { + rpc::ResourceDemand resource_demand; + for (const auto &resource : resource_demands) { + (*resource_demand.mutable_shape())[resource.first] = resource.second; + } + resource_demand.set_num_ready_requests_queued(num_ready_queued); + resource_demand.set_num_infeasible_requests_queued(num_infeasible); + resource_demand.set_backlog_size(num_backlog); + return resource_demand; + } + + static void FillResourcesData( + rpc::ResourcesData &resources_data, + const NodeID &node_id, + const absl::flat_hash_map &available_resources, + const absl::flat_hash_map &total_resources, + bool available_resources_changed) { + resources_data.set_node_id(node_id.Binary()); + for (const auto &resource : available_resources) { + (*resources_data.mutable_resources_available())[resource.first] = resource.second; + } + for (const auto &resource : total_resources) { + (*resources_data.mutable_resources_total())[resource.first] = resource.second; + } + resources_data.set_resources_available_changed(available_resources_changed); + } + + static void FillResourcesData(rpc::ResourcesData &data, + const std::string &node_id, + std::vector demands, + bool resource_load_changed = true) { + auto load_by_shape = data.mutable_resource_load_by_shape(); + auto agg_load = data.mutable_resource_load(); + for (const auto &demand : demands) { + load_by_shape->add_resource_demands()->CopyFrom(demand); + for (const auto &resource : demand.shape()) { + (*agg_load)[resource.first] += + (resource.second * (demand.num_ready_requests_queued() + + demand.num_infeasible_requests_queued())); + } + } + data.set_resource_load_changed(resource_load_changed); + data.set_node_id(node_id); + } }; } // namespace ray diff --git a/src/ray/protobuf/experimental/autoscaler.proto b/src/ray/protobuf/experimental/autoscaler.proto index 2e0406c8df142..967561a7a1ab1 100644 --- a/src/ray/protobuf/experimental/autoscaler.proto +++ b/src/ray/protobuf/experimental/autoscaler.proto @@ -165,7 +165,7 @@ message ReportAutoscalingStateRequest { repeated Instance instances = 3; // infeasible resource requests. repeated ResourceRequest infeasible_resource_requests = 4; - repeated ClusterResourceConstraint infeasible_gange_resource_requests = 5; + repeated GangResourceRequest infeasible_gange_resource_requests = 5; repeated ClusterResourceConstraint infeasible_cluster_resource_constraints = 6; }