diff --git a/python/ray/autoscaler/v2/BUILD b/python/ray/autoscaler/v2/BUILD
index 654cae900c67a..20ff9e682fab1 100644
--- a/python/ray/autoscaler/v2/BUILD
+++ b/python/ray/autoscaler/v2/BUILD
@@ -54,6 +54,29 @@ py_test(
     deps = ["//:ray_lib",],
 )
 
+py_test(
+    name = "test_instance_launcher",
+    size = "small",
+    srcs = ["tests/test_instance_launcher.py"],
+    tags = ["team:core"],
+    deps = ["//:ray_lib",],
+)
+
+py_test(
+    name = "test_reconciler",
+    size = "small",
+    srcs = ["tests/test_reconciler.py"],
+    tags = ["team:core"],
+    deps = ["//:ray_lib",],
+)
+
+py_test(
+    name = "test_threaded_ray_installer",
+    size = "small",
+    srcs = ["tests/test_threaded_ray_installer.py"],
+    tags = ["team:core"],
+    deps = ["//:ray_lib",],
+)
 
 py_test(
     name = "test_sdk",
diff --git a/python/ray/autoscaler/v2/instance_manager/__init__.py b/python/ray/autoscaler/v2/instance_manager/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/python/ray/autoscaler/v2/instance_manager/instance_storage.py b/python/ray/autoscaler/v2/instance_manager/instance_storage.py
index 990cb1f00733d..3036c5010e911 100644
--- a/python/ray/autoscaler/v2/instance_manager/instance_storage.py
+++ b/python/ray/autoscaler/v2/instance_manager/instance_storage.py
@@ -1,4 +1,6 @@
+import copy
 import logging
+import time
 from abc import ABCMeta, abstractmethod
 from dataclasses import dataclass
 from typing import Dict, List, Optional, Set, Tuple
@@ -15,6 +17,7 @@ class InstanceUpdateEvent:
     instance_id: str
     new_status: int
+    new_ray_status: int = Instance.RAY_STATUS_UNKNOWN
 
 
 class InstanceUpdatedSuscriber(metaclass=ABCMeta):
@@ -39,7 +42,12 @@ def __init__(
         self._storage = storage
         self._cluster_id = cluster_id
         self._table_name = f"instance_table@{cluster_id}"
-        self._status_change_subscriber = status_change_subscriber
+        self._status_change_subscribers = []
+        if status_change_subscriber:
+            self._status_change_subscribers.append(status_change_subscriber)
+
+    def add_status_change_subscriber(self, subscriber: InstanceUpdatedSuscriber):
+        self._status_change_subscribers.append(subscriber)
 
     def batch_upsert_instances(
         self,
@@ -68,25 +76,29 @@ def batch_upsert_instances(
             return StoreStatus(False, version)
 
         for instance in updates:
+            instance = copy.deepcopy(instance)
             # the instance version is set to 0, it will be
             # populated by the storage entry's verion on read
             instance.version = 0
+            instance.timestamp_since_last_modified = int(time.time())
             mutations[instance.instance_id] = instance.SerializeToString()
 
         result, version = self._storage.batch_update(
            self._table_name, mutations, {}, expected_storage_version
         )
 
-        if result and self._status_change_subscriber:
-            self._status_change_subscriber.notify(
-                [
-                    InstanceUpdateEvent(
-                        instance_id=instance.instance_id,
-                        new_status=instance.status,
-                    )
-                    for instance in updates
-                ],
-            )
+        if result:
+            for subscriber in self._status_change_subscribers:
+                subscriber.notify(
+                    [
+                        InstanceUpdateEvent(
+                            instance_id=instance.instance_id,
+                            new_status=instance.status,
+                            new_ray_status=instance.ray_status,
+                        )
+                        for instance in updates
+                    ],
+                )
 
         return StoreStatus(result, version)
@@ -114,9 +126,11 @@ def upsert_instance(
         Returns:
             StoreStatus: A tuple of (success, storage_version).
""" + instance = copy.deepcopy(instance) # the instance version is set to 0, it will be # populated by the storage entry's verion on read instance.version = 0 + instance.timestamp_since_last_modified = int(time.time()) result, version = self._storage.update( self._table_name, key=instance.instance_id, @@ -126,26 +140,34 @@ def upsert_instance( insert_only=False, ) - if result and self._status_change_subscriber: - self._status_change_subscriber.notify( - [ - InstanceUpdateEvent( - instance_id=instance.instance_id, - new_status=instance.status, - ) - ], - ) + if result: + for subscriber in self._status_change_subscribers: + subscriber.notify( + [ + InstanceUpdateEvent( + instance_id=instance.instance_id, + new_status=instance.status, + new_ray_status=instance.ray_status, + ) + ], + ) return StoreStatus(result, version) def get_instances( - self, instance_ids: List[str] = None, status_filter: Set[int] = None + self, + instance_ids: List[str] = None, + status_filter: Set[int] = None, + ray_status_filter: Set[int] = None, ) -> Tuple[Dict[str, Instance], int]: """Get instances from the storage. Args: instance_ids: A list of instance ids to be retrieved. If empty, all instances will be retrieved. + status_filter: Only instances with the specified status will be returned. + ray_status_filter: Only instances with the specified ray status will + be returned. Returns: Tuple[Dict[str, Instance], int]: A tuple of (instances, version). @@ -161,6 +183,8 @@ def get_instances( instance.version = entry_version if status_filter and instance.status not in status_filter: continue + if ray_status_filter and instance.ray_status not in ray_status_filter: + continue instances[instance_id] = instance return instances, version @@ -186,14 +210,16 @@ def batch_delete_instances( self._table_name, {}, instance_ids, expected_storage_version ) - if result[0] and self._status_change_subscriber: - self._status_change_subscriber.notify( - [ - InstanceUpdateEvent( - instance_id=instance_id, - new_status=Instance.GARAGE_COLLECTED, - ) - for instance_id in instance_ids - ], - ) + if result[0]: + for subscriber in self._status_change_subscribers: + subscriber.notify( + [ + InstanceUpdateEvent( + instance_id=instance_id, + new_status=Instance.GARBAGE_COLLECTED, + new_ray_status=Instance.RAY_STATUS_UNKOWN, + ) + for instance_id in instance_ids + ], + ) return result diff --git a/python/ray/autoscaler/v2/instance_manager/node_provider.py b/python/ray/autoscaler/v2/instance_manager/node_provider.py index 6eede42ba2ee1..dc4c7319c29a4 100644 --- a/python/ray/autoscaler/v2/instance_manager/node_provider.py +++ b/python/ray/autoscaler/v2/instance_manager/node_provider.py @@ -118,11 +118,11 @@ def _get_instance(self, cloud_instance_id: str) -> Instance: instance = Instance() instance.cloud_instance_id = cloud_instance_id if self._provider.is_running(cloud_instance_id): - instance.status = Instance.STARTING + instance.status = Instance.ALLOCATED elif self._provider.is_terminated(cloud_instance_id): instance.status = Instance.STOPPED else: - instance.status = Instance.INSTANCE_STATUS_UNSPECIFIED + instance.status = Instance.UNKNOWN instance.internal_ip = self._provider.internal_ip(cloud_instance_id) instance.external_ip = self._provider.external_ip(cloud_instance_id) instance.instance_type = self._provider.node_tags(cloud_instance_id)[ diff --git a/python/ray/autoscaler/v2/instance_manager/subscribers/__init__.py b/python/ray/autoscaler/v2/instance_manager/subscribers/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff 
diff --git a/python/ray/autoscaler/v2/instance_manager/subscribers/instance_launcher.py b/python/ray/autoscaler/v2/instance_manager/subscribers/instance_launcher.py
new file mode 100644
index 0000000000000..1ec5b1e63c10b
--- /dev/null
+++ b/python/ray/autoscaler/v2/instance_manager/subscribers/instance_launcher.py
@@ -0,0 +1,162 @@
+import logging
+import math
+from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor
+from typing import List
+
+from ray.autoscaler._private.constants import (
+    AUTOSCALER_MAX_CONCURRENT_LAUNCHES,
+    AUTOSCALER_MAX_LAUNCH_BATCH,
+)
+from ray.autoscaler.v2.instance_manager.instance_storage import (
+    InstanceStorage,
+    InstanceUpdatedSuscriber,
+    InstanceUpdateEvent,
+)
+from ray.autoscaler.v2.instance_manager.node_provider import NodeProvider
+from ray.core.generated.instance_manager_pb2 import Instance
+
+logger = logging.getLogger(__name__)
+
+
+class InstanceLauncher(InstanceUpdatedSuscriber):
+    """InstanceLauncher is responsible for provisioning new instances."""
+
+    def __init__(
+        self,
+        instance_storage: InstanceStorage,
+        node_provider: NodeProvider,
+        max_concurrent_requests: int = math.ceil(
+            AUTOSCALER_MAX_CONCURRENT_LAUNCHES / float(AUTOSCALER_MAX_LAUNCH_BATCH)
+        ),
+        max_instances_per_request: int = AUTOSCALER_MAX_LAUNCH_BATCH,
+    ) -> None:
+        self._instance_storage = instance_storage
+        self._node_provider = node_provider
+        self._max_concurrent_requests = max_concurrent_requests
+        self._max_instances_per_request = max_instances_per_request
+        self._executor = ThreadPoolExecutor(max_workers=1)
+        self._launch_instance_executor = ThreadPoolExecutor(
+            max_workers=self._max_concurrent_requests
+        )
+
+    def notify(self, events: List[InstanceUpdateEvent]) -> None:
+        # TODO: we should do reconciliation based on events.
+        has_new_request = any(
+            [event.new_status == Instance.UNKNOWN for event in events]
+        )
+        if has_new_request:
+            self._executor.submit(self._may_launch_new_instances)
+
+    def _may_launch_new_instances(self):
+        new_instances, _ = self._instance_storage.get_instances(
+            status_filter={Instance.UNKNOWN}
+        )
+
+        if not new_instances:
+            logger.debug("No instances to launch")
+            return
+
+        queued_instances = []
+        for instance in new_instances.values():
+            instance.status = Instance.QUEUED
+            success, version = self._instance_storage.upsert_instance(
+                instance, expected_instance_version=instance.version
+            )
+            if success:
+                instance.version = version
+                queued_instances.append(instance)
+            else:
+                logger.error(f"Failed to update {instance} to QUEUED")
+
+        instances_by_type = defaultdict(list)
+        for instance in queued_instances:
+            instances_by_type[instance.instance_type].append(instance)
+
+        for instance_type, instances in instances_by_type.items():
+            for i in range(0, len(instances), self._max_instances_per_request):
+                self._launch_instance_executor.submit(
+                    self._launch_new_instances_by_type,
+                    instance_type,
+                    instances[
+                        i : min(
+                            i + self._max_instances_per_request,
+                            len(instances),
+                        )
+                    ],
+                )
+
+    def _launch_new_instances_by_type(
+        self, instance_type: str, instances: List[Instance]
+    ) -> int:
+        """Launches instances of the given type.
+
+        Args:
+            instance_type: type of instance to launch.
+            instances: list of instances to launch. These instances should
+                have been marked as QUEUED with instance_type set.
+        Returns:
+            Number of instances launched.
+ """ + logger.info(f"Launching {len(instances)} instances of type {instance_type}") + instances_selected = [] + for instance in instances: + instance.status = Instance.REQUESTED + result, version = self._instance_storage.upsert_instance( + instance, expected_instance_version=instance.version + ) + if not result: + logger.warn(f"Failed to update instance {instance}") + continue + instance.version = version + instances_selected.append(instance) + + if not instances_selected: + return 0 + + # TODO: idempotency token. + created_cloud_instances = self._node_provider.create_nodes( + instance_type, len(instances_selected) + ) + + assert len(created_cloud_instances) <= len(instances_selected) + + instances_launched = 0 + while created_cloud_instances and instances_selected: + cloud_instance = created_cloud_instances.pop() + instance = instances_selected.pop() + instance.cloud_instance_id = cloud_instance.cloud_instance_id + instance.internal_ip = cloud_instance.internal_ip + instance.external_ip = cloud_instance.external_ip + instance.status = Instance.ALLOCATED + instance.ray_status = Instance.RAY_STATUS_UNKOWN + + # update instance status into the storage + result, _ = self._instance_storage.upsert_instance( + instance, expected_instance_version=instance.version + ) + + if not result: + # TODO: this could only happen when the request is canceled. + logger.warn(f"Failed to update instance {instance}") + # push the cloud instance back + created_cloud_instances.append(cloud_instance) + continue + + instances_launched += 1 + + if created_cloud_instances: + # instances are leaked, we probably need to terminate them + for instance in created_cloud_instances: + self._node_provider.terminate_node(instance.cloud_instance_id) + + if instances_selected: + # instances creation failed, we need to marke them allocation failed. + for instance in instances_selected: + instance.status = Instance.ALLOCATION_FAILED + # TODO: add more information about the failure. + result, _ = self._instance_storage.upsert_instance( + instance, expected_instance_version=instance.version + ) + # TODO: this could only happen when the request is canceled. + return instances_launched diff --git a/python/ray/autoscaler/v2/instance_manager/subscribers/reconciler.py b/python/ray/autoscaler/v2/instance_manager/subscribers/reconciler.py new file mode 100644 index 0000000000000..01080daaedff9 --- /dev/null +++ b/python/ray/autoscaler/v2/instance_manager/subscribers/reconciler.py @@ -0,0 +1,109 @@ +import logging +import threading +from concurrent.futures import ThreadPoolExecutor +from typing import List + +from ray.autoscaler.v2.instance_manager.instance_storage import ( + InstanceStorage, + InstanceUpdatedSuscriber, + InstanceUpdateEvent, +) +from ray.autoscaler.v2.instance_manager.node_provider import NodeProvider +from ray.core.generated.instance_manager_pb2 import Instance + +logger = logging.getLogger(__name__) + + +class InstanceReconciler(InstanceUpdatedSuscriber): + """InstanceReconciler is responsible for reconciling the difference between + node provider and instance storage. It is also responsible for handling + failures. 
+ """ + + def __init__( + self, + instance_storage: InstanceStorage, + node_provider: NodeProvider, + reconcile_interval_s: int = 120, + ) -> None: + self._instance_storage = instance_storage + self._node_provider = node_provider + self._failure_handling_executor = ThreadPoolExecutor(max_workers=1) + self._reconcile_interval_s = reconcile_interval_s + self._reconcile_timer_lock = threading.Lock() + with self._reconcile_timer_lock: + self._reconcile_timer = threading.Timer( + self._reconcile_interval_s, self._periodic_reconcile_helper + ) + + def shutdown(self): + with self._reconcile_timer_lock: + self._reconcile_timer.cancel() + + def notify(self, events: List[InstanceUpdateEvent]) -> None: + instance_ids = [ + event.instance_id + for event in events + if event.new_status in {Instance.ALLOCATED} + and event.new_ray_status + in {Instance.RAY_STOPPED, Instance.RAY_INSTALL_FAILED} + ] + if instance_ids: + self._failure_handling_executor.submit( + self._handle_ray_failure, instance_ids + ) + + def _handle_ray_failure(self, instance_ids: List[str]) -> int: + failing_instances, _ = self._instance_storage.get_instances( + instance_ids=instance_ids, + status_filter={Instance.ALLOCATED}, + ray_status_filter={Instance.RAY_STOPPED, Instance.RAY_INSTALL_FAILED}, + ) + if not failing_instances: + logger.debug("No ray failure") + return + + failing_instances = failing_instances.values() + for instance in failing_instances: + # this call is asynchronous. + self._node_provider.terminate_node(instance.cloud_instance_id) + + instance.status = Instance.STOPPING + result, _ = self._instance_storage.upsert_instance( + instance, expected_instance_version=instance.version + ) + if not result: + logger.warning("Failed to update instance status to STOPPING") + + def _periodic_reconcile_helper(self) -> None: + try: + self._reconcile_with_node_provider() + except Exception: + logger.exception("Failed to reconcile with node provider") + with self._reconcile_timer_lock: + self._reconcile_timer = threading.Timer( + self._reconcile_interval_s, self._periodic_reconcile_helper + ) + + def _reconcile_with_node_provider(self) -> None: + # reconcile storage state with cloud state. + none_terminated_cloud_instances = self._node_provider.get_non_terminated_nodes() + + # 1. if the storage instance is in STOPPING state and the no + # cloud instance is found, change the instance state to TERMINATED. + stopping_instances, _ = self._instance_storage.get_instances( + status_filter={Instance.STOPPING} + ) + + for instance in stopping_instances.values(): + if none_terminated_cloud_instances.get(instance.cloud_instance_id) is None: + instance.status = Instance.STOPPED + result, _ = self._instance_storage.upsert_instance( + instance, expected_instance_version=instance.version + ) + if not result: + logger.warning("Failed to update instance status to STOPPED") + + # 2. TODO: if the cloud instance has no storage instance can be found, + # it means the instance is likely leaked, terminate the instance. + # 3. TODO: we should also GC nodes have been stuck in installing state. 
diff --git a/python/ray/autoscaler/v2/instance_manager/subscribers/threaded_ray_installer.py b/python/ray/autoscaler/v2/instance_manager/subscribers/threaded_ray_installer.py
new file mode 100644
index 0000000000000..0ac2b379ee23c
--- /dev/null
+++ b/python/ray/autoscaler/v2/instance_manager/subscribers/threaded_ray_installer.py
@@ -0,0 +1,102 @@
+import logging
+import time
+from concurrent.futures import ThreadPoolExecutor
+from typing import List
+
+from ray.autoscaler.v2.instance_manager.instance_storage import (
+    InstanceStorage,
+    InstanceUpdatedSuscriber,
+    InstanceUpdateEvent,
+)
+from ray.autoscaler.v2.instance_manager.ray_installer import RayInstaller
+from ray.core.generated.instance_manager_pb2 import Instance
+
+logger = logging.getLogger(__name__)
+
+
+class ThreadedRayInstaller(InstanceUpdatedSuscriber):
+    """ThreadedRayInstaller is responsible for installing Ray on new nodes."""
+
+    def __init__(
+        self,
+        head_node_ip: str,
+        instance_storage: InstanceStorage,
+        ray_installer: RayInstaller,
+        max_install_attempts: int = 3,
+        install_retry_interval: int = 10,
+        max_concurrent_installs: int = 50,
+    ) -> None:
+        self._head_node_ip = head_node_ip
+        self._instance_storage = instance_storage
+        self._ray_installer = ray_installer
+        self._max_concurrent_installs = max_concurrent_installs
+        self._max_install_attempts = max_install_attempts
+        self._install_retry_interval = install_retry_interval
+        self._ray_installation_executor = ThreadPoolExecutor(
+            max_workers=self._max_concurrent_installs
+        )
+
+    def notify(self, events: List[InstanceUpdateEvent]) -> None:
+        for event in events:
+            if (
+                event.new_status == Instance.ALLOCATED
+                and event.new_ray_status == Instance.RAY_STATUS_UNKNOWN
+            ):
+                self._install_ray_on_new_nodes(event.instance_id)
+
+    def _install_ray_on_new_nodes(self, instance_id: str) -> None:
+        allocated_instance, _ = self._instance_storage.get_instances(
+            instance_ids={instance_id},
+            status_filter={Instance.ALLOCATED},
+            ray_status_filter={Instance.RAY_STATUS_UNKNOWN},
+        )
+        for instance in allocated_instance.values():
+            self._ray_installation_executor.submit(
+                self._install_ray_on_single_node, instance
+            )
+
+    def _install_ray_on_single_node(self, instance: Instance) -> None:
+        assert instance.status == Instance.ALLOCATED
+        assert instance.ray_status == Instance.RAY_STATUS_UNKNOWN
+        instance.ray_status = Instance.RAY_INSTALLING
+        success, version = self._instance_storage.upsert_instance(
+            instance, expected_instance_version=instance.version
+        )
+        if not success:
+            logger.warning(
+                f"Failed to update instance {instance.instance_id} to RAY_INSTALLING"
+            )
+            # No need to handle the failure here; it will be covered by
+            # garbage collection.
+            return
+
+        # install with exponential backoff
+        installed = False
+        backoff_factor = 1
+        for _ in range(self._max_install_attempts):
+            installed = self._ray_installer.install_ray(instance, self._head_node_ip)
+            if installed:
+                break
+            logger.warning("Failed to install ray, retrying...")
+            time.sleep(self._install_retry_interval * backoff_factor)
+            backoff_factor *= 2
+
+        if not installed:
+            instance.ray_status = Instance.RAY_INSTALL_FAILED
+            success, version = self._instance_storage.upsert_instance(
+                instance,
+                expected_instance_version=version,
+            )
+        else:
+            instance.ray_status = Instance.RAY_RUNNING
+            success, version = self._instance_storage.upsert_instance(
+                instance,
+                expected_instance_version=version,
+            )
+        if not success:
+            logger.warning(
+                f"Failed to update instance {instance.instance_id} to {instance.ray_status}"
+            )
+            # No need to handle the failure here; it will be covered by
+            # garbage collection.
+            return
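Reviewer note: wiring for the installer, mirroring `test_threaded_ray_installer.py` below (not part of the patch). `provider`, `config`, and `runner` stand in for the `MockProvider`, `NodeProviderConfig`, and `MockProcessRunner` used there.

```python
from ray.autoscaler.v2.instance_manager.instance_storage import InstanceStorage
from ray.autoscaler.v2.instance_manager.ray_installer import RayInstaller
from ray.autoscaler.v2.instance_manager.storage import InMemoryStorage
from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import (
    ThreadedRayInstaller,
)


def wire_installer(provider, config, runner) -> InstanceStorage:
    storage = InstanceStorage(cluster_id="demo", storage=InMemoryStorage())
    installer = ThreadedRayInstaller(
        head_node_ip="127.0.0.1",
        instance_storage=storage,
        ray_installer=RayInstaller(provider, config, runner),
        max_install_attempts=3,  # retried with exponential backoff
        install_retry_interval=10,  # seconds; doubled after every attempt
    )
    storage.add_status_change_subscriber(installer)
    # Any (ALLOCATED, RAY_STATUS_UNKNOWN) write now triggers an install that
    # ends with ray_status RAY_RUNNING or RAY_INSTALL_FAILED.
    return storage
```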
instances["1"].cloud_instance_id == "0" + + def test_launch_failed(self): + # launch failed: instance is not in storage + instance = create_instance("1") + assert 0 == self.instance_launcher._launch_new_instances_by_type( + "worker_nodes1", [instance] + ) + instances, _ = self.instance_storage.get_instances() + assert len(instances) == 0 + + # launch failed: instance version mismatch + instance = create_instance("1") + self.instance_storage.upsert_instance(instance) + instance.version = 2 + assert 0 == self.instance_launcher._launch_new_instances_by_type( + "worker_nodes1", [instance] + ) + instances, _ = self.instance_storage.get_instances() + assert len(instances) == 1 + assert instances["1"].status == Instance.UNKNOWN + + def test_launch_partial_success(self): + self.base_provider.partical_success_count = 1 + instance1 = create_instance("1") + instance2 = create_instance("2") + success, version = self.instance_storage.batch_upsert_instances( + [instance1, instance2] + ) + assert success + instance1.version = version + instance2.version = version + self.instance_launcher._launch_new_instances_by_type( + "worker_nodes1", [instance1, instance2] + ) + instances, _ = self.instance_storage.get_instances() + assert len(instances) == 2 + assert instances["1"].status == Instance.ALLOCATION_FAILED + assert instances["2"].status == Instance.ALLOCATED + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/autoscaler/v2/tests/test_instance_storage.py b/python/ray/autoscaler/v2/tests/test_instance_storage.py index 881520ea6cf44..3e06b3669639e 100644 --- a/python/ray/autoscaler/v2/tests/test_instance_storage.py +++ b/python/ray/autoscaler/v2/tests/test_instance_storage.py @@ -2,6 +2,7 @@ import copy import os import sys +from unittest import mock import pytest # noqa @@ -22,12 +23,16 @@ def notify(self, events): self.events.extend(events) -def create_instance( - instance_id, status=Instance.INSTANCE_STATUS_UNSPECIFIED, version=0 -): - return Instance(instance_id=instance_id, status=status, version=version) +def create_instance(instance_id, status=Instance.UNKNOWN, version=0): + return Instance( + instance_id=instance_id, + status=status, + version=version, + timestamp_since_last_modified=1, + ) +@mock.patch("time.time", mock.MagicMock(return_value=1)) def test_upsert(): subscriber = DummySubscriber() @@ -46,8 +51,8 @@ def test_upsert(): ) assert subscriber.events == [ - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED), + InstanceUpdateEvent("instance1", Instance.UNKNOWN), + InstanceUpdateEvent("instance2", Instance.UNKNOWN), ] instance1.version = 1 @@ -66,11 +71,11 @@ def test_upsert(): ) assert subscriber.events == [ - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED), + InstanceUpdateEvent("instance1", Instance.UNKNOWN), + InstanceUpdateEvent("instance2", Instance.UNKNOWN), ] - instance2.status = Instance.IDLE + instance2.status = Instance.ALLOCATED assert (True, 2) == storage.batch_upsert_instances( [instance3, instance2], expected_storage_version=1, @@ -89,13 +94,14 @@ def test_upsert(): } assert subscriber.events == [ - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", 
-        InstanceUpdateEvent("instance3", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance2", Instance.IDLE),
+        InstanceUpdateEvent("instance1", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance2", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance3", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance2", Instance.ALLOCATED),
     ]
 
 
+@mock.patch("time.time", mock.MagicMock(return_value=1))
 def test_update():
     subscriber = DummySubscriber()
 
@@ -109,13 +115,13 @@ def test_update():
     assert (True, 1) == storage.upsert_instance(instance=instance1)
 
     assert subscriber.events == [
-        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance1", Instance.UNKNOWN),
     ]
 
     assert (True, 2) == storage.upsert_instance(instance=instance2)
     assert subscriber.events == [
-        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance1", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance2", Instance.UNKNOWN),
     ]
 
     assert (
@@ -139,8 +145,8 @@ def test_update():
     )
 
     assert subscriber.events == [
-        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance1", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance2", Instance.UNKNOWN),
     ]
 
     assert (True, 3) == storage.upsert_instance(
@@ -157,9 +163,9 @@ def test_update():
     ) == storage.get_instances()
 
     assert subscriber.events == [
-        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance1", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance2", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance2", Instance.UNKNOWN),
     ]
 
     assert (True, 4) == storage.upsert_instance(
@@ -176,13 +182,14 @@ def test_update():
     ) == storage.get_instances()
 
     assert subscriber.events == [
-        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance1", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance2", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance2", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance1", Instance.UNKNOWN),
     ]
 
 
+@mock.patch("time.time", mock.MagicMock(return_value=1))
 def test_delete():
     subscriber = DummySubscriber()
 
@@ -206,10 +213,10 @@ def test_delete():
     assert (True, 2) == storage.batch_delete_instances(instance_ids=["instance1"])
 
     assert subscriber.events == [
-        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance3", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance1", Instance.GARAGE_COLLECTED),
+        InstanceUpdateEvent("instance1", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance2", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance3", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance1", Instance.GARBAGE_COLLECTED),
     ]
 
     assert (
@@ -232,22 +239,23 @@ def test_delete():
     ) == storage.get_instances()
 
     assert subscriber.events == [
-        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance3", Instance.INSTANCE_STATUS_UNSPECIFIED),
-        InstanceUpdateEvent("instance1", Instance.GARAGE_COLLECTED),
-        InstanceUpdateEvent("instance2", Instance.GARAGE_COLLECTED),
+        InstanceUpdateEvent("instance1", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance2", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance3", Instance.UNKNOWN),
+        InstanceUpdateEvent("instance1", Instance.GARBAGE_COLLECTED),
+        InstanceUpdateEvent("instance2", Instance.GARBAGE_COLLECTED),
     ]
 
 
+@mock.patch("time.time", mock.MagicMock(return_value=1))
 def test_get_instances():
     storage = InstanceStorage(
         cluster_id="test_cluster",
         storage=InMemoryStorage(),
     )
     instance1 = create_instance("instance1", version=1)
-    instance2 = create_instance("instance2", status=Instance.RUNNING, version=1)
-    instance3 = create_instance("instance3", status=Instance.IDLE, version=1)
+    instance2 = create_instance("instance2", status=Instance.ALLOCATED, version=1)
+    instance3 = create_instance("instance3", status=Instance.STOPPING, version=1)
 
     assert (True, 1) == storage.batch_upsert_instances(
         [copy.deepcopy(instance1), copy.deepcopy(instance2), copy.deepcopy(instance3)],
@@ -272,7 +280,7 @@ def test_get_instances():
     ) == storage.get_instances(instance_ids=["instance1", "instance2"])
 
     assert ({"instance2": instance2}, 1) == storage.get_instances(
-        instance_ids=["instance1", "instance2"], status_filter={Instance.RUNNING}
+        instance_ids=["instance1", "instance2"], status_filter={Instance.ALLOCATED}
     )
 
     assert (
@@ -280,7 +288,7 @@ def test_get_instances():
             "instance2": instance2,
         },
         1,
-    ) == storage.get_instances(status_filter={Instance.RUNNING})
+    ) == storage.get_instances(status_filter={Instance.ALLOCATED})
 
 
 if __name__ == "__main__":
diff --git a/python/ray/autoscaler/v2/tests/test_node_provider.py b/python/ray/autoscaler/v2/tests/test_node_provider.py
index 8bdad4a6c18f0..dda209c7ba6cb 100644
--- a/python/ray/autoscaler/v2/tests/test_node_provider.py
+++ b/python/ray/autoscaler/v2/tests/test_node_provider.py
@@ -14,15 +14,11 @@
 from ray.autoscaler.node_launch_exception import NodeLaunchException
 from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig
 from ray.autoscaler.v2.instance_manager.node_provider import NodeProviderAdapter
+from ray.autoscaler.v2.tests.util import FakeCounter
 from ray.core.generated.instance_manager_pb2 import Instance
 from ray.tests.autoscaler_test_utils import MockProvider
 
 
-class FakeCounter:
-    def dec(self, *args, **kwargs):
-        pass
-
-
 class NodeProviderTest(unittest.TestCase):
     def setUp(self):
         self.base_provider = MockProvider()
@@ -48,7 +44,7 @@ def test_node_providers_pass_through(self):
             cloud_instance_id="0",
             internal_ip="172.0.0.0",
             external_ip="1.2.3.4",
-            status=Instance.INSTANCE_STATUS_UNSPECIFIED,
+            status=Instance.UNKNOWN,
         )
         self.assertEqual(len(self.base_provider.mock_nodes), 1)
         self.assertEqual(self.node_provider.get_non_terminated_nodes(), {"0": nodes[0]})
@@ -59,14 +55,14 @@ def test_node_providers_pass_through(self):
             cloud_instance_id="1",
             internal_ip="172.0.0.1",
             external_ip="1.2.3.4",
-            status=Instance.INSTANCE_STATUS_UNSPECIFIED,
+            status=Instance.UNKNOWN,
         )
         assert nodes1[1] == Instance(
             instance_type="worker_nodes",
             cloud_instance_id="2",
             internal_ip="172.0.0.2",
             external_ip="1.2.3.4",
-            status=Instance.INSTANCE_STATUS_UNSPECIFIED,
+            status=Instance.UNKNOWN,
         )
         self.assertEqual(
             self.node_provider.get_non_terminated_nodes(),
diff --git a/python/ray/autoscaler/v2/tests/test_reconciler.py b/python/ray/autoscaler/v2/tests/test_reconciler.py
new file mode 100644
index 0000000000000..62e46653e19b9
--- /dev/null
+++ b/python/ray/autoscaler/v2/tests/test_reconciler.py
@@ -0,0 +1,86 @@
+# coding: utf-8
+import os
+import sys
+import unittest
+
+import pytest  # noqa
+
+from ray._private.test_utils import load_test_config
+from ray.autoscaler._private.event_summarizer import EventSummarizer
+from ray.autoscaler._private.node_launcher import BaseNodeLauncher
+from ray.autoscaler._private.node_provider_availability_tracker import (
+    NodeProviderAvailabilityTracker,
+)
+from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig
+from ray.autoscaler.v2.instance_manager.instance_storage import InstanceStorage
+from ray.autoscaler.v2.instance_manager.node_provider import NodeProviderAdapter
+from ray.autoscaler.v2.instance_manager.storage import InMemoryStorage
+from ray.autoscaler.v2.instance_manager.subscribers.reconciler import InstanceReconciler
+from ray.autoscaler.v2.tests.util import FakeCounter
+from ray.core.generated.instance_manager_pb2 import Instance
+from ray.tests.autoscaler_test_utils import MockProvider
+
+
+class InstanceReconcilerTest(unittest.TestCase):
+    def setUp(self):
+        self.base_provider = MockProvider()
+        self.availability_tracker = NodeProviderAvailabilityTracker()
+        self.node_launcher = BaseNodeLauncher(
+            self.base_provider,
+            FakeCounter(),
+            EventSummarizer(),
+            self.availability_tracker,
+        )
+        self.instance_config_provider = NodeProviderConfig(
+            load_test_config("test_ray_complex.yaml")
+        )
+        self.node_provider = NodeProviderAdapter(
+            self.base_provider, self.node_launcher, self.instance_config_provider
+        )
+
+        self.instance_storage = InstanceStorage(
+            cluster_id="test_cluster_id",
+            storage=InMemoryStorage(),
+        )
+        self.reconciler = InstanceReconciler(
+            instance_storage=self.instance_storage,
+            node_provider=self.node_provider,
+        )
+
+    def tearDown(self):
+        self.reconciler.shutdown()
+
+    def test_handle_ray_failure(self):
+        self.node_provider.create_nodes("worker_nodes1", 1)
+        instance = Instance(
+            instance_id="0",
+            instance_type="worker_nodes1",
+            cloud_instance_id="0",
+            status=Instance.ALLOCATED,
+            ray_status=Instance.RAY_STOPPED,
+        )
+        assert not self.base_provider.is_terminated(instance.cloud_instance_id)
+        success, version = self.instance_storage.upsert_instance(instance)
+        assert success
+        instance.version = version
+        self.reconciler._handle_ray_failure([instance.instance_id])
+
+        instances, _ = self.instance_storage.get_instances(
+            instance_ids={instance.instance_id}
+        )
+        assert instances[instance.instance_id].status == Instance.STOPPING
+        assert self.base_provider.is_terminated(instance.cloud_instance_id)
+
+        # The reconciler will detect the node is terminated and update the status.
+        self.reconciler._reconcile_with_node_provider()
+        instances, _ = self.instance_storage.get_instances(
+            instance_ids={instance.instance_id}
+        )
+        assert instances[instance.instance_id].status == Instance.STOPPED
+
+
+if __name__ == "__main__":
+    if os.environ.get("PARALLEL_CI"):
+        sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
+    else:
+        sys.exit(pytest.main(["-sv", __file__]))
diff --git a/python/ray/autoscaler/v2/tests/test_threaded_ray_installer.py b/python/ray/autoscaler/v2/tests/test_threaded_ray_installer.py
new file mode 100644
index 0000000000000..323e99c9f975d
--- /dev/null
+++ b/python/ray/autoscaler/v2/tests/test_threaded_ray_installer.py
@@ -0,0 +1,109 @@
+# coding: utf-8
+import os
+import sys
+import unittest
+from unittest.mock import patch
+
+import pytest  # noqa
+
+from ray._private.test_utils import load_test_config
+from ray.autoscaler.tags import TAG_RAY_NODE_KIND
+from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig
+from ray.autoscaler.v2.instance_manager.instance_storage import InstanceStorage
+from ray.autoscaler.v2.instance_manager.ray_installer import RayInstaller
+from ray.autoscaler.v2.instance_manager.storage import InMemoryStorage
+from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import (
+    ThreadedRayInstaller,
+)
+from ray.core.generated.instance_manager_pb2 import Instance
+from ray.tests.autoscaler_test_utils import MockProcessRunner, MockProvider
+
+
+class ThreadedRayInstallerTest(unittest.TestCase):
+    def setUp(self):
+        self.base_provider = MockProvider()
+        self.instance_config_provider = NodeProviderConfig(
+            load_test_config("test_ray_complex.yaml")
+        )
+        self.runner = MockProcessRunner()
+        self.ray_installer = RayInstaller(
+            self.base_provider, self.instance_config_provider, self.runner
+        )
+        self.instance_storage = InstanceStorage(
+            cluster_id="test_cluster_id",
+            storage=InMemoryStorage(),
+        )
+        self.threaded_ray_installer = ThreadedRayInstaller(
+            head_node_ip="127.0.0.1",
+            instance_storage=self.instance_storage,
+            ray_installer=self.ray_installer,
+        )
+
+    def test_install_ray_on_new_node_version_mismatch(self):
+        self.base_provider.create_node({}, {TAG_RAY_NODE_KIND: "worker_nodes1"}, 1)
+        instance = Instance(
+            instance_id="0",
+            instance_type="worker_nodes1",
+            cloud_instance_id="0",
+            status=Instance.ALLOCATED,
+        )
+        success, version = self.instance_storage.upsert_instance(instance)
+        assert success
+        self.runner.respond_to_call("json .Config.Env", ["[]" for i in range(1)])
+
+        self.threaded_ray_installer._install_ray_on_single_node(instance)
+        instances, _ = self.instance_storage.get_instances(
+            instance_ids={instance.instance_id}
+        )
+        assert instances[instance.instance_id].status == Instance.ALLOCATED
+        assert instances[instance.instance_id].version == version
+
+    @patch.object(RayInstaller, "install_ray")
+    def test_install_ray_on_new_node_install_failed(self, mock_method):
+        self.base_provider.create_node({}, {TAG_RAY_NODE_KIND: "worker_nodes1"}, 1)
+        instance = Instance(
+            instance_id="0",
+            instance_type="worker_nodes1",
+            cloud_instance_id="0",
+            status=Instance.ALLOCATED,
+        )
+        success, version = self.instance_storage.upsert_instance(instance)
+        assert success
+        instance.version = version
+
+        mock_method.return_value = False
+        self.threaded_ray_installer._install_retry_interval = 0
+        self.threaded_ray_installer._max_install_attempts = 1
+        self.threaded_ray_installer._install_ray_on_single_node(instance)
+
+        instances, _ = self.instance_storage.get_instances(
+            instance_ids={instance.instance_id}
+        )
+        assert instances[instance.instance_id].ray_status == Instance.RAY_INSTALL_FAILED
+
+    def test_install_ray_on_new_nodes(self):
+        self.base_provider.create_node({}, {TAG_RAY_NODE_KIND: "worker_nodes1"}, 1)
+        instance = Instance(
+            instance_id="0",
+            instance_type="worker_nodes1",
+            cloud_instance_id="0",
+            status=Instance.ALLOCATED,
+        )
+        success, version = self.instance_storage.upsert_instance(instance)
+        assert success
+        instance.version = version
+        self.runner.respond_to_call("json .Config.Env", ["[]" for i in range(1)])
+
+        self.threaded_ray_installer._install_ray_on_new_nodes(instance.instance_id)
+        self.threaded_ray_installer._ray_installation_executor.shutdown(wait=True)
+        instances, _ = self.instance_storage.get_instances(
+            instance_ids={instance.instance_id}
+        )
+        assert instances[instance.instance_id].ray_status == Instance.RAY_RUNNING
+
+
+if __name__ == "__main__":
+    if os.environ.get("PARALLEL_CI"):
+        sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
+    else:
+        sys.exit(pytest.main(["-sv", __file__]))
diff --git a/python/ray/autoscaler/v2/tests/util.py b/python/ray/autoscaler/v2/tests/util.py
index 7b55790a40d0d..16bf4afd963ba 100644
--- a/python/ray/autoscaler/v2/tests/util.py
+++ b/python/ray/autoscaler/v2/tests/util.py
@@ -2,6 +2,7 @@
 
 from ray.autoscaler.v2.schema import ResourceUsage
 from ray.core.generated import autoscaler_pb2
+from ray.core.generated.instance_manager_pb2 import Instance
 
 
 def get_cluster_resource_state(stub) -> autoscaler_pb2.ClusterResourceState:
@@ -11,6 +12,28 @@ def get_cluster_resource_state(stub) -> autoscaler_pb2.ClusterResourceState:
     return stub.GetClusterResourceState(request).cluster_resource_state
 
 
+class FakeCounter:
+    def dec(self, *args, **kwargs):
+        pass
+
+
+def create_instance(
+    instance_id,
+    status=Instance.UNKNOWN,
+    version=0,
+    ray_status=Instance.RAY_STATUS_UNKNOWN,
+    instance_type="worker_nodes1",
+):
+    return Instance(
+        instance_id=instance_id,
+        status=status,
+        version=version,
+        instance_type=instance_type,
+        ray_status=ray_status,
+        timestamp_since_last_modified=1,
+    )
+
+
 def report_autoscaling_state(stub, autoscaling_state: autoscaler_pb2.AutoscalingState):
     request = autoscaler_pb2.ReportAutoscalingStateRequest(
         autoscaling_state=autoscaling_state
diff --git a/python/ray/tests/autoscaler_test_utils.py b/python/ray/tests/autoscaler_test_utils.py
index c9af7e7582a11..8db4894c4577f 100644
--- a/python/ray/tests/autoscaler_test_utils.py
+++ b/python/ray/tests/autoscaler_test_utils.py
@@ -159,6 +159,7 @@ def __init__(self, cache_stopped=False, unique_ips=False):
         self.unique_ips = unique_ips
         self.fail_to_fetch_ip = False
         self.safe_to_scale_flag = True
+        self.partial_success_count = None
         # Many of these functions are called by node_launcher or updater in
         # different threads. This can be treated as a global lock for
         # everything.
@@ -233,6 +234,8 @@ def create_node(self, node_config, tags, count, _skip_wait=False):
             return
 
         created_nodes = {}
+        if self.partial_success_count is not None:
+            count = min(count, self.partial_success_count)
         with self.lock:
             if self.cache_stopped:
                 for node in self.mock_nodes.values():
diff --git a/src/ray/protobuf/experimental/instance_manager.proto b/src/ray/protobuf/experimental/instance_manager.proto
index 3d7b4aaa5d778..9f07d2f189839 100644
--- a/src/ray/protobuf/experimental/instance_manager.proto
+++ b/src/ray/protobuf/experimental/instance_manager.proto
@@ -51,45 +51,37 @@ message GetAvailableInstanceTypesResponse {
 // period (by default 30 minutes).
 message Instance {
   enum InstanceStatus {
-    // The unspecified state - most likey it is queued.
-    INSTANCE_STATUS_UNSPECIFIED = 0;
-    // Instance is starting. The first state update received from the
-    // instance.
-    STARTING = 1;
-    // The instance is running - one of two states of a healthy instance.
-    RUNNING = 2;
-    // The instance is idle - one of two states of a healthy instance.
-    IDLE = 3;
-    // The instance is stopping - usually follows from the RUNNING, IDLE,
-    // PREEMPT_REQUEST or DRAIN_REQUEST state.
-    STOPPING = 4;
+    // The unspecified state.
+    UNKNOWN = 0;
+    // The instance is queued to be allocated.
+    QUEUED = 1;
+    // The instance is requested to be allocated - follows from the QUEUED state.
+    REQUESTED = 2;
+    // The instance is allocated - follows from the REQUESTED state.
+    ALLOCATED = 3;
+    // The instance allocation failed - follows from the REQUESTED state.
+    ALLOCATION_FAILED = 4;
+    // The instance is stopping - follows from the ALLOCATED state.
+    STOPPING = 5;
     // The instance is stopped - follows from the STOPPING state.
-    STOPPED = 5;
-    // The instance is in a bad state - but it is still able to send updates.
-    FAILING = 6;
-    // The subscribe service moves instances to this state if they
-    // have been idle for too long. This allows the cluster manager to
-    // make a final decision on whether or not to commence a drain
-    // sequence for this instance.
-    DRAIN_CONFIRMATION_PENDING = 7;
-    // The instance should be drained, Ray should start draining process
-    // but could reject if failed to drain.
-    DRAIN_REQUEST = 8;
-    // The instance will be preempted by the instance manager, regardless
-    // of whether it is drainable or not.
-    PREEMPT_REQUEST = 9;
-    // An optional state that can be used to indicate that the instance
-    // is allocated from cloud provider, but ray hasn't been installed yet.
-    INSTANCE_ALLOCATED = 10;
-    // An optional state that can be used to indicate that the instance
-    // is currently installing Ray.
-    INSTALLING_RAY = 11;
-    // An optional state that can be used to indicate that the instance
-    // failed to allocate from cloud provider.
-    ALLOCATION_FAILED = 12;
-    // Node is deleted.
-    GARAGE_COLLECTED = 13;
+    STOPPED = 6;
+    // The instance record is deleted from the storage - follows from the STOPPED state.
+    GARBAGE_COLLECTED = 7;
   }
+
+  enum RayStatus {
+    // The unspecified state.
+    RAY_STATUS_UNKNOWN = 0;
+    // The instance is installing ray.
+    RAY_INSTALLING = 1;
+    // Ray installation failed - follows from the RAY_INSTALLING state.
+    RAY_INSTALL_FAILED = 2;
+    // Ray started and connected to the GCS - follows from the RAY_INSTALLING state.
+    RAY_RUNNING = 3;
+    // Ray stopped - follows from the RAY_RUNNING state.
+    RAY_STOPPED = 4;
+  }
+
   // an unique id for the instance that's generated by the
   // instance manager. This may be optional if
   // the instance hasn't be started yet.
@@ -100,8 +92,8 @@ message Instance {
   string instance_type = 13;
   // The corresponding total resources on the node.
   map<string, double> total_resources = 14;
-  // timestamp of the last state changed.
-  int64 timestamp_since_last_state_change = 15;
+  // timestamp of the last time the instance was modified.
+  int64 timestamp_since_last_modified = 15;
   // the external id of the instance that's generated by
   // the cloud provider like AWS, GCP, etc.
   // Note this id can be reused by different instances.
@@ -112,6 +104,8 @@ message Instance {
   string external_ip = 18;
   // the monotonically increasing version number of the instance.
   int64 version = 19;
+  // the status of the ray process on the instance.
+  RayStatus ray_status = 20;
 }
 
 message UpdateInstanceManagerStateRequest {
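Reviewer note: taken together, the proto changes split the lifecycle into two orthogonal state machines, one for the cloud instance and one for the ray process on it. The adjacency maps below only illustrate the transitions the enum comments describe (plus the UNKNOWN -> QUEUED and RAY_STATUS_UNKNOWN -> RAY_INSTALLING edges driven by the launcher and installer); the `is_valid_transition` helper is hypothetical, not part of the patch.

```python
from ray.core.generated.instance_manager_pb2 import Instance

INSTANCE_TRANSITIONS = {
    Instance.UNKNOWN: {Instance.QUEUED},
    Instance.QUEUED: {Instance.REQUESTED},
    Instance.REQUESTED: {Instance.ALLOCATED, Instance.ALLOCATION_FAILED},
    Instance.ALLOCATED: {Instance.STOPPING},
    Instance.STOPPING: {Instance.STOPPED},
    Instance.STOPPED: {Instance.GARBAGE_COLLECTED},
}

RAY_TRANSITIONS = {
    Instance.RAY_STATUS_UNKNOWN: {Instance.RAY_INSTALLING},
    Instance.RAY_INSTALLING: {Instance.RAY_INSTALL_FAILED, Instance.RAY_RUNNING},
    Instance.RAY_RUNNING: {Instance.RAY_STOPPED},
}


def is_valid_transition(cur: int, nxt: int, table: dict) -> bool:
    """Hypothetical helper: validate a single-step status transition."""
    return nxt in table.get(cur, set())
```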