From 68c11a862bdfa97ba18e92fdea326d71a154a5d7 Mon Sep 17 00:00:00 2001 From: Chen Shen Date: Wed, 31 May 2023 11:00:32 -0700 Subject: [PATCH] [autoscaler v2] test ray-installer (#35875) added unit test for ray installer. also add helper function to NodeProviderConfig. --- python/ray/autoscaler/v2/BUILD | 8 +++ .../autoscaler/v2/instance_manager/config.py | 7 ++ .../v2/instance_manager/ray_installer.py | 24 ++++--- python/ray/autoscaler/v2/tests/test_config.py | 4 ++ .../autoscaler/v2/tests/test_ray_installer.py | 64 +++++++++++++++++++ .../test_cli_patterns/test_ray_complex.yaml | 3 +- 6 files changed, 101 insertions(+), 9 deletions(-) create mode 100644 python/ray/autoscaler/v2/tests/test_ray_installer.py diff --git a/python/ray/autoscaler/v2/BUILD b/python/ray/autoscaler/v2/BUILD index bc4657c3e37ff..37e31ce870506 100644 --- a/python/ray/autoscaler/v2/BUILD +++ b/python/ray/autoscaler/v2/BUILD @@ -38,3 +38,11 @@ py_test( tags = ["team:core"], deps = ["//:ray_lib",], ) + +py_test( + name = "test_ray_installer", + size = "small", + srcs = ["tests/test_ray_installer.py"], + tags = ["team:core"], + deps = ["//:ray_lib",], +) diff --git a/python/ray/autoscaler/v2/instance_manager/config.py b/python/ray/autoscaler/v2/instance_manager/config.py index c6400e23a2e48..86e423b27d8d5 100644 --- a/python/ray/autoscaler/v2/instance_manager/config.py +++ b/python/ray/autoscaler/v2/instance_manager/config.py @@ -82,6 +82,13 @@ def get_node_type_specific_config( config = node_specific_config[config_name] return config + def get_node_resources(self, instance_type_name: str) -> Dict[str, float]: + return copy.deepcopy( + self._node_configs["available_node_types"][instance_type_name].get( + "resources", {} + ) + ) + def get_config(self, config_name, default=None) -> Any: return self._node_configs.get(config_name, default) diff --git a/python/ray/autoscaler/v2/instance_manager/ray_installer.py b/python/ray/autoscaler/v2/instance_manager/ray_installer.py index 375db2f49d528..6596f01d0a233 100644 --- a/python/ray/autoscaler/v2/instance_manager/ray_installer.py +++ b/python/ray/autoscaler/v2/instance_manager/ray_installer.py @@ -1,4 +1,5 @@ import logging +import subprocess from ray.autoscaler._private.updater import NodeUpdater from ray.autoscaler._private.util import with_envs, with_head_node_ip @@ -18,18 +19,20 @@ def __init__( self, provider: NodeProviderV1, config: NodeProviderConfig, + process_runner=subprocess, ) -> None: self._provider = provider self._config = config + self._process_runner = process_runner def install_ray(self, instance: Instance, head_node_ip: str) -> bool: """ Install ray on the target instance synchronously. """ - setup_commands = self._config.get_worker_setup_commands(instance) - ray_start_commands = self._config.get_worker_start_ray_commands(instance) - docker_config = self._config.get_docker_config(instance) + setup_commands = self._config.get_worker_setup_commands(instance.instance_type) + ray_start_commands = self._config.get_worker_start_ray_commands() + docker_config = self._config.get_docker_config(instance.instance_type) logger.info( f"Creating new (spawn_updater) updater thread for node" @@ -43,8 +46,8 @@ def install_ray(self, instance: Instance, head_node_ip: str) -> bool: cluster_name=self._config.get_config("cluster_name"), file_mounts=self._config.get_config("file_mounts"), initialization_commands=with_head_node_ip( - self.get_node_type_specific_config( - instance.instance_id, "initialization_commands" + self._config.get_node_type_specific_config( + instance.instance_type, "initialization_commands" ), head_node_ip, ), @@ -70,7 +73,12 @@ def install_ray(self, instance: Instance, head_node_ip: str) -> bool: }, use_internal_ip=True, docker_config=docker_config, - node_resources=instance.node_resources, + node_resources=self._config.get_node_resources(instance.instance_type), + process_runner=self._process_runner, ) - updater.run() - # TODO: handle failures + try: + updater.run() + except Exception: + # Errors has already been handled. + return False + return True diff --git a/python/ray/autoscaler/v2/tests/test_config.py b/python/ray/autoscaler/v2/tests/test_config.py index b88a805ed172c..c8d7144369746 100644 --- a/python/ray/autoscaler/v2/tests/test_config.py +++ b/python/ray/autoscaler/v2/tests/test_config.py @@ -73,6 +73,10 @@ def test_complex(): "worker_nodes1", "initialization_commands" ) == ["echo init"] + assert config.get_node_resources("worker_nodes1") == {"CPU": 2} + + assert config.get_node_resources("worker_nodes") == {} + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): diff --git a/python/ray/autoscaler/v2/tests/test_ray_installer.py b/python/ray/autoscaler/v2/tests/test_ray_installer.py new file mode 100644 index 0000000000000..586a8e30d50c3 --- /dev/null +++ b/python/ray/autoscaler/v2/tests/test_ray_installer.py @@ -0,0 +1,64 @@ +# coding: utf-8 +import os +import sys +import unittest + +import pytest # noqa + +from ray._private.test_utils import load_test_config +from ray.autoscaler.tags import TAG_RAY_NODE_KIND +from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig +from ray.autoscaler.v2.instance_manager.ray_installer import RayInstaller +from ray.core.generated.instance_manager_pb2 import Instance +from ray.tests.autoscaler_test_utils import MockProcessRunner, MockProvider + + +class RayInstallerTest(unittest.TestCase): + def setUp(self): + self.base_provider = MockProvider() + self.instance_config_provider = NodeProviderConfig( + load_test_config("test_ray_complex.yaml") + ) + self.runner = MockProcessRunner() + self.ray_installer = RayInstaller( + self.base_provider, self.instance_config_provider, self.runner + ) + + def test_install_succeeded(self): + self.base_provider.create_node({}, {TAG_RAY_NODE_KIND: "worker_nodes1"}, 1) + self.runner.respond_to_call("json .Config.Env", ["[]" for i in range(1)]) + + assert self.ray_installer.install_ray( + Instance( + instance_id="0", instance_type="worker_nodes1", cloud_instance_id="0" + ), + head_node_ip="1.2.3.4", + ) + + def test_install_failed(self): + # creation failed because no such node. + assert not self.ray_installer.install_ray( + Instance( + instance_id="0", instance_type="worker_nodes1", cloud_instance_id="0" + ), + head_node_ip="1.2.3.4", + ) + + self.base_provider.create_node({}, {TAG_RAY_NODE_KIND: "worker_nodes1"}, 1) + self.runner.fail_cmds = ["setup_cmd"] + self.runner.respond_to_call("json .Config.Env", ["[]" for i in range(1)]) + + # creation failed because setup command failed. + assert self.ray_installer.install_ray( + Instance( + instance_id="0", instance_type="worker_nodes1", cloud_instance_id="0" + ), + head_node_ip="1.2.3.4", + ) + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/test_cli_patterns/test_ray_complex.yaml b/python/ray/tests/test_cli_patterns/test_ray_complex.yaml index f32094662ed3d..533ebe058dc2d 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_complex.yaml +++ b/python/ray/tests/test_cli_patterns/test_ray_complex.yaml @@ -24,7 +24,8 @@ available_node_types: initialization_commands: - echo what worker_nodes1: - resources: {} + resources: + CPU: 2 max_workers: 2 min_workers: 1 node_config: