[autoscaler v2][4/n] introducing node-provider and node-provider-config (ray-project#34983)

Why are these changes needed?
This is part of a stack of PRs that introduces a new node_provider for autoscaler v2.

Stack of PRs
ray-project#34976
ray-project#34977
ray-project#34979
ray-project#34983 <- this PR
ray-project#34985

This PR introduces the node provider, from which the instance manager allocates instances. Implementation-wise, it is a wrapper around the v1 node provider, node launcher, and node updater.
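
As a sketch of the intended composition (hypothetical glue code, not part of this diff; `v1_provider`, `node_launcher`, and `cluster_config` are assumed to be constructed elsewhere, and `build_v2_components` is a made-up helper name):

```python
# Hypothetical wiring of the pieces this PR adds; assumes an existing
# NodeProviderV1, a BaseNodeLauncher, and the parsed autoscaler config dict.
from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig
from ray.autoscaler.v2.instance_manager.node_provider import NodeProviderAdapter
from ray.autoscaler.v2.instance_manager.ray_installer import RayInstaller


def build_v2_components(v1_provider, node_launcher, cluster_config):
    config = NodeProviderConfig(cluster_config)
    # The adapter exposes the v2 NodeProvider interface on top of the
    # v1 provider and node launcher.
    provider = NodeProviderAdapter(v1_provider, node_launcher, config)
    # The installer wraps the v1 NodeUpdater to set up Ray on new instances.
    installer = RayInstaller(v1_provider, config)
    return provider, installer
```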
scv119 committed May 15, 2023
1 parent 21e9d38 commit a0e318b
Showing 3 changed files with 304 additions and 0 deletions.
102 changes: 102 additions & 0 deletions python/ray/autoscaler/v2/instance_manager/config.py
@@ -0,0 +1,102 @@
import copy
from typing import Any, Dict, List

from ray.autoscaler._private.util import hash_runtime_conf
from ray.core.generated.instance_manager_pb2 import Instance


class NodeProviderConfig(object):
"""
NodeProviderConfig is the helper class to provide instance
related configs.
"""

def __init__(self, node_configs: Dict[str, Any]) -> None:
self._sync_continuously = False
self.update_configs(node_configs)

def update_configs(self, node_configs: Dict[str, Any]) -> None:
self._node_configs = node_configs
self._calculate_hashes()
self._sync_continuously = self._node_configs.get(
"generate_file_mounts_contents_hash", True
)

def _calculate_hashes(self) -> None:
self._runtime_hash, self._file_mounts_contents_hash = hash_runtime_conf(
self._node_configs["file_mounts"],
self._node_configs["cluster_synced_files"],
[
self._node_configs["worker_setup_commands"],
self._node_configs["worker_start_ray_commands"],
],
generate_file_mounts_contents_hash=self._node_configs.get(
"generate_file_mounts_contents_hash", True
),
)

def get_node_config(self, instance_type_name: str) -> Dict[str, Any]:
return copy.deepcopy(
self._node_configs["available_node_types"][instance_type_name][
"node_config"
]
)

def get_docker_config(self, instance_type_name: str) -> Dict[str, Any]:
if "docker" not in self._node_configs:
return {}
docker_config = copy.deepcopy(self._node_configs.get("docker", {}))
node_specific_docker_config = self._node_configs["available_node_types"][
instance_type_name
].get("docker", {})
docker_config.update(node_specific_docker_config)
return docker_config

def get_worker_start_ray_commands(self, instance: Instance) -> List[str]:
if (
instance.num_successful_updates > 0
            and not self.restart_only
):
return []
return self._node_configs["worker_start_ray_commands"]

def get_worker_setup_commands(self, instance: Instance) -> List[str]:
if (
instance.num_successful_updates > 0
            and self.restart_only
):
return []

        return self._node_configs["available_node_types"][instance.instance_type][
"worker_setup_commands"
]

def get_node_type_specific_config(
self, instance_type_name: str, config_name: str
) -> Any:
        config = self.get_config(config_name)
node_specific_config = self._node_configs["available_node_types"][
instance_type_name
]
if config_name in node_specific_config:
config = node_specific_config[config_name]
return config

    def get_config(self, config_name: str, default: Any = None) -> Any:
return self._node_configs.get(config_name, default)

@property
def restart_only(self) -> bool:
return self._node_configs.get("restart_only", False)

@property
def no_restart(self) -> bool:
return self._node_configs.get("no_restart", False)

@property
def runtime_hash(self) -> str:
return self._runtime_hash

@property
def file_mounts_contents_hash(self) -> str:
return self._file_mounts_contents_hash
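
For illustration, a hypothetical minimal config dict (keys mirror the autoscaler YAML schema; the values here are made up) shows how the per-node-type lookups resolve:

```python
from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig

# Hypothetical minimal cluster config for this sketch.
node_configs = {
    "file_mounts": {},
    "cluster_synced_files": [],
    "worker_setup_commands": ["echo setup"],
    "worker_start_ray_commands": ["ray start --address=$RAY_HEAD_IP:6379"],
    # Skip hashing mount contents in this sketch.
    "generate_file_mounts_contents_hash": False,
    "docker": {"image": "rayproject/ray:latest", "container_name": "ray"},
    "available_node_types": {
        "worker.default": {
            "node_config": {"InstanceType": "m5.large"},
            # Node-type override, merged over the cluster-wide docker config.
            "docker": {"image": "rayproject/ray-ml:latest"},
            "worker_setup_commands": ["echo worker setup"],
        }
    },
}

config = NodeProviderConfig(node_configs)
assert config.get_node_config("worker.default") == {"InstanceType": "m5.large"}
# Cluster-wide docker config with the node-type image override applied.
assert config.get_docker_config("worker.default")["image"] == "rayproject/ray-ml:latest"
```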
136 changes: 136 additions & 0 deletions python/ray/autoscaler/v2/instance_manager/node_provider.py
@@ -0,0 +1,136 @@
import logging
from abc import ABCMeta, abstractmethod
from typing import Dict, List, Set

# `typing.override` is only available on Python 3.12+; use typing_extensions.
from typing_extensions import override

from ray.autoscaler._private.node_launcher import BaseNodeLauncher
from ray.autoscaler.node_provider import NodeProvider as NodeProviderV1
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE
from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig
from ray.core.generated.instance_manager_pb2 import Instance, InstanceType

logger = logging.getLogger(__name__)


class NodeProvider(metaclass=ABCMeta):
"""NodeProvider defines the interface for
interacting with cloud provider, such as AWS, GCP, Azure, etc.
"""

@abstractmethod
    def create_nodes(self, instance_type: InstanceType, count: int) -> List[Instance]:
        """Create new nodes synchronously and return the created instances.
        Note that create_nodes could fail partially, in which case fewer
        than `count` instances are returned.
        """
pass

@abstractmethod
def async_terminate_nodes(self, cloud_instance_ids: List[str]) -> None:
"""
Terminate nodes asynchronously, returns immediately."""
pass

@abstractmethod
def get_non_terminated_nodes(
self,
) -> Dict[str, Instance]:
"""Get all non-terminated nodes in the cluster"""
pass

@abstractmethod
def get_nodes_by_cloud_id(
self,
cloud_instance_ids: List[str],
) -> Dict[str, Instance]:
"""Get nodes by node ids, including terminated nodes"""
pass

@abstractmethod
    def is_readonly(self) -> bool:
        """Return True if the provider cannot create or terminate nodes."""
        return False


class NodeProviderAdapter(NodeProvider):
"""
Warps a NodeProviderV1 to a NodeProvider.
"""

def __init__(
self,
provider: NodeProviderV1,
node_launcher: BaseNodeLauncher,
instance_config_provider: NodeProviderConfig,
) -> None:
super().__init__()
self._provider = provider
self._node_launcher = node_launcher
self._config = instance_config_provider

def _filter_instances(
self,
instances: Dict[str, Instance],
instance_ids_filter: Set[str],
instance_states_filter: Set[int],
) -> Dict[str, Instance]:
filtered = {}
for instance_id, instance in instances.items():
if instance_ids_filter and instance_id not in instance_ids_filter:
continue
if instance_states_filter and instance.state not in instance_states_filter:
continue
filtered[instance_id] = instance
return filtered

@override
def create_nodes(self, instance_type: InstanceType, count: int) -> List[Instance]:
result = self._node_launcher.launch_node(
self._config.get_node_config(instance_type.name),
count,
instance_type.name,
)
# TODO: we should handle failures where the instance type is
# not available
if result:
return [
self._get_instance(cloud_instance_id)
for cloud_instance_id in result.keys()
]
return []

    @override
    def async_terminate_nodes(self, cloud_instance_ids: List[str]) -> None:
        self._provider.terminate_nodes(cloud_instance_ids)

@override
def is_readonly(self) -> bool:
return self._provider.is_readonly()

    @override
    def get_non_terminated_nodes(self) -> Dict[str, Instance]:
        cloud_instance_ids = self._provider.non_terminated_nodes({})
        return self.get_nodes_by_cloud_id(cloud_instance_ids)

@override
def get_nodes_by_cloud_id(
self,
cloud_instance_ids: List[str],
) -> Dict[str, Instance]:
instances = {}
for cloud_instance_id in cloud_instance_ids:
instances[cloud_instance_id] = self._get_instance(cloud_instance_id)
return instances

def _get_instance(self, cloud_instance_id: str) -> Instance:
instance = Instance()
instance.cloud_instance_id = cloud_instance_id
if self._provider.is_running(cloud_instance_id):
instance.state = Instance.STARTING
elif self._provider.is_terminated(cloud_instance_id):
instance.state = Instance.STOPPED
else:
instance.state = Instance.INSTANCE_STATUS_UNSPECIFIED
        instance.internal_ip = self._provider.internal_ip(cloud_instance_id)
instance.external_ip = self._provider.external_ip(cloud_instance_id)
instance.instance_type = self._provider.node_tags(cloud_instance_id)[
TAG_RAY_USER_NODE_TYPE
]
return instance
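
To make the interface contract concrete, here is a hypothetical in-memory implementation (e.g. for unit tests; `InMemoryNodeProvider` is a made-up name, and the `Instance` states mirror the ones used by `NodeProviderAdapter` above):

```python
# Hypothetical in-memory NodeProvider, e.g. for unit tests.
import uuid
from typing import Dict, List

from ray.autoscaler.v2.instance_manager.node_provider import NodeProvider
from ray.core.generated.instance_manager_pb2 import Instance, InstanceType


class InMemoryNodeProvider(NodeProvider):
    def __init__(self) -> None:
        self._instances: Dict[str, Instance] = {}

    def create_nodes(self, instance_type: InstanceType, count: int) -> List[Instance]:
        created = []
        for _ in range(count):
            instance = Instance()
            instance.cloud_instance_id = uuid.uuid4().hex
            instance.instance_type = instance_type.name
            instance.state = Instance.STARTING
            self._instances[instance.cloud_instance_id] = instance
            created.append(instance)
        return created

    def async_terminate_nodes(self, cloud_instance_ids: List[str]) -> None:
        # Synchronous here for simplicity; a real provider would return
        # immediately and terminate in the background.
        for cloud_instance_id in cloud_instance_ids:
            self._instances[cloud_instance_id].state = Instance.STOPPED

    def get_non_terminated_nodes(self) -> Dict[str, Instance]:
        return {
            cloud_id: instance
            for cloud_id, instance in self._instances.items()
            if instance.state != Instance.STOPPED
        }

    def get_nodes_by_cloud_id(
        self, cloud_instance_ids: List[str]
    ) -> Dict[str, Instance]:
        return {
            cloud_id: self._instances[cloud_id]
            for cloud_id in cloud_instance_ids
            if cloud_id in self._instances
        }

    def is_readonly(self) -> bool:
        return False
```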
66 changes: 66 additions & 0 deletions python/ray/autoscaler/v2/instance_manager/ray_installer.py
@@ -0,0 +1,66 @@
import logging

from ray.autoscaler._private.updater import NodeUpdater
from ray.autoscaler._private.util import with_head_node_ip
from ray.autoscaler.node_provider import NodeProvider as NodeProviderV1
from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig
from ray.core.generated.instance_manager_pb2 import Instance

logger = logging.getLogger(__name__)


class RayInstaller(object):
"""
RayInstaller is responsible for installing ray on the target instance.
"""

def __init__(
self,
provider: NodeProviderV1,
config: NodeProviderConfig,
) -> None:
self._provider = provider
self._config = config

    def install_ray(self, instance: Instance, head_node_ip: str) -> bool:
        """
        Install Ray on the target instance synchronously.
        Returns True on success.
        """

setup_commands = self._config.get_worker_setup_commands(instance)
ray_start_commands = self._config.get_worker_start_ray_commands(instance)
        docker_config = self._config.get_docker_config(instance.instance_type)

        logger.info(
            "Creating new NodeUpdater for instance "
            f"{instance.cloud_instance_id}."
        )
updater = NodeUpdater(
node_id=instance.instance_id,
provider_config=self._config.get_config("provider"),
provider=self._provider,
auth_config=self._config.get_config("auth"),
cluster_name=self._config.get_config("cluster_name"),
file_mounts=self._config.get_config("file_mounts"),
initialization_commands=with_head_node_ip(
                self._config.get_node_type_specific_config(
                    instance.instance_type, "initialization_commands"
                ),
head_node_ip,
),
setup_commands=with_head_node_ip(setup_commands, head_node_ip),
ray_start_commands=with_head_node_ip(ray_start_commands, head_node_ip),
runtime_hash=self._config.runtime_hash,
file_mounts_contents_hash=self._config.file_mounts_contents_hash,
is_head_node=False,
cluster_synced_files=self._config.get_config("cluster_synced_files"),
rsync_options={
"rsync_exclude": self._config.get_config("rsync_exclude"),
"rsync_filter": self._config.get_config("rsync_filter"),
},
use_internal_ip=True,
docker_config=docker_config,
node_resources=instance.node_resources,
)
        updater.run()
        # TODO: handle failures from the updater run.
        return True
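
A hypothetical call site, with the v1 provider, config, and freshly created instance assumed to come from the surrounding instance-manager code (and `install_on_new_worker` a made-up helper name):

```python
# Hypothetical usage sketch of RayInstaller.
from ray.autoscaler.node_provider import NodeProvider as NodeProviderV1
from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig
from ray.autoscaler.v2.instance_manager.ray_installer import RayInstaller
from ray.core.generated.instance_manager_pb2 import Instance


def install_on_new_worker(
    v1_provider: NodeProviderV1,
    config: NodeProviderConfig,
    instance: Instance,
    head_node_ip: str,
) -> bool:
    installer = RayInstaller(v1_provider, config)
    # Blocks until the setup and `ray start` commands have run on the node.
    return installer.install_ray(instance, head_node_ip)
```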
