Skip to content

Commit

Permalink
[autoscaler v2][2/n] introduce instance_manager protobuf (ray-project…
Browse files Browse the repository at this point in the history
…#34977)

Why are these changes needed?
This is part of the stack of PRs that introduces a new node_provider for autoscaler v2.
Stack of PRs
ray-project#34976
ray-project#34977 <- this PR
ray-project#34979
ray-project#34983
ray-project#34985

This PR introduces the instance_manager interface that the autoscaler uses to request new nodes.
InstanceManagerService allows the autoscaler to get the currently launched/launching nodes belonging to this cluster. To do so, it provides 3 APIs:

GetInstanceManagerState: Returns both launching and launched nodes belonging to this cluster.
UpdateInstanceManagerState: Launches or kills nodes, conditioned on the instance manager's version id, or the number of successfully applied adjustments.
GetAvailableInstanceTypes: Get the list of available instance types.
The InstanceManager is expected to be strongly consistent and durable (or at least to fate-share with the cluster).
  • Loading branch information
scv119 committed May 3, 2023
1 parent b6c74c0 commit 5759c41
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 45 deletions.
13 changes: 13 additions & 0 deletions src/ray/protobuf/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ proto_library(
],
)

# Proto library target for the instance manager schema defined in
# experimental/instance_manager.proto.
proto_library(
name = "instance_manager_proto",
srcs = ["experimental/instance_manager.proto"],
)

# Runs the python_grpc_compile rule over :instance_manager_proto.
# NOTE(review): presumably this emits the Python protobuf/gRPC bindings for
# the instance manager schema - confirm against the rule definition.
python_grpc_compile(
name = "instance_manager_py_proto",
deps = [":instance_manager_proto"],
)

proto_library(
name = "runtime_env_common_proto",
srcs = ["runtime_env_common.proto"],
Expand Down Expand Up @@ -351,6 +361,9 @@ cc_proto_library(
# Proto library target for experimental/autoscaler.proto.
proto_library(
name = "autoscaler_proto",
srcs = ["experimental/autoscaler.proto"],
# autoscaler.proto imports experimental/instance_manager.proto, so the
# instance manager proto library must be listed as a dependency.
deps = [
":instance_manager_proto",
],
)

python_grpc_compile(
Expand Down
48 changes: 3 additions & 45 deletions src/ray/protobuf/experimental/autoscaler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
syntax = "proto3";
option cc_enable_arenas = true;

package ray.rpc;
package ray.autoscaler;

import "src/ray/protobuf/experimental/instance_manager.proto";

// ============= Cluster Resources ====================
//
Expand Down Expand Up @@ -151,50 +153,6 @@ message GetClusterResourceStateReply {
repeated ClusterResourceConstraint cluster_resource_constraints = 6;
}

// The state of a single instance: its id, status, node type, total
// resources and the time of its last state change.
message Instance {
// The lifecycle status of an instance.
enum InstanceStatus {
// The unspecified state - most likely it is queued.
INSTANCE_STATUS_UNSPECIFIED = 0;
// Instance is starting. The first state update received from the
// instance.
STARTING = 1;
// The instance is running - one of two states of a healthy instance.
RUNNING = 2;
// The instance is idle - one of two states of a healthy instance.
IDLE = 3;
// The instance is stopping - usually follows from the RUNNING, IDLE,
// PREEMPT_REQUEST or DRAIN_REQUEST state.
STOPPING = 4;
// The instance is stopped - follows from the STOPPING state.
STOPPED = 5;
// The instance is in a bad state - but it is still able to send updates.
FAILING = 6;
// The subscribe service moves instances to this state if they
// have been idle for too long. This allows the cluster manager to
// make a final decision on whether or not to commence a drain
// sequence for this instance.
DRAIN_CONFIRMATION_PENDING = 7;
// The instance should be drained. Ray should start the draining process
// but could reject the request if it fails to drain.
DRAIN_REQUEST = 8;
// The instance will be preempted by the instance manager, regardless
// of whether it is drainable or not.
PREEMPT_REQUEST = 9;
}
// A unique id for the instance that's generated by the
// instance manager. This may be unset if
// the instance hasn't been started yet.
string instance_id = 11;
// The status of the instance.
InstanceStatus status = 12;
// The node type of the instance.
string node_type = 13;
// The corresponding total resources on the node.
map<string, double> total_resources = 14;
// Timestamp of the last state change.
// NOTE(review): the unit/epoch is not specified here - confirm.
int64 timestamp_since_last_state_change = 15;
}

message ReportAutoscalingStateRequest {
int64 last_seen_cluster_resource_state_version = 1;
// A monotonically increasing version identifies
Expand Down
151 changes: 151 additions & 0 deletions src/ray/protobuf/experimental/instance_manager.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";
option cc_enable_arenas = true;

package ray.autoscaler;

// The kind of Ray node an instance type is used for (see
// InstanceType.node_kind).
// NOTE(review): presumably HEAD/WORKER map to the Ray head node and worker
// nodes - confirm. The values are not prefixed with the enum name; renaming
// them would break generated code, so they are left as-is.
enum RayNodeKind {
// Zero value; this is what an unset proto3 enum field reads as.
UNKNOWN = 0;
HEAD = 1;
WORKER = 2;
}

// A node type that's available for the cluster.
message InstanceType {
// The name of the instance type, e.g. "m4.large".
string type_name = 1;
// The kind of Ray node (see RayNodeKind) this instance type is for.
RayNodeKind node_kind = 2;
// Available resources on the node, such as {"CPU": 4, "GPU": 1}.
map<string, double> resources = 3;
}

// Request message for InstanceManagerService.GetAvailableInstanceTypes.
// It currently carries no fields.
message GetAvailableInstanceTypesRequest {}

// Response message for InstanceManagerService.GetAvailableInstanceTypes.
message GetAvailableInstanceTypesResponse {
// The instance types that are available.
repeated InstanceType instance_types = 1;
// Number of instances that are available for starting.
// This can change if the cloud provider has a limit on the
// number of instances that can be started.
int64 available_instances = 2;
}

// Represents the state of a launched instance.
// An instance is considered launched as long as
// it has a unique instance_id associated with it.
//
// Note a launched instance may be DEAD. In this case,
// the state will be garbage collected after some timeout
// period (by default 30 minutes).
// NOTE(review): InstanceStatus defines no DEAD value; presumably this refers
// to instances that are no longer alive (e.g. STOPPED) - confirm.
message Instance {
  // The lifecycle status of an instance.
  enum InstanceStatus {
    // Required because GARAGE_COLLECTED (legacy misspelling) and
    // GARBAGE_COLLECTED intentionally share the same number.
    option allow_alias = true;

    // The unspecified state - most likely it is queued.
    INSTANCE_STATUS_UNSPECIFIED = 0;
    // Instance is starting. The first state update received from the
    // instance.
    STARTING = 1;
    // The instance is running - one of two states of a healthy instance.
    RUNNING = 2;
    // The instance is idle - one of two states of a healthy instance.
    IDLE = 3;
    // The instance is stopping - usually follows from the RUNNING, IDLE,
    // PREEMPT_REQUEST or DRAIN_REQUEST state.
    STOPPING = 4;
    // The instance is stopped - follows from the STOPPING state.
    STOPPED = 5;
    // The instance is in a bad state - but it is still able to send updates.
    FAILING = 6;
    // The subscribe service moves instances to this state if they
    // have been idle for too long. This allows the cluster manager to
    // make a final decision on whether or not to commence a drain
    // sequence for this instance.
    DRAIN_CONFIRMATION_PENDING = 7;
    // The instance should be drained. Ray should start the draining process
    // but could reject the request if it fails to drain.
    DRAIN_REQUEST = 8;
    // The instance will be preempted by the instance manager, regardless
    // of whether it is drainable or not.
    PREEMPT_REQUEST = 9;
    // An optional state that can be used to indicate that the instance
    // is allocated from cloud provider, but ray hasn't been installed yet.
    INSTANCE_ALLOCATED = 10;
    // An optional state that can be used to indicate that the instance
    // is currently installing Ray.
    INSTALLING_RAY = 11;
    // An optional state that can be used to indicate that the instance
    // failed to allocate from cloud provider.
    ALLOCATION_FAILED = 12;
    // Node is deleted.
    // Legacy misspelling of GARBAGE_COLLECTED, kept so existing code keeps
    // compiling. It stays declared first so that it remains the name
    // reported for value 13 (the first declared name wins for aliases).
    GARAGE_COLLECTED = 13;
    // Node is deleted. Correctly spelled alias of GARAGE_COLLECTED; prefer
    // this name in new code.
    GARBAGE_COLLECTED = 13;
  }
  // A unique id for the instance that's generated by the
  // instance manager. This may be unset if
  // the instance hasn't been started yet.
  string instance_id = 11;
  // The status of the instance.
  InstanceStatus status = 12;
  // The node type of the instance.
  string node_type = 13;
  // The corresponding total resources on the node.
  map<string, double> total_resources = 14;
  // Timestamp of the last state change.
  // NOTE(review): the unit/epoch is not specified here - confirm.
  int64 timestamp_since_last_state_change = 15;
  // The external id of the instance that's generated by
  // the cloud provider like AWS, GCP, etc.
  // Note this id can be reused by different instances.
  string cloud_instance_id = 16;
  // Internal ip address of the instance.
  string internal_ip = 17;
  // External ip address of the instance.
  string external_ip = 18;
  // The monotonically increasing version number of the instance.
  int64 version = 19;
}

// Request message for InstanceManagerService.UpdateInstanceManagerState:
// lists the new nodes to start and the instances to terminate.
message UpdateInstanceManagerStateRequest {
// NOTE(review): presumably the InstanceManagerState version the caller
// expects the instance manager to currently be at, so the update is only
// applied against that version - confirm exact semantics.
int64 expected_version = 1;
// Instance types of the new nodes to start.
// NOTE(review): presumably one entry per node to launch - confirm.
repeated InstanceType new_nodes_to_start = 2;
// Ids of the instances to terminate.
repeated string instance_ids_to_terminate = 3;
}

// Reply message for InstanceManagerService.UpdateInstanceManagerState.
message UpdateInstanceManagerStateReply {
// Whether the update succeeded.
bool success = 1;
// Error message describing the failure.
// NOTE(review): presumably only populated when success is false - confirm.
string error_message = 2;
// NOTE(review): presumably the instance manager state version after this
// request was handled - confirm.
int64 version = 3;
}

// The versioned state of the instance manager: a version number plus the
// list of instances.
message InstanceManagerState {
// A monotonically increasing version number.
// The version number is incremented whenever
// the state is updated (either by a successful adjusting request,
// or by an instance state change).
int64 version = 1;
// The instances recorded in this state.
repeated Instance instances = 2;
}

// Request message for InstanceManagerService.GetInstanceManagerState.
// It currently carries no fields.
message GetInstanceManagerStateRequest {}

// Reply message for InstanceManagerService.GetInstanceManagerState.
message GetInstanceManagerStateReply {
// The instance manager state (version and instances).
InstanceManagerState state = 1;
}

// Service exposing the instance manager state: reading it, updating it
// (starting/terminating instances), and listing available instance types.
service InstanceManagerService {
// Returns the instance manager state: its version and instances.
rpc GetInstanceManagerState(GetInstanceManagerStateRequest)
returns (GetInstanceManagerStateReply);
// Requests starting new nodes and/or terminating existing instances.
// NOTE(review): presumably applied only if the request's expected_version
// matches the instance manager's current version - confirm.
rpc UpdateInstanceManagerState(UpdateInstanceManagerStateRequest)
returns (UpdateInstanceManagerStateReply);
// Returns the list of available instance types.
rpc GetAvailableInstanceTypes(GetAvailableInstanceTypesRequest)
returns (GetAvailableInstanceTypesResponse);
}

0 comments on commit 5759c41

Please sign in to comment.