Skip to content

Commit

Permalink
[core][autoscaler] GCS Autoscaler V2: Node states and resource reques…
Browse files Browse the repository at this point in the history
…ts [2/x] (ray-project#35596)

Why are these changes needed?
This PR adds part of the logic for HandleGetClusterResourceState in a newly introduced GCS manager GcsAutoscalerStateManager

It populates the node_states field of the response: a collection of each node's available/total resources and node status.
It populates the pending_resource_requests field of the response: information about resource requests by count (their shapes and the number of pending requests).
It's part of the issue: ray-project#35595
  • Loading branch information
rickyyx committed May 31, 2023
1 parent e16a4bd commit 7bd8886
Show file tree
Hide file tree
Showing 11 changed files with 596 additions and 3 deletions.
17 changes: 17 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2176,6 +2176,23 @@ cc_test(
],
)

# Test target built from
# src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc.
# Links the GCS server library, the GCS test utilities, the Ray mocks and
# gtest_main (listed in `deps` below).
cc_test(
name = "gcs_autoscaler_state_manager_test",
size = "small",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc",
],
copts = COPTS,
tags = ["team:core"],
deps = [
":gcs_server_lib",
":gcs_server_test_util",
":gcs_test_util_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "gcs_resource_manager_test",
size = "small",
Expand Down
135 changes: 135 additions & 0 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2023 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/gcs/gcs_server/gcs_autoscaler_state_manager.h"

#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/raylet/scheduling/cluster_resource_manager.h"

namespace ray {
namespace gcs {

GcsAutoscalerStateManager::GcsAutoscalerStateManager(
const ClusterResourceManager &cluster_resource_manager,
const GcsResourceManager &gcs_resource_manager,
const GcsNodeManager &gcs_node_manager)
: cluster_resource_manager_(cluster_resource_manager),
gcs_node_manager_(gcs_node_manager),
gcs_resource_manager_(gcs_resource_manager),
last_cluster_resource_state_version_(0),
last_seen_autoscaler_state_version_(0) {}

/// Serve a GetClusterResourceState request.
///
/// Aborts (RAY_CHECK) if the caller claims to have seen a cluster resource state
/// version newer than the one this manager has handed out. Otherwise the reply is
/// stamped with the last seen autoscaler state version and a freshly incremented
/// cluster resource state version, then filled with node states and pending
/// resource requests.
///
/// \param request The incoming request; only its last seen version is read.
/// \param reply The reply to populate.
/// \param send_reply_callback Invoked once with an OK status after the reply is filled.
void GcsAutoscalerStateManager::HandleGetClusterResourceState(
    rpc::autoscaler::GetClusterResourceStateRequest request,
    rpc::autoscaler::GetClusterResourceStateReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  RAY_CHECK(request.last_seen_cluster_resource_state_version() <=
            last_cluster_resource_state_version_);

  reply->set_last_seen_autoscaler_state_version(last_seen_autoscaler_state_version_);

  // Bump the version before collecting node states: GetNodeStates() stamps every
  // node entry with the current (already incremented) version.
  const auto state_version = IncrementAndGetNextClusterResourceStateVersion();
  reply->set_cluster_resource_state_version(state_version);

  GetNodeStates(reply);
  GetPendingResourceRequests(reply);
  // Not wired in yet (both helpers are still unimplemented):
  // GetPendingGangResourceRequests(reply);
  // GetClusterResourceConstraints(reply);

  // GCS_RPC_SEND_REPLY is deliberately not used here (unlike other GCS managers) so
  // that the client does not have to parse an embedded gcs status code.
  send_reply_callback(ray::Status::OK(), nullptr, nullptr);
}

/// Handler for ReportAutoscalingState requests.
///
/// Not implemented yet: the body unconditionally throws
/// std::runtime_error("Unimplemented"), so `reply` is left untouched and
/// `send_reply_callback` is never invoked.
void GcsAutoscalerStateManager::HandleReportAutoscalingState(
rpc::autoscaler::ReportAutoscalingStateRequest request,
rpc::autoscaler::ReportAutoscalingStateReply *reply,
rpc::SendReplyCallback send_reply_callback) {
// Unimplemented: none of the arguments are read.
throw std::runtime_error("Unimplemented");
}

/// Placeholder for filling the gang resource requests into `reply`.
///
/// Not implemented yet: always throws std::runtime_error("Unimplemented") and
/// never touches `reply`.
void GcsAutoscalerStateManager::GetPendingGangResourceRequests(
rpc::autoscaler::GetClusterResourceStateReply *reply) {
throw std::runtime_error("Unimplemented");
}

/// Placeholder for filling the cluster resource constraints into `reply`.
///
/// Not implemented yet: always throws std::runtime_error("Unimplemented") and
/// never touches `reply`.
void GcsAutoscalerStateManager::GetClusterResourceConstraints(
rpc::autoscaler::GetClusterResourceStateReply *reply) {
throw std::runtime_error("Unimplemented");
}

/// Append one pending_resource_requests entry to `reply` for every resource shape
/// that currently has outstanding demand.
///
/// The demand for a shape is the sum of its infeasible queued requests, its
/// backlog size and its ready queued requests, as reported by the aggregated
/// resource load of the GCS resource manager. Shapes whose sum is not positive
/// are skipped.
///
/// \param reply The reply to be filled.
void GcsAutoscalerStateManager::GetPendingResourceRequests(
    rpc::autoscaler::GetClusterResourceStateReply *reply) {
  // TODO(rickyx): The per-node load could come from the cluster resource manager
  // instead (after refactoring GcsResourceManager), i.e. via
  // cluster_resource_manager_GetResourceLoad(), which would remove the dependency
  // on gcs_resource_manager_ here.
  const auto load_by_shape = gcs_resource_manager_.GetAggregatedResourceLoad();

  for (const auto &[shape, demand] : load_by_shape) {
    const auto pending_count = demand.num_infeasible_requests_queued() +
                               demand.backlog_size() +
                               demand.num_ready_requests_queued();
    if (pending_count <= 0) {
      // Nothing outstanding for this shape.
      continue;
    }

    auto *entry = reply->add_pending_resource_requests();
    entry->set_count(pending_count);
    auto *bundle = entry->mutable_request()->mutable_resources_bundle();
    bundle->insert(shape.begin(), shape.end());
  }
}

/// Append one node_states entry to `reply` for every alive node, followed by one
/// for every dead node known to the GCS node manager.
///
/// Every entry carries the node id, instance id, status and the current cluster
/// resource state version. Available and total resources are only attached to
/// ALIVE nodes.
///
/// \param reply The reply to be filled.
void GcsAutoscalerStateManager::GetNodeStates(
    rpc::autoscaler::GetClusterResourceStateReply *reply) {
  const auto append_node_state = [this, reply](
                                     const rpc::GcsNodeInfo &node_info,
                                     rpc::autoscaler::NodeState::NodeStatus status) {
    auto *state = reply->add_node_states();
    state->set_node_id(node_info.node_id());
    state->set_instance_id(node_info.instance_id());
    state->set_node_state_version(last_cluster_resource_state_version_);
    state->set_status(status);

    if (status != rpc::autoscaler::NodeState::ALIVE) {
      // No resources are reported for nodes that are not alive.
      return;
    }

    const auto &resources = cluster_resource_manager_.GetNodeResources(
        scheduling::NodeID(node_info.node_id()));

    // Available resources.
    const auto &available_map = resources.available.ToResourceMap();
    state->mutable_available_resources()->insert(available_map.begin(),
                                                 available_map.end());

    // Total resources.
    const auto &total_map = resources.total.ToResourceMap();
    state->mutable_total_resources()->insert(total_map.begin(), total_map.end());

    // TODO(rickyx): support dynamic labels
  };

  for (const auto &alive_entry : gcs_node_manager_.GetAllAliveNodes()) {
    append_node_state(*alive_entry.second, rpc::autoscaler::NodeState::ALIVE);
  }

  // The dead node list can grow large on a long-running cluster with many nodes.
  // Since no resources are attached to dead nodes, each such entry stays small.
  // TODO(rickyx): The dead nodes will need to be garbage collected in the future.
  // https://github.com/ray-project/ray/issues/35874
  for (const auto &dead_entry : gcs_node_manager_.GetAllDeadNodes()) {
    append_node_state(*dead_entry.second, rpc::autoscaler::NodeState::DEAD);
  }
}

} // namespace gcs
} // namespace ray
109 changes: 109 additions & 0 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2023 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "ray/rpc/gcs_server/gcs_rpc_server.h"
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {
class ClusterResourceManager;
namespace gcs {

class GcsResourceManager;
class GcsNodeManager;

/// GCS manager that implements rpc::AutoscalerStateHandler.
///
/// It answers GetClusterResourceState requests from data read through const
/// references to the cluster resource manager, the GCS resource manager and the
/// GCS node manager. Only references are stored, so those objects are expected to
/// outlive this manager (NOTE(review): lifetime is managed by the owner; confirm).
class GcsAutoscalerStateManager : public rpc::AutoscalerStateHandler {
public:
/// \param cluster_resource_manager Provides per-node resource information.
/// \param gcs_resource_manager Provides resource demand/load information.
/// \param gcs_node_manager Provides node status information.
GcsAutoscalerStateManager(const ClusterResourceManager &cluster_resource_manager,
const GcsResourceManager &gcs_resource_manager,
const GcsNodeManager &gcs_node_manager);

/// Handle a GetClusterResourceState request: stamp the reply with the state
/// versions, fill in node states and pending resource requests, then invoke
/// `send_reply_callback`.
///
/// \param request The request carrying the caller's last seen state version.
/// \param reply The reply to be filled.
/// \param send_reply_callback Callback invoked after the reply has been filled.
void HandleGetClusterResourceState(
rpc::autoscaler::GetClusterResourceStateRequest request,
rpc::autoscaler::GetClusterResourceStateReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a ReportAutoscalingState request.
/// Currently unimplemented: the definition throws std::runtime_error.
void HandleReportAutoscalingState(
rpc::autoscaler::ReportAutoscalingStateRequest request,
rpc::autoscaler::ReportAutoscalingStateReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Unimplemented: always throws std::runtime_error("Unimplemented").
void RecordMetrics() const { throw std::runtime_error("Unimplemented"); }

/// Unimplemented: always throws std::runtime_error("Unimplemented") instead of
/// returning a string.
std::string DebugString() const { throw std::runtime_error("Unimplemented"); }

private:
/// \brief Increment and get the next cluster resource state version.
/// \return The incremented cluster resource state version.
int64_t IncrementAndGetNextClusterResourceStateVersion() {
return ++last_cluster_resource_state_version_;
}

/// \brief Get the current cluster resource state.
/// \param reply The reply to be filled.
///
/// See rpc::autoscaler::GetClusterResourceStateReply::node_states for more details.
void GetNodeStates(rpc::autoscaler::GetClusterResourceStateReply *reply);

/// \brief Get the resource requests state.
/// \param reply The reply to be filled.
///
/// See rpc::autoscaler::GetClusterResourceStateReply::pending_resource_requests for
/// more details.
void GetPendingResourceRequests(rpc::autoscaler::GetClusterResourceStateReply *reply);

/// \brief Get the gang resource requests (e.g. from placement group) state.
/// \param reply The reply to be filled.
///
/// See rpc::autoscaler::GetClusterResourceStateReply::pending_gang_resource_requests
/// for more details.
void GetPendingGangResourceRequests(
rpc::autoscaler::GetClusterResourceStateReply *reply);

/// \brief Get the cluster resource constraints state.
/// \param reply The reply to be filled.
///
/// See rpc::autoscaler::GetClusterResourceStateReply::cluster_resource_constraints for
/// more details. This is requested through autoscaler SDK for request_resources().
void GetClusterResourceConstraints(
rpc::autoscaler::GetClusterResourceStateReply *reply);

/// Cluster resources manager that provides cluster resources information.
const ClusterResourceManager &cluster_resource_manager_;

/// Gcs node manager that provides node status information.
const GcsNodeManager &gcs_node_manager_;

/// GCS resource manager that provides resource demand/load information.
const GcsResourceManager &gcs_resource_manager_;

// The default value of the last seen version for the request is 0, which indicates
// no version has been reported. So the first reported version should be 1.
// We currently provide two guarantees for this version:
// 1. It will increase monotonically.
// 2. If a state is updated, the version will be higher.
// Ideally we would want to have a guarantee where consecutive versions will always
// be different, but it's currently hard to do.
// TODO(rickyx): https://github.com/ray-project/ray/issues/35873
// We will need to make the version correct when GCS fails over.
int64_t last_cluster_resource_state_version_ = 0;

/// The last seen autoscaler state version. Use 0 as the default value to indicate
/// no previous autoscaler state has been seen.
int64_t last_seen_autoscaler_state_version_ = 0;
};

} // namespace gcs
} // namespace ray
8 changes: 8 additions & 0 deletions src/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
namespace ray {
namespace gcs {

class GcsAutoscalerStateManagerTest;
/// GcsNodeManager is responsible for managing and monitoring nodes as well as handing
/// node and resource related rpc requests.
/// This class is not thread-safe.
Expand Down Expand Up @@ -104,6 +105,12 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
return alive_nodes_;
}

/// Get all dead nodes.
///
/// \return A const reference to the internal `dead_nodes_` map, keyed by NodeID,
/// whose values are shared pointers to the nodes' rpc::GcsNodeInfo.
const absl::flat_hash_map<NodeID, std::shared_ptr<rpc::GcsNodeInfo>> &GetAllDeadNodes()
const {
return dead_nodes_;
}

/// Add listener to monitor the remove action of nodes.
///
/// \param listener The handler which process the remove of nodes.
Expand Down Expand Up @@ -178,6 +185,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
NodeIDAddrBiMap node_map_;

friend GcsMonitorServerTest;
friend GcsAutoscalerStateManagerTest;
};

} // namespace gcs
Expand Down
18 changes: 17 additions & 1 deletion src/ray/gcs/gcs_server/gcs_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,26 @@ void GcsResourceManager::HandleReportResourceUsage(
++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST];
}

// TODO(rickyx): We could update the cluster resource manager when we update the load
// so that we will no longer need node_resource_usages_.
std::unordered_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand>
GcsResourceManager::GetAggregatedResourceLoad() const {
std::unordered_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand>
aggregate_load;
if (node_resource_usages_.empty()) {
return aggregate_load;
}
for (const auto &usage : node_resource_usages_) {
// Aggregate the load reported by each raylet.
FillAggregateLoad(usage.second, &aggregate_load);
}
return aggregate_load;
}

void GcsResourceManager::FillAggregateLoad(
const rpc::ResourcesData &resources_data,
std::unordered_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand>
*aggregate_load) {
*aggregate_load) const {
auto load = resources_data.resource_load_by_shape();
for (const auto &demand : load.resource_demands()) {
auto &aggregate_demand = (*aggregate_load)[demand.shape()];
Expand Down
6 changes: 5 additions & 1 deletion src/ray/gcs/gcs_server/gcs_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,18 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
/// \returns The mapping from node id to latest resource report.
const absl::flat_hash_map<NodeID, rpc::ResourcesData> &NodeResourceReportView() const;

/// Get aggregated resource load of all nodes.
///
/// \return A map, returned by value, from a resource shape (a string -> double
/// map) to the rpc::ResourceDemand aggregated for that shape.
std::unordered_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand>
GetAggregatedResourceLoad() const;

private:
/// Aggregate nodes' pending task info.
///
/// \param resources_data A node's pending task info (by shape).
/// \param aggregate_load[out] The aggregate pending task info (across the cluster).
void FillAggregateLoad(const rpc::ResourcesData &resources_data,
std::unordered_map<google::protobuf::Map<std::string, double>,
rpc::ResourceDemand> *aggregate_load);
rpc::ResourceDemand> *aggregate_load) const;

/// io context. This is to ensure thread safety. Ideally, all public
/// functions need to post jobs to this io_context.
Expand Down
16 changes: 16 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ray/common/ray_config.h"
#include "ray/gcs/gcs_client/gcs_client.h"
#include "ray/gcs/gcs_server/gcs_actor_manager.h"
#include "ray/gcs/gcs_server/gcs_autoscaler_state_manager.h"
#include "ray/gcs/gcs_server/gcs_job_manager.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
Expand Down Expand Up @@ -173,6 +174,9 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Install event listeners.
InstallEventListeners();

// Init autoscaling manager
InitGcsAutoscalerStateManager();

// Start RPC server when all tables have finished loading initial
// data.
rpc_server_.Run();
Expand Down Expand Up @@ -575,6 +579,18 @@ void GcsServer::InitGcsWorkerManager() {
rpc_server_.RegisterService(*worker_info_service_);
}

void GcsServer::InitGcsAutoscalerStateManager() {
gcs_autoscaler_state_manager_ = std::make_unique<GcsAutoscalerStateManager>(
cluster_resource_scheduler_->GetClusterResourceManager(),
*gcs_resource_manager_,
*gcs_node_manager_);

autoscaler_state_service_.reset(
new rpc::AutoscalerStateGrpcService(main_service_, *gcs_autoscaler_state_manager_));

rpc_server_.RegisterService(*autoscaler_state_service_);
}

void GcsServer::InitGcsTaskManager() {
gcs_task_manager_ = std::make_unique<GcsTaskManager>();
// Register service.
Expand Down
Loading

0 comments on commit 7bd8886

Please sign in to comment.