From de83534c11398f71ba784c78d62ba747db0387c1 Mon Sep 17 00:00:00 2001 From: scv119 Date: Tue, 30 May 2023 00:08:41 -0700 Subject: [PATCH 1/5] add test --- .../v2/instance_manager/ray_installer.py | 12 ++- .../autoscaler/v2/tests/test_ray_install.py | 94 +++++++++++++++++++ 2 files changed, 104 insertions(+), 2 deletions(-) create mode 100644 python/ray/autoscaler/v2/tests/test_ray_install.py diff --git a/python/ray/autoscaler/v2/instance_manager/ray_installer.py b/python/ray/autoscaler/v2/instance_manager/ray_installer.py index 375db2f49d528..2454118f5def5 100644 --- a/python/ray/autoscaler/v2/instance_manager/ray_installer.py +++ b/python/ray/autoscaler/v2/instance_manager/ray_installer.py @@ -1,4 +1,5 @@ import logging +import subprocess from ray.autoscaler._private.updater import NodeUpdater from ray.autoscaler._private.util import with_envs, with_head_node_ip @@ -18,9 +19,11 @@ def __init__( self, provider: NodeProviderV1, config: NodeProviderConfig, + process_runner=subprocess, ) -> None: self._provider = provider self._config = config + self._process_runner = process_runner def install_ray(self, instance: Instance, head_node_ip: str) -> bool: """ @@ -71,6 +74,11 @@ def install_ray(self, instance: Instance, head_node_ip: str) -> bool: use_internal_ip=True, docker_config=docker_config, node_resources=instance.node_resources, + process_runner=self._process_runner, ) - updater.run() - # TODO: handle failures + try: + updater.run() + except Exception as e: + # Errors has already been handled. + return False + return True diff --git a/python/ray/autoscaler/v2/tests/test_ray_install.py b/python/ray/autoscaler/v2/tests/test_ray_install.py new file mode 100644 index 0000000000000..911810c95fa0f --- /dev/null +++ b/python/ray/autoscaler/v2/tests/test_ray_install.py @@ -0,0 +1,94 @@ +# coding: utf-8 +import os +import sys +import unittest + +import pytest # noqa + +from ray._private.test_utils import load_test_config +from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig +from ray.autoscaler.v2.instance_manager.ray_installer import RayInstaller +from ray.core.generated.instance_manager_pb2 import Instance +from ray.tests.autoscaler_test_utils import MockProcessRunner, MockProvider + + +class FakeCounter: + def dec(self, *args, **kwargs): + pass + + +class RayInstallerTest(unittest.TestCase): + def setUp(self): + self.base_provider = MockProvider() + self.instance_config_provider = NodeProviderConfig( + load_test_config("test_ray_complex.yaml") + ) + self.ray_installer = RayInstaller( + self.base_provider, self.instance_config_provider, MockProcessRunner() + ) + + def test_node_providers_pass_through(self): + nodes = self.node_provider.create_nodes("worker_nodes1", 1) + assert len(nodes) == 1 + assert nodes[0] == Instance( + instance_type="worker_nodes1", + cloud_instance_id="0", + internal_ip="172.0.0.0", + external_ip="1.2.3.4", + status=Instance.INSTANCE_STATUS_UNSPECIFIED, + ) + self.assertEqual(len(self.base_provider.mock_nodes), 1) + self.assertEqual(self.node_provider.get_non_terminated_nodes(), {"0": nodes[0]}) + nodes1 = self.node_provider.create_nodes("worker_nodes", 2) + assert len(nodes1) == 2 + assert nodes1[0] == Instance( + instance_type="worker_nodes", + cloud_instance_id="1", + internal_ip="172.0.0.1", + external_ip="1.2.3.4", + status=Instance.INSTANCE_STATUS_UNSPECIFIED, + ) + assert nodes1[1] == Instance( + instance_type="worker_nodes", + cloud_instance_id="2", + internal_ip="172.0.0.2", + external_ip="1.2.3.4", + status=Instance.INSTANCE_STATUS_UNSPECIFIED, + ) + self.assertEqual( + self.node_provider.get_non_terminated_nodes(), + {"0": nodes[0], "1": nodes1[0], "2": nodes1[1]}, + ) + self.assertEqual( + self.node_provider.get_nodes_by_cloud_instance_id(["0"]), + { + "0": nodes[0], + }, + ) + self.node_provider.terminate_node("0") + self.assertEqual( + self.node_provider.get_non_terminated_nodes(), + {"1": nodes1[0], "2": nodes1[1]}, + ) + self.assertFalse(self.node_provider.is_readonly()) + + def test_create_node_failure(self): + self.base_provider.error_creates = NodeLaunchException( + "hello", "failed to create node", src_exc_info=None + ) + self.assertEqual(self.node_provider.create_nodes("worker_nodes1", 1), []) + self.assertEqual(len(self.base_provider.mock_nodes), 0) + self.assertTrue( + "worker_nodes1" in self.availability_tracker.summary().node_availabilities + ) + self.assertEqual( + self.node_provider.get_non_terminated_nodes(), + {}, + ) + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) From 376b6aef384d6f7074e18bd686e84aefbda2d77a Mon Sep 17 00:00:00 2001 From: scv119 Date: Tue, 30 May 2023 00:52:53 -0700 Subject: [PATCH 2/5] update --- .../autoscaler/v2/instance_manager/config.py | 7 ++ .../v2/instance_manager/ray_installer.py | 14 ++-- python/ray/autoscaler/v2/tests/test_config.py | 4 ++ .../autoscaler/v2/tests/test_ray_install.py | 69 +++---------------- .../test_cli_patterns/test_ray_complex.yaml | 3 +- 5 files changed, 28 insertions(+), 69 deletions(-) diff --git a/python/ray/autoscaler/v2/instance_manager/config.py b/python/ray/autoscaler/v2/instance_manager/config.py index c6400e23a2e48..86e423b27d8d5 100644 --- a/python/ray/autoscaler/v2/instance_manager/config.py +++ b/python/ray/autoscaler/v2/instance_manager/config.py @@ -82,6 +82,13 @@ def get_node_type_specific_config( config = node_specific_config[config_name] return config + def get_node_resources(self, instance_type_name: str) -> Dict[str, float]: + return copy.deepcopy( + self._node_configs["available_node_types"][instance_type_name].get( + "resources", {} + ) + ) + def get_config(self, config_name, default=None) -> Any: return self._node_configs.get(config_name, default) diff --git a/python/ray/autoscaler/v2/instance_manager/ray_installer.py b/python/ray/autoscaler/v2/instance_manager/ray_installer.py index 2454118f5def5..6596f01d0a233 100644 --- a/python/ray/autoscaler/v2/instance_manager/ray_installer.py +++ b/python/ray/autoscaler/v2/instance_manager/ray_installer.py @@ -30,9 +30,9 @@ def install_ray(self, instance: Instance, head_node_ip: str) -> bool: Install ray on the target instance synchronously. """ - setup_commands = self._config.get_worker_setup_commands(instance) - ray_start_commands = self._config.get_worker_start_ray_commands(instance) - docker_config = self._config.get_docker_config(instance) + setup_commands = self._config.get_worker_setup_commands(instance.instance_type) + ray_start_commands = self._config.get_worker_start_ray_commands() + docker_config = self._config.get_docker_config(instance.instance_type) logger.info( f"Creating new (spawn_updater) updater thread for node" @@ -46,8 +46,8 @@ def install_ray(self, instance: Instance, head_node_ip: str) -> bool: cluster_name=self._config.get_config("cluster_name"), file_mounts=self._config.get_config("file_mounts"), initialization_commands=with_head_node_ip( - self.get_node_type_specific_config( - instance.instance_id, "initialization_commands" + self._config.get_node_type_specific_config( + instance.instance_type, "initialization_commands" ), head_node_ip, ), @@ -73,12 +73,12 @@ def install_ray(self, instance: Instance, head_node_ip: str) -> bool: }, use_internal_ip=True, docker_config=docker_config, - node_resources=instance.node_resources, + node_resources=self._config.get_node_resources(instance.instance_type), process_runner=self._process_runner, ) try: updater.run() - except Exception as e: + except Exception: # Errors has already been handled. return False return True diff --git a/python/ray/autoscaler/v2/tests/test_config.py b/python/ray/autoscaler/v2/tests/test_config.py index b88a805ed172c..c8d7144369746 100644 --- a/python/ray/autoscaler/v2/tests/test_config.py +++ b/python/ray/autoscaler/v2/tests/test_config.py @@ -73,6 +73,10 @@ def test_complex(): "worker_nodes1", "initialization_commands" ) == ["echo init"] + assert config.get_node_resources("worker_nodes1") == {"CPU": 2} + + assert config.get_node_resources("worker_nodes") == {} + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): diff --git a/python/ray/autoscaler/v2/tests/test_ray_install.py b/python/ray/autoscaler/v2/tests/test_ray_install.py index 911810c95fa0f..8463be3d8c54b 100644 --- a/python/ray/autoscaler/v2/tests/test_ray_install.py +++ b/python/ray/autoscaler/v2/tests/test_ray_install.py @@ -12,11 +12,6 @@ from ray.tests.autoscaler_test_utils import MockProcessRunner, MockProvider -class FakeCounter: - def dec(self, *args, **kwargs): - pass - - class RayInstallerTest(unittest.TestCase): def setUp(self): self.base_provider = MockProvider() @@ -27,64 +22,16 @@ def setUp(self): self.base_provider, self.instance_config_provider, MockProcessRunner() ) - def test_node_providers_pass_through(self): - nodes = self.node_provider.create_nodes("worker_nodes1", 1) - assert len(nodes) == 1 - assert nodes[0] == Instance( - instance_type="worker_nodes1", - cloud_instance_id="0", - internal_ip="172.0.0.0", - external_ip="1.2.3.4", - status=Instance.INSTANCE_STATUS_UNSPECIFIED, - ) - self.assertEqual(len(self.base_provider.mock_nodes), 1) - self.assertEqual(self.node_provider.get_non_terminated_nodes(), {"0": nodes[0]}) - nodes1 = self.node_provider.create_nodes("worker_nodes", 2) - assert len(nodes1) == 2 - assert nodes1[0] == Instance( - instance_type="worker_nodes", - cloud_instance_id="1", - internal_ip="172.0.0.1", - external_ip="1.2.3.4", - status=Instance.INSTANCE_STATUS_UNSPECIFIED, - ) - assert nodes1[1] == Instance( - instance_type="worker_nodes", - cloud_instance_id="2", - internal_ip="172.0.0.2", - external_ip="1.2.3.4", - status=Instance.INSTANCE_STATUS_UNSPECIFIED, - ) - self.assertEqual( - self.node_provider.get_non_terminated_nodes(), - {"0": nodes[0], "1": nodes1[0], "2": nodes1[1]}, + def test_install_succeeded(self): + assert self.ray_installer.install_ray( + Instance( + instance_id="0", instance_type="worker_nodes1", cloud_instance_id="0" + ), + head_node_ip="1.2.3.4", ) - self.assertEqual( - self.node_provider.get_nodes_by_cloud_instance_id(["0"]), - { - "0": nodes[0], - }, - ) - self.node_provider.terminate_node("0") - self.assertEqual( - self.node_provider.get_non_terminated_nodes(), - {"1": nodes1[0], "2": nodes1[1]}, - ) - self.assertFalse(self.node_provider.is_readonly()) - def test_create_node_failure(self): - self.base_provider.error_creates = NodeLaunchException( - "hello", "failed to create node", src_exc_info=None - ) - self.assertEqual(self.node_provider.create_nodes("worker_nodes1", 1), []) - self.assertEqual(len(self.base_provider.mock_nodes), 0) - self.assertTrue( - "worker_nodes1" in self.availability_tracker.summary().node_availabilities - ) - self.assertEqual( - self.node_provider.get_non_terminated_nodes(), - {}, - ) + def test_install_failure(self): + pass if __name__ == "__main__": diff --git a/python/ray/tests/test_cli_patterns/test_ray_complex.yaml b/python/ray/tests/test_cli_patterns/test_ray_complex.yaml index f32094662ed3d..533ebe058dc2d 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_complex.yaml +++ b/python/ray/tests/test_cli_patterns/test_ray_complex.yaml @@ -24,7 +24,8 @@ available_node_types: initialization_commands: - echo what worker_nodes1: - resources: {} + resources: + CPU: 2 max_workers: 2 min_workers: 1 node_config: From 48b3e877710005226779bb02f0589e0aad3cf328 Mon Sep 17 00:00:00 2001 From: scv119 Date: Tue, 30 May 2023 01:18:01 -0700 Subject: [PATCH 3/5] update --- .../v2/instance_manager/ray_installer.py | 2 +- .../ray/autoscaler/v2/tests/test_ray_install.py | 16 +++++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/python/ray/autoscaler/v2/instance_manager/ray_installer.py b/python/ray/autoscaler/v2/instance_manager/ray_installer.py index 6596f01d0a233..2cd093f9cea05 100644 --- a/python/ray/autoscaler/v2/instance_manager/ray_installer.py +++ b/python/ray/autoscaler/v2/instance_manager/ray_installer.py @@ -78,7 +78,7 @@ def install_ray(self, instance: Instance, head_node_ip: str) -> bool: ) try: updater.run() - except Exception: + except Exception as e: # Errors has already been handled. return False return True diff --git a/python/ray/autoscaler/v2/tests/test_ray_install.py b/python/ray/autoscaler/v2/tests/test_ray_install.py index 8463be3d8c54b..7bac6039de3ee 100644 --- a/python/ray/autoscaler/v2/tests/test_ray_install.py +++ b/python/ray/autoscaler/v2/tests/test_ray_install.py @@ -6,6 +6,7 @@ import pytest # noqa from ray._private.test_utils import load_test_config +from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig from ray.autoscaler.v2.instance_manager.ray_installer import RayInstaller from ray.core.generated.instance_manager_pb2 import Instance @@ -18,11 +19,15 @@ def setUp(self): self.instance_config_provider = NodeProviderConfig( load_test_config("test_ray_complex.yaml") ) + self.runner = MockProcessRunner() self.ray_installer = RayInstaller( - self.base_provider, self.instance_config_provider, MockProcessRunner() + self.base_provider, self.instance_config_provider, self.runner ) def test_install_succeeded(self): + self.base_provider.create_node({}, {TAG_RAY_NODE_KIND: "worker_nodes1"}, 1) + self.runner.respond_to_call("json .Config.Env", ["[]" for i in range(1)]) + assert self.ray_installer.install_ray( Instance( instance_id="0", instance_type="worker_nodes1", cloud_instance_id="0" @@ -30,8 +35,13 @@ def test_install_succeeded(self): head_node_ip="1.2.3.4", ) - def test_install_failure(self): - pass + def test_install_failed(self): + assert not self.ray_installer.install_ray( + Instance( + instance_id="0", instance_type="worker_nodes1", cloud_instance_id="0" + ), + head_node_ip="1.2.3.4", + ) if __name__ == "__main__": From 638fe0e2134f763bc586118021585d50c6d35189 Mon Sep 17 00:00:00 2001 From: scv119 Date: Tue, 30 May 2023 01:20:33 -0700 Subject: [PATCH 4/5] update --- python/ray/autoscaler/v2/instance_manager/ray_installer.py | 2 +- python/ray/autoscaler/v2/tests/test_ray_install.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/autoscaler/v2/instance_manager/ray_installer.py b/python/ray/autoscaler/v2/instance_manager/ray_installer.py index 2cd093f9cea05..6596f01d0a233 100644 --- a/python/ray/autoscaler/v2/instance_manager/ray_installer.py +++ b/python/ray/autoscaler/v2/instance_manager/ray_installer.py @@ -78,7 +78,7 @@ def install_ray(self, instance: Instance, head_node_ip: str) -> bool: ) try: updater.run() - except Exception as e: + except Exception: # Errors has already been handled. return False return True diff --git a/python/ray/autoscaler/v2/tests/test_ray_install.py b/python/ray/autoscaler/v2/tests/test_ray_install.py index 7bac6039de3ee..ec48949eff10d 100644 --- a/python/ray/autoscaler/v2/tests/test_ray_install.py +++ b/python/ray/autoscaler/v2/tests/test_ray_install.py @@ -6,7 +6,7 @@ import pytest # noqa from ray._private.test_utils import load_test_config -from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS +from ray.autoscaler.tags import TAG_RAY_NODE_KIND from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig from ray.autoscaler.v2.instance_manager.ray_installer import RayInstaller from ray.core.generated.instance_manager_pb2 import Instance From baaa51be4034d13e20fbe878e7201080a45a38cf Mon Sep 17 00:00:00 2001 From: scv119 Date: Wed, 31 May 2023 00:53:58 -0700 Subject: [PATCH 5/5] update --- python/ray/autoscaler/v2/BUILD | 8 ++++++++ .../{test_ray_install.py => test_ray_installer.py} | 13 +++++++++++++ 2 files changed, 21 insertions(+) rename python/ray/autoscaler/v2/tests/{test_ray_install.py => test_ray_installer.py} (76%) diff --git a/python/ray/autoscaler/v2/BUILD b/python/ray/autoscaler/v2/BUILD index bc4657c3e37ff..37e31ce870506 100644 --- a/python/ray/autoscaler/v2/BUILD +++ b/python/ray/autoscaler/v2/BUILD @@ -38,3 +38,11 @@ py_test( tags = ["team:core"], deps = ["//:ray_lib",], ) + +py_test( + name = "test_ray_installer", + size = "small", + srcs = ["tests/test_ray_installer.py"], + tags = ["team:core"], + deps = ["//:ray_lib",], +) diff --git a/python/ray/autoscaler/v2/tests/test_ray_install.py b/python/ray/autoscaler/v2/tests/test_ray_installer.py similarity index 76% rename from python/ray/autoscaler/v2/tests/test_ray_install.py rename to python/ray/autoscaler/v2/tests/test_ray_installer.py index ec48949eff10d..586a8e30d50c3 100644 --- a/python/ray/autoscaler/v2/tests/test_ray_install.py +++ b/python/ray/autoscaler/v2/tests/test_ray_installer.py @@ -36,6 +36,7 @@ def test_install_succeeded(self): ) def test_install_failed(self): + # creation failed because no such node. assert not self.ray_installer.install_ray( Instance( instance_id="0", instance_type="worker_nodes1", cloud_instance_id="0" @@ -43,6 +44,18 @@ def test_install_failed(self): head_node_ip="1.2.3.4", ) + self.base_provider.create_node({}, {TAG_RAY_NODE_KIND: "worker_nodes1"}, 1) + self.runner.fail_cmds = ["setup_cmd"] + self.runner.respond_to_call("json .Config.Env", ["[]" for i in range(1)]) + + # creation failed because setup command failed. + assert self.ray_installer.install_ray( + Instance( + instance_id="0", instance_type="worker_nodes1", cloud_instance_id="0" + ), + head_node_ip="1.2.3.4", + ) + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"):