From 006a43ea6c395d619036362a30694f0603d6cba0 Mon Sep 17 00:00:00 2001
From: Chen Shen
Date: Mon, 24 Jul 2023 21:10:05 -0700
Subject: [PATCH] [autoscaler v2][5/n] introducing reconciler (#34985)

Why are these changes needed?

This is part of the stack of PRs that introduces the new node_provider for
autoscaler v2.

Stack of PRs:
#34976
#34977
#34979
#34983
#34985 <- this PR

This PR adds a reconciler that reconciles the state from instance_storage to
node_provider. Specifically, it subscribes to changes in the storage and
triggers the following operations in sequence:
- launch new instances based on the queued requests
- install ray on newly allocated instances
- handle various ray failures on allocated instances (ray install failed, or
  ray stopped)
- reconcile the instance status with the node provider
---
 python/ray/autoscaler/v2/BUILD                |  23 +++
 .../v2/instance_manager/__init__.py           |   0
 .../v2/instance_manager/instance_storage.py   |  88 ++++++----
 .../v2/instance_manager/node_provider.py      |   4 +-
 .../instance_manager/subscribers/__init__.py  |   0
 .../subscribers/instance_launcher.py          | 162 ++++++++++++++++++
 .../subscribers/reconciler.py                 | 109 ++++++++++++
 .../subscribers/threaded_ray_installer.py     | 102 +++++++++++
 .../v2/tests/test_instance_launcher.py        | 110 ++++++++++++
 .../v2/tests/test_instance_storage.py         |  84 +++++----
 .../autoscaler/v2/tests/test_node_provider.py |  12 +-
 .../autoscaler/v2/tests/test_reconciler.py    |  86 ++++++++++
 .../v2/tests/test_threaded_ray_installer.py   | 109 ++++++++++++
 python/ray/autoscaler/v2/tests/util.py        |  23 +++
 python/ray/tests/autoscaler_test_utils.py     |   3 +
 .../experimental/instance_manager.proto       |  72 ++++----
 16 files changed, 869 insertions(+), 118 deletions(-)
 create mode 100644 python/ray/autoscaler/v2/instance_manager/__init__.py
 create mode 100644 python/ray/autoscaler/v2/instance_manager/subscribers/__init__.py
 create mode 100644 python/ray/autoscaler/v2/instance_manager/subscribers/instance_launcher.py
 create mode 100644 python/ray/autoscaler/v2/instance_manager/subscribers/reconciler.py
 create mode 100644 python/ray/autoscaler/v2/instance_manager/subscribers/threaded_ray_installer.py
 create mode 100644 python/ray/autoscaler/v2/tests/test_instance_launcher.py
 create mode 100644 python/ray/autoscaler/v2/tests/test_reconciler.py
 create mode 100644 python/ray/autoscaler/v2/tests/test_threaded_ray_installer.py

diff --git a/python/ray/autoscaler/v2/BUILD b/python/ray/autoscaler/v2/BUILD
index 654cae900c67a..20ff9e682fab1 100644
--- a/python/ray/autoscaler/v2/BUILD
+++ b/python/ray/autoscaler/v2/BUILD
@@ -54,6 +54,29 @@ py_test(
     deps = ["//:ray_lib",],
 )
 
+py_test(
+    name = "test_instance_launcher",
+    size = "small",
+    srcs = ["tests/test_instance_launcher.py"],
+    tags = ["team:core"],
+    deps = ["//:ray_lib",],
+)
+
+py_test(
+    name = "test_reconciler",
+    size = "small",
+    srcs = ["tests/test_reconciler.py"],
+    tags = ["team:core"],
+    deps = ["//:ray_lib",],
+)
+
+py_test(
+    name = "test_threaded_ray_installer",
+    size = "small",
+    srcs = ["tests/test_threaded_ray_installer.py"],
+    tags = ["team:core"],
+    deps = ["//:ray_lib",],
+)
 
 py_test(
     name = "test_sdk",
diff --git a/python/ray/autoscaler/v2/instance_manager/__init__.py b/python/ray/autoscaler/v2/instance_manager/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/python/ray/autoscaler/v2/instance_manager/instance_storage.py b/python/ray/autoscaler/v2/instance_manager/instance_storage.py
index 990cb1f00733d..3036c5010e911 100644
--- a/python/ray/autoscaler/v2/instance_manager/instance_storage.py
+++ 
b/python/ray/autoscaler/v2/instance_manager/instance_storage.py @@ -1,4 +1,6 @@ +import copy import logging +import time from abc import ABCMeta, abstractmethod from dataclasses import dataclass from typing import Dict, List, Optional, Set, Tuple @@ -15,6 +17,7 @@ class InstanceUpdateEvent: instance_id: str new_status: int + new_ray_status: int = Instance.RAY_STATUS_UNKOWN class InstanceUpdatedSuscriber(metaclass=ABCMeta): @@ -39,7 +42,12 @@ def __init__( self._storage = storage self._cluster_id = cluster_id self._table_name = f"instance_table@{cluster_id}" - self._status_change_subscriber = status_change_subscriber + self._status_change_subscribers = [] + if status_change_subscriber: + self._status_change_subscribers.append(status_change_subscriber) + + def add_status_change_subscriber(self, subscriber: InstanceUpdatedSuscriber): + self._status_change_subscribers.append(subscriber) def batch_upsert_instances( self, @@ -68,25 +76,29 @@ def batch_upsert_instances( return StoreStatus(False, version) for instance in updates: + instance = copy.deepcopy(instance) # the instance version is set to 0, it will be # populated by the storage entry's verion on read instance.version = 0 + instance.timestamp_since_last_modified = int(time.time()) mutations[instance.instance_id] = instance.SerializeToString() result, version = self._storage.batch_update( self._table_name, mutations, {}, expected_storage_version ) - if result and self._status_change_subscriber: - self._status_change_subscriber.notify( - [ - InstanceUpdateEvent( - instance_id=instance.instance_id, - new_status=instance.status, - ) - for instance in updates - ], - ) + if result: + for subscriber in self._status_change_subscribers: + subscriber.notify( + [ + InstanceUpdateEvent( + instance_id=instance.instance_id, + new_status=instance.status, + new_ray_status=instance.ray_status, + ) + for instance in updates + ], + ) return StoreStatus(result, version) @@ -114,9 +126,11 @@ def upsert_instance( Returns: StoreStatus: A tuple of (success, storage_version). """ + instance = copy.deepcopy(instance) # the instance version is set to 0, it will be # populated by the storage entry's verion on read instance.version = 0 + instance.timestamp_since_last_modified = int(time.time()) result, version = self._storage.update( self._table_name, key=instance.instance_id, @@ -126,26 +140,34 @@ def upsert_instance( insert_only=False, ) - if result and self._status_change_subscriber: - self._status_change_subscriber.notify( - [ - InstanceUpdateEvent( - instance_id=instance.instance_id, - new_status=instance.status, - ) - ], - ) + if result: + for subscriber in self._status_change_subscribers: + subscriber.notify( + [ + InstanceUpdateEvent( + instance_id=instance.instance_id, + new_status=instance.status, + new_ray_status=instance.ray_status, + ) + ], + ) return StoreStatus(result, version) def get_instances( - self, instance_ids: List[str] = None, status_filter: Set[int] = None + self, + instance_ids: List[str] = None, + status_filter: Set[int] = None, + ray_status_filter: Set[int] = None, ) -> Tuple[Dict[str, Instance], int]: """Get instances from the storage. Args: instance_ids: A list of instance ids to be retrieved. If empty, all instances will be retrieved. + status_filter: Only instances with the specified status will be returned. + ray_status_filter: Only instances with the specified ray status will + be returned. Returns: Tuple[Dict[str, Instance], int]: A tuple of (instances, version). 
@@ -161,6 +183,8 @@ def get_instances( instance.version = entry_version if status_filter and instance.status not in status_filter: continue + if ray_status_filter and instance.ray_status not in ray_status_filter: + continue instances[instance_id] = instance return instances, version @@ -186,14 +210,16 @@ def batch_delete_instances( self._table_name, {}, instance_ids, expected_storage_version ) - if result[0] and self._status_change_subscriber: - self._status_change_subscriber.notify( - [ - InstanceUpdateEvent( - instance_id=instance_id, - new_status=Instance.GARAGE_COLLECTED, - ) - for instance_id in instance_ids - ], - ) + if result[0]: + for subscriber in self._status_change_subscribers: + subscriber.notify( + [ + InstanceUpdateEvent( + instance_id=instance_id, + new_status=Instance.GARBAGE_COLLECTED, + new_ray_status=Instance.RAY_STATUS_UNKOWN, + ) + for instance_id in instance_ids + ], + ) return result diff --git a/python/ray/autoscaler/v2/instance_manager/node_provider.py b/python/ray/autoscaler/v2/instance_manager/node_provider.py index 6eede42ba2ee1..dc4c7319c29a4 100644 --- a/python/ray/autoscaler/v2/instance_manager/node_provider.py +++ b/python/ray/autoscaler/v2/instance_manager/node_provider.py @@ -118,11 +118,11 @@ def _get_instance(self, cloud_instance_id: str) -> Instance: instance = Instance() instance.cloud_instance_id = cloud_instance_id if self._provider.is_running(cloud_instance_id): - instance.status = Instance.STARTING + instance.status = Instance.ALLOCATED elif self._provider.is_terminated(cloud_instance_id): instance.status = Instance.STOPPED else: - instance.status = Instance.INSTANCE_STATUS_UNSPECIFIED + instance.status = Instance.UNKNOWN instance.internal_ip = self._provider.internal_ip(cloud_instance_id) instance.external_ip = self._provider.external_ip(cloud_instance_id) instance.instance_type = self._provider.node_tags(cloud_instance_id)[ diff --git a/python/ray/autoscaler/v2/instance_manager/subscribers/__init__.py b/python/ray/autoscaler/v2/instance_manager/subscribers/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python/ray/autoscaler/v2/instance_manager/subscribers/instance_launcher.py b/python/ray/autoscaler/v2/instance_manager/subscribers/instance_launcher.py new file mode 100644 index 0000000000000..1ec5b1e63c10b --- /dev/null +++ b/python/ray/autoscaler/v2/instance_manager/subscribers/instance_launcher.py @@ -0,0 +1,162 @@ +import logging +import math +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor +from typing import List + +from ray.autoscaler._private.constants import ( + AUTOSCALER_MAX_CONCURRENT_LAUNCHES, + AUTOSCALER_MAX_LAUNCH_BATCH, +) +from ray.autoscaler.v2.instance_manager.instance_storage import ( + InstanceStorage, + InstanceUpdatedSuscriber, + InstanceUpdateEvent, +) +from ray.autoscaler.v2.instance_manager.node_provider import NodeProvider +from ray.core.generated.instance_manager_pb2 import Instance + +logger = logging.getLogger(__name__) + + +class InstanceLauncher(InstanceUpdatedSuscriber): + """InstanceLauncher is responsible for provisioning new instances.""" + + def __init__( + self, + instance_storage: InstanceStorage, + node_provider: NodeProvider, + max_concurrent_requests: int = math.ceil( + AUTOSCALER_MAX_CONCURRENT_LAUNCHES / float(AUTOSCALER_MAX_LAUNCH_BATCH) + ), + max_instances_per_request: int = AUTOSCALER_MAX_LAUNCH_BATCH, + ) -> None: + self._instance_storage = instance_storage + self._node_provider = node_provider + self._max_concurrent_requests 
= max_concurrent_requests + self._max_instances_per_request = max_instances_per_request + self._executor = ThreadPoolExecutor(max_workers=1) + self._launch_instance_executor = ThreadPoolExecutor( + max_workers=self._max_concurrent_requests + ) + + def notify(self, events: List[InstanceUpdateEvent]) -> None: + # TODO: we should do reconciliation based on events. + has_new_request = any( + [event.new_status == Instance.UNKNOWN for event in events] + ) + if has_new_request: + self._executor.submit(self._may_launch_new_instances) + + def _may_launch_new_instances(self): + new_instances, _ = self._instance_storage.get_instances( + status_filter={Instance.UNKNOWN} + ) + + if not new_instances: + logger.debug("No instances to launch") + return + + queued_instances = [] + for instance in new_instances.values(): + instance.status = Instance.QUEUED + success, version = self._instance_storage.upsert_instance( + instance, expected_instance_version=instance.version + ) + if success: + instance.version = version + queued_instances.append(instance) + else: + logger.error(f"Failed to update {instance} QUEUED") + + instances_by_type = defaultdict(list) + for instance in queued_instances: + instances_by_type[instance.instance_type].append(instance) + + for instance_type, instances in instances_by_type.items(): + for i in range(0, len(instances), self._max_instances_per_request): + self._launch_instance_executor.submit( + self._launch_new_instances_by_type, + instance_type, + instances[ + i : min( + i + self._max_instances_per_request, + len(instances), + ) + ], + ) + + def _launch_new_instances_by_type( + self, instance_type: str, instances: List[Instance] + ) -> int: + """Launches instances of the given type. + + Args: + instance_type: type of instance to launch. + instances: list of instances to launch. These instances should + have been marked as QUEUED with instance_type set. + Returns: + num of instances launched. + """ + logger.info(f"Launching {len(instances)} instances of type {instance_type}") + instances_selected = [] + for instance in instances: + instance.status = Instance.REQUESTED + result, version = self._instance_storage.upsert_instance( + instance, expected_instance_version=instance.version + ) + if not result: + logger.warn(f"Failed to update instance {instance}") + continue + instance.version = version + instances_selected.append(instance) + + if not instances_selected: + return 0 + + # TODO: idempotency token. + created_cloud_instances = self._node_provider.create_nodes( + instance_type, len(instances_selected) + ) + + assert len(created_cloud_instances) <= len(instances_selected) + + instances_launched = 0 + while created_cloud_instances and instances_selected: + cloud_instance = created_cloud_instances.pop() + instance = instances_selected.pop() + instance.cloud_instance_id = cloud_instance.cloud_instance_id + instance.internal_ip = cloud_instance.internal_ip + instance.external_ip = cloud_instance.external_ip + instance.status = Instance.ALLOCATED + instance.ray_status = Instance.RAY_STATUS_UNKOWN + + # update instance status into the storage + result, _ = self._instance_storage.upsert_instance( + instance, expected_instance_version=instance.version + ) + + if not result: + # TODO: this could only happen when the request is canceled. 
+ logger.warn(f"Failed to update instance {instance}") + # push the cloud instance back + created_cloud_instances.append(cloud_instance) + continue + + instances_launched += 1 + + if created_cloud_instances: + # instances are leaked, we probably need to terminate them + for instance in created_cloud_instances: + self._node_provider.terminate_node(instance.cloud_instance_id) + + if instances_selected: + # instances creation failed, we need to marke them allocation failed. + for instance in instances_selected: + instance.status = Instance.ALLOCATION_FAILED + # TODO: add more information about the failure. + result, _ = self._instance_storage.upsert_instance( + instance, expected_instance_version=instance.version + ) + # TODO: this could only happen when the request is canceled. + return instances_launched diff --git a/python/ray/autoscaler/v2/instance_manager/subscribers/reconciler.py b/python/ray/autoscaler/v2/instance_manager/subscribers/reconciler.py new file mode 100644 index 0000000000000..01080daaedff9 --- /dev/null +++ b/python/ray/autoscaler/v2/instance_manager/subscribers/reconciler.py @@ -0,0 +1,109 @@ +import logging +import threading +from concurrent.futures import ThreadPoolExecutor +from typing import List + +from ray.autoscaler.v2.instance_manager.instance_storage import ( + InstanceStorage, + InstanceUpdatedSuscriber, + InstanceUpdateEvent, +) +from ray.autoscaler.v2.instance_manager.node_provider import NodeProvider +from ray.core.generated.instance_manager_pb2 import Instance + +logger = logging.getLogger(__name__) + + +class InstanceReconciler(InstanceUpdatedSuscriber): + """InstanceReconciler is responsible for reconciling the difference between + node provider and instance storage. It is also responsible for handling + failures. + """ + + def __init__( + self, + instance_storage: InstanceStorage, + node_provider: NodeProvider, + reconcile_interval_s: int = 120, + ) -> None: + self._instance_storage = instance_storage + self._node_provider = node_provider + self._failure_handling_executor = ThreadPoolExecutor(max_workers=1) + self._reconcile_interval_s = reconcile_interval_s + self._reconcile_timer_lock = threading.Lock() + with self._reconcile_timer_lock: + self._reconcile_timer = threading.Timer( + self._reconcile_interval_s, self._periodic_reconcile_helper + ) + + def shutdown(self): + with self._reconcile_timer_lock: + self._reconcile_timer.cancel() + + def notify(self, events: List[InstanceUpdateEvent]) -> None: + instance_ids = [ + event.instance_id + for event in events + if event.new_status in {Instance.ALLOCATED} + and event.new_ray_status + in {Instance.RAY_STOPPED, Instance.RAY_INSTALL_FAILED} + ] + if instance_ids: + self._failure_handling_executor.submit( + self._handle_ray_failure, instance_ids + ) + + def _handle_ray_failure(self, instance_ids: List[str]) -> int: + failing_instances, _ = self._instance_storage.get_instances( + instance_ids=instance_ids, + status_filter={Instance.ALLOCATED}, + ray_status_filter={Instance.RAY_STOPPED, Instance.RAY_INSTALL_FAILED}, + ) + if not failing_instances: + logger.debug("No ray failure") + return + + failing_instances = failing_instances.values() + for instance in failing_instances: + # this call is asynchronous. 
+ self._node_provider.terminate_node(instance.cloud_instance_id) + + instance.status = Instance.STOPPING + result, _ = self._instance_storage.upsert_instance( + instance, expected_instance_version=instance.version + ) + if not result: + logger.warning("Failed to update instance status to STOPPING") + + def _periodic_reconcile_helper(self) -> None: + try: + self._reconcile_with_node_provider() + except Exception: + logger.exception("Failed to reconcile with node provider") + with self._reconcile_timer_lock: + self._reconcile_timer = threading.Timer( + self._reconcile_interval_s, self._periodic_reconcile_helper + ) + + def _reconcile_with_node_provider(self) -> None: + # reconcile storage state with cloud state. + none_terminated_cloud_instances = self._node_provider.get_non_terminated_nodes() + + # 1. if the storage instance is in STOPPING state and the no + # cloud instance is found, change the instance state to TERMINATED. + stopping_instances, _ = self._instance_storage.get_instances( + status_filter={Instance.STOPPING} + ) + + for instance in stopping_instances.values(): + if none_terminated_cloud_instances.get(instance.cloud_instance_id) is None: + instance.status = Instance.STOPPED + result, _ = self._instance_storage.upsert_instance( + instance, expected_instance_version=instance.version + ) + if not result: + logger.warning("Failed to update instance status to STOPPED") + + # 2. TODO: if the cloud instance has no storage instance can be found, + # it means the instance is likely leaked, terminate the instance. + # 3. TODO: we should also GC nodes have been stuck in installing state. diff --git a/python/ray/autoscaler/v2/instance_manager/subscribers/threaded_ray_installer.py b/python/ray/autoscaler/v2/instance_manager/subscribers/threaded_ray_installer.py new file mode 100644 index 0000000000000..0ac2b379ee23c --- /dev/null +++ b/python/ray/autoscaler/v2/instance_manager/subscribers/threaded_ray_installer.py @@ -0,0 +1,102 @@ +import logging +import time +from concurrent.futures import ThreadPoolExecutor +from typing import List + +from ray.autoscaler.v2.instance_manager.instance_storage import ( + InstanceStorage, + InstanceUpdatedSuscriber, + InstanceUpdateEvent, +) +from ray.autoscaler.v2.instance_manager.ray_installer import RayInstaller +from ray.core.generated.instance_manager_pb2 import Instance + +logger = logging.getLogger(__name__) + + +class ThreadedRayInstaller(InstanceUpdatedSuscriber): + """ThreadedRayInstaller is responsible for install ray on new nodes.""" + + def __init__( + self, + head_node_ip: str, + instance_storage: InstanceStorage, + ray_installer: RayInstaller, + max_install_attempts: int = 3, + install_retry_interval: int = 10, + max_concurrent_installs: int = 50, + ) -> None: + self._head_node_ip = head_node_ip + self._instance_storage = instance_storage + self._ray_installer = ray_installer + self._max_concurrent_installs = max_concurrent_installs + self._max_install_attempts = max_install_attempts + self._install_retry_interval = install_retry_interval + self._ray_installation_executor = ThreadPoolExecutor( + max_workers=self._max_concurrent_installs + ) + + def notify(self, events: List[InstanceUpdateEvent]) -> None: + for event in events: + if ( + event.new_status == Instance.ALLOCATED + and event.new_ray_status == Instance.RAY_STATUS_UNKOWN + ): + self._install_ray_on_new_nodes(event.instance_id) + + def _install_ray_on_new_nodes(self, instance_id: str) -> None: + allocated_instance, _ = self._instance_storage.get_instances( + 
instance_ids={instance_id}, + status_filter={Instance.ALLOCATED}, + ray_status_filter={Instance.RAY_STATUS_UNKOWN}, + ) + for instance in allocated_instance.values(): + self._ray_installation_executor.submit( + self._install_ray_on_single_node, instance + ) + + def _install_ray_on_single_node(self, instance: Instance) -> None: + assert instance.status == Instance.ALLOCATED + assert instance.ray_status == Instance.RAY_STATUS_UNKOWN + instance.ray_status = Instance.RAY_INSTALLING + success, version = self._instance_storage.upsert_instance( + instance, expected_instance_version=instance.version + ) + if not success: + logger.warning( + f"Failed to update instance {instance.instance_id} to RAY_INSTALLING" + ) + # Do not need to handle failures, it will be covered by + # garbage collection. + return + + # install with exponential backoff + installed = False + backoff_factor = 1 + for _ in range(self._max_install_attempts): + installed = self._ray_installer.install_ray(instance, self._head_node_ip) + if installed: + break + logger.warning("Failed to install ray, retrying...") + time.sleep(self._install_retry_interval * backoff_factor) + backoff_factor *= 2 + + if not installed: + instance.ray_status = Instance.RAY_INSTALL_FAILED + success, version = self._instance_storage.upsert_instance( + instance, + expected_instance_version=version, + ) + else: + instance.ray_status = Instance.RAY_RUNNING + success, version = self._instance_storage.upsert_instance( + instance, + expected_instance_version=version, + ) + if not success: + logger.warning( + f"Failed to update instance {instance.instance_id} to {instance.status}" + ) + # Do not need to handle failures, it will be covered by + # garbage collection. + return diff --git a/python/ray/autoscaler/v2/tests/test_instance_launcher.py b/python/ray/autoscaler/v2/tests/test_instance_launcher.py new file mode 100644 index 0000000000000..3f9b7d5ab1f77 --- /dev/null +++ b/python/ray/autoscaler/v2/tests/test_instance_launcher.py @@ -0,0 +1,110 @@ +# coding: utf-8 +import os +import sys +import unittest + +import pytest # noqa + +from ray._private.test_utils import load_test_config +from ray.autoscaler._private.event_summarizer import EventSummarizer +from ray.autoscaler._private.node_launcher import BaseNodeLauncher +from ray.autoscaler._private.node_provider_availability_tracker import ( + NodeProviderAvailabilityTracker, +) +from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig +from ray.autoscaler.v2.instance_manager.instance_storage import InstanceStorage +from ray.autoscaler.v2.instance_manager.node_provider import NodeProviderAdapter +from ray.autoscaler.v2.instance_manager.storage import InMemoryStorage +from ray.autoscaler.v2.instance_manager.subscribers.instance_launcher import ( + InstanceLauncher, +) +from ray.autoscaler.v2.tests.util import FakeCounter, create_instance +from ray.core.generated.instance_manager_pb2 import Instance +from ray.tests.autoscaler_test_utils import MockProvider + + +class InstanceLauncherTest(unittest.TestCase): + def setUp(self): + self.base_provider = MockProvider() + self.availability_tracker = NodeProviderAvailabilityTracker() + self.node_launcher = BaseNodeLauncher( + self.base_provider, + FakeCounter(), + EventSummarizer(), + self.availability_tracker, + ) + self.instance_config_provider = NodeProviderConfig( + load_test_config("test_ray_complex.yaml") + ) + self.node_provider = NodeProviderAdapter( + self.base_provider, self.node_launcher, self.instance_config_provider + ) + self.instance_storage = 
InstanceStorage( + cluster_id="test_cluster_id", + storage=InMemoryStorage(), + ) + self.instance_launcher = InstanceLauncher( + instance_storage=self.instance_storage, + node_provider=self.node_provider, + max_concurrent_requests=1, + max_instances_per_request=1, + ) + self.instance_storage.add_status_change_subscriber(self.instance_launcher) + + def test_launch_new_instance_by_type(self): + instance = create_instance("1") + success, verison = self.instance_storage.upsert_instance(instance) + assert success + instance.version = verison + assert 1 == self.instance_launcher._launch_new_instances_by_type( + "worker_nodes1", [instance] + ) + instances, _ = self.instance_storage.get_instances() + assert len(instances) == 1 + assert instances["1"].status == Instance.ALLOCATED + assert instances["1"].cloud_instance_id == "0" + + def test_launch_failed(self): + # launch failed: instance is not in storage + instance = create_instance("1") + assert 0 == self.instance_launcher._launch_new_instances_by_type( + "worker_nodes1", [instance] + ) + instances, _ = self.instance_storage.get_instances() + assert len(instances) == 0 + + # launch failed: instance version mismatch + instance = create_instance("1") + self.instance_storage.upsert_instance(instance) + instance.version = 2 + assert 0 == self.instance_launcher._launch_new_instances_by_type( + "worker_nodes1", [instance] + ) + instances, _ = self.instance_storage.get_instances() + assert len(instances) == 1 + assert instances["1"].status == Instance.UNKNOWN + + def test_launch_partial_success(self): + self.base_provider.partical_success_count = 1 + instance1 = create_instance("1") + instance2 = create_instance("2") + success, version = self.instance_storage.batch_upsert_instances( + [instance1, instance2] + ) + assert success + instance1.version = version + instance2.version = version + self.instance_launcher._launch_new_instances_by_type( + "worker_nodes1", [instance1, instance2] + ) + instances, _ = self.instance_storage.get_instances() + assert len(instances) == 2 + assert instances["1"].status == Instance.ALLOCATION_FAILED + assert instances["2"].status == Instance.ALLOCATED + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/autoscaler/v2/tests/test_instance_storage.py b/python/ray/autoscaler/v2/tests/test_instance_storage.py index 881520ea6cf44..3e06b3669639e 100644 --- a/python/ray/autoscaler/v2/tests/test_instance_storage.py +++ b/python/ray/autoscaler/v2/tests/test_instance_storage.py @@ -2,6 +2,7 @@ import copy import os import sys +from unittest import mock import pytest # noqa @@ -22,12 +23,16 @@ def notify(self, events): self.events.extend(events) -def create_instance( - instance_id, status=Instance.INSTANCE_STATUS_UNSPECIFIED, version=0 -): - return Instance(instance_id=instance_id, status=status, version=version) +def create_instance(instance_id, status=Instance.UNKNOWN, version=0): + return Instance( + instance_id=instance_id, + status=status, + version=version, + timestamp_since_last_modified=1, + ) +@mock.patch("time.time", mock.MagicMock(return_value=1)) def test_upsert(): subscriber = DummySubscriber() @@ -46,8 +51,8 @@ def test_upsert(): ) assert subscriber.events == [ - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED), + InstanceUpdateEvent("instance1", Instance.UNKNOWN), + 
InstanceUpdateEvent("instance2", Instance.UNKNOWN), ] instance1.version = 1 @@ -66,11 +71,11 @@ def test_upsert(): ) assert subscriber.events == [ - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED), + InstanceUpdateEvent("instance1", Instance.UNKNOWN), + InstanceUpdateEvent("instance2", Instance.UNKNOWN), ] - instance2.status = Instance.IDLE + instance2.status = Instance.ALLOCATED assert (True, 2) == storage.batch_upsert_instances( [instance3, instance2], expected_storage_version=1, @@ -89,13 +94,14 @@ def test_upsert(): } assert subscriber.events == [ - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance3", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.IDLE), + InstanceUpdateEvent("instance1", Instance.UNKNOWN), + InstanceUpdateEvent("instance2", Instance.UNKNOWN), + InstanceUpdateEvent("instance3", Instance.UNKNOWN), + InstanceUpdateEvent("instance2", Instance.ALLOCATED), ] +@mock.patch("time.time", mock.MagicMock(return_value=1)) def test_update(): subscriber = DummySubscriber() @@ -109,13 +115,13 @@ def test_update(): assert (True, 1) == storage.upsert_instance(instance=instance1) assert subscriber.events == [ - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), + InstanceUpdateEvent("instance1", Instance.UNKNOWN), ] assert (True, 2) == storage.upsert_instance(instance=instance2) assert subscriber.events == [ - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED), + InstanceUpdateEvent("instance1", Instance.UNKNOWN), + InstanceUpdateEvent("instance2", Instance.UNKNOWN), ] assert ( @@ -139,8 +145,8 @@ def test_update(): ) assert subscriber.events == [ - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED), + InstanceUpdateEvent("instance1", Instance.UNKNOWN), + InstanceUpdateEvent("instance2", Instance.UNKNOWN), ] assert (True, 3) == storage.upsert_instance( @@ -157,9 +163,9 @@ def test_update(): ) == storage.get_instances() assert subscriber.events == [ - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED), + InstanceUpdateEvent("instance1", Instance.UNKNOWN), + InstanceUpdateEvent("instance2", Instance.UNKNOWN), + InstanceUpdateEvent("instance2", Instance.UNKNOWN), ] assert (True, 4) == storage.upsert_instance( @@ -176,13 +182,14 @@ def test_update(): ) == storage.get_instances() assert subscriber.events == [ - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), + InstanceUpdateEvent("instance1", Instance.UNKNOWN), + InstanceUpdateEvent("instance2", Instance.UNKNOWN), + InstanceUpdateEvent("instance2", Instance.UNKNOWN), + InstanceUpdateEvent("instance1", Instance.UNKNOWN), ] +@mock.patch("time.time", mock.MagicMock(return_value=1)) def test_delete(): subscriber = DummySubscriber() @@ -206,10 +213,10 @@ def test_delete(): assert 
(True, 2) == storage.batch_delete_instances(instance_ids=["instance1"]) assert subscriber.events == [ - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance3", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance1", Instance.GARAGE_COLLECTED), + InstanceUpdateEvent("instance1", Instance.UNKNOWN), + InstanceUpdateEvent("instance2", Instance.UNKNOWN), + InstanceUpdateEvent("instance3", Instance.UNKNOWN), + InstanceUpdateEvent("instance1", Instance.GARBAGE_COLLECTED), ] assert ( @@ -232,22 +239,23 @@ def test_delete(): ) == storage.get_instances() assert subscriber.events == [ - InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance3", Instance.INSTANCE_STATUS_UNSPECIFIED), - InstanceUpdateEvent("instance1", Instance.GARAGE_COLLECTED), - InstanceUpdateEvent("instance2", Instance.GARAGE_COLLECTED), + InstanceUpdateEvent("instance1", Instance.UNKNOWN), + InstanceUpdateEvent("instance2", Instance.UNKNOWN), + InstanceUpdateEvent("instance3", Instance.UNKNOWN), + InstanceUpdateEvent("instance1", Instance.GARBAGE_COLLECTED), + InstanceUpdateEvent("instance2", Instance.GARBAGE_COLLECTED), ] +@mock.patch("time.time", mock.MagicMock(return_value=1)) def test_get_instances(): storage = InstanceStorage( cluster_id="test_cluster", storage=InMemoryStorage(), ) instance1 = create_instance("instance1", version=1) - instance2 = create_instance("instance2", status=Instance.RUNNING, version=1) - instance3 = create_instance("instance3", status=Instance.IDLE, version=1) + instance2 = create_instance("instance2", status=Instance.ALLOCATED, version=1) + instance3 = create_instance("instance3", status=Instance.STOPPING, version=1) assert (True, 1) == storage.batch_upsert_instances( [copy.deepcopy(instance1), copy.deepcopy(instance2), copy.deepcopy(instance3)], @@ -272,7 +280,7 @@ def test_get_instances(): ) == storage.get_instances(instance_ids=["instance1", "instance2"]) assert ({"instance2": instance2}, 1) == storage.get_instances( - instance_ids=["instance1", "instance2"], status_filter={Instance.RUNNING} + instance_ids=["instance1", "instance2"], status_filter={Instance.ALLOCATED} ) assert ( @@ -280,7 +288,7 @@ def test_get_instances(): "instance2": instance2, }, 1, - ) == storage.get_instances(status_filter={Instance.RUNNING}) + ) == storage.get_instances(status_filter={Instance.ALLOCATED}) if __name__ == "__main__": diff --git a/python/ray/autoscaler/v2/tests/test_node_provider.py b/python/ray/autoscaler/v2/tests/test_node_provider.py index 8bdad4a6c18f0..dda209c7ba6cb 100644 --- a/python/ray/autoscaler/v2/tests/test_node_provider.py +++ b/python/ray/autoscaler/v2/tests/test_node_provider.py @@ -14,15 +14,11 @@ from ray.autoscaler.node_launch_exception import NodeLaunchException from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig from ray.autoscaler.v2.instance_manager.node_provider import NodeProviderAdapter +from ray.autoscaler.v2.tests.util import FakeCounter from ray.core.generated.instance_manager_pb2 import Instance from ray.tests.autoscaler_test_utils import MockProvider -class FakeCounter: - def dec(self, *args, **kwargs): - pass - - class NodeProviderTest(unittest.TestCase): def setUp(self): self.base_provider = MockProvider() @@ -48,7 +44,7 @@ def test_node_providers_pass_through(self): cloud_instance_id="0", 
internal_ip="172.0.0.0", external_ip="1.2.3.4", - status=Instance.INSTANCE_STATUS_UNSPECIFIED, + status=Instance.UNKNOWN, ) self.assertEqual(len(self.base_provider.mock_nodes), 1) self.assertEqual(self.node_provider.get_non_terminated_nodes(), {"0": nodes[0]}) @@ -59,14 +55,14 @@ def test_node_providers_pass_through(self): cloud_instance_id="1", internal_ip="172.0.0.1", external_ip="1.2.3.4", - status=Instance.INSTANCE_STATUS_UNSPECIFIED, + status=Instance.UNKNOWN, ) assert nodes1[1] == Instance( instance_type="worker_nodes", cloud_instance_id="2", internal_ip="172.0.0.2", external_ip="1.2.3.4", - status=Instance.INSTANCE_STATUS_UNSPECIFIED, + status=Instance.UNKNOWN, ) self.assertEqual( self.node_provider.get_non_terminated_nodes(), diff --git a/python/ray/autoscaler/v2/tests/test_reconciler.py b/python/ray/autoscaler/v2/tests/test_reconciler.py new file mode 100644 index 0000000000000..62e46653e19b9 --- /dev/null +++ b/python/ray/autoscaler/v2/tests/test_reconciler.py @@ -0,0 +1,86 @@ +# coding: utf-8 +import os +import sys +import unittest + +import pytest # noqa + +from ray._private.test_utils import load_test_config +from ray.autoscaler._private.event_summarizer import EventSummarizer +from ray.autoscaler._private.node_launcher import BaseNodeLauncher +from ray.autoscaler._private.node_provider_availability_tracker import ( + NodeProviderAvailabilityTracker, +) +from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig +from ray.autoscaler.v2.instance_manager.instance_storage import InstanceStorage +from ray.autoscaler.v2.instance_manager.node_provider import NodeProviderAdapter +from ray.autoscaler.v2.instance_manager.storage import InMemoryStorage +from ray.autoscaler.v2.instance_manager.subscribers.reconciler import InstanceReconciler +from ray.autoscaler.v2.tests.util import FakeCounter +from ray.core.generated.instance_manager_pb2 import Instance +from ray.tests.autoscaler_test_utils import MockProvider + + +class InstanceReconcilerTest(unittest.TestCase): + def setUp(self): + self.base_provider = MockProvider() + self.availability_tracker = NodeProviderAvailabilityTracker() + self.node_launcher = BaseNodeLauncher( + self.base_provider, + FakeCounter(), + EventSummarizer(), + self.availability_tracker, + ) + self.instance_config_provider = NodeProviderConfig( + load_test_config("test_ray_complex.yaml") + ) + self.node_provider = NodeProviderAdapter( + self.base_provider, self.node_launcher, self.instance_config_provider + ) + + self.instance_storage = InstanceStorage( + cluster_id="test_cluster_id", + storage=InMemoryStorage(), + ) + self.reconciler = InstanceReconciler( + instance_storage=self.instance_storage, + node_provider=self.node_provider, + ) + + def tearDown(self): + self.reconciler.shutdown() + + def test_handle_ray_failure(self): + self.node_provider.create_nodes("worker_nodes1", 1) + instance = Instance( + instance_id="0", + instance_type="worker_nodes1", + cloud_instance_id="0", + status=Instance.ALLOCATED, + ray_status=Instance.RAY_STOPPED, + ) + assert not self.base_provider.is_terminated(instance.cloud_instance_id) + success, verison = self.instance_storage.upsert_instance(instance) + assert success + instance.version = verison + self.reconciler._handle_ray_failure([instance.instance_id]) + + instances, _ = self.instance_storage.get_instances( + instance_ids={instance.instance_id} + ) + assert instances[instance.instance_id].status == Instance.STOPPING + assert self.base_provider.is_terminated(instance.cloud_instance_id) + + # reconciler will 
detect the node is terminated and update the status. + self.reconciler._reconcile_with_node_provider() + instances, _ = self.instance_storage.get_instances( + instance_ids={instance.instance_id} + ) + assert instances[instance.instance_id].status == Instance.STOPPED + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/autoscaler/v2/tests/test_threaded_ray_installer.py b/python/ray/autoscaler/v2/tests/test_threaded_ray_installer.py new file mode 100644 index 0000000000000..323e99c9f975d --- /dev/null +++ b/python/ray/autoscaler/v2/tests/test_threaded_ray_installer.py @@ -0,0 +1,109 @@ +# coding: utf-8 +import os +import sys +import unittest +from unittest.mock import patch + +import pytest # noqa + +from ray._private.test_utils import load_test_config +from ray.autoscaler.tags import TAG_RAY_NODE_KIND +from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig +from ray.autoscaler.v2.instance_manager.instance_storage import InstanceStorage +from ray.autoscaler.v2.instance_manager.ray_installer import RayInstaller +from ray.autoscaler.v2.instance_manager.storage import InMemoryStorage +from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import ( + ThreadedRayInstaller, +) +from ray.core.generated.instance_manager_pb2 import Instance +from ray.tests.autoscaler_test_utils import MockProcessRunner, MockProvider + + +class ThreadedRayInstallerTest(unittest.TestCase): + def setUp(self): + self.base_provider = MockProvider() + self.instance_config_provider = NodeProviderConfig( + load_test_config("test_ray_complex.yaml") + ) + self.runner = MockProcessRunner() + self.ray_installer = RayInstaller( + self.base_provider, self.instance_config_provider, self.runner + ) + self.instance_storage = InstanceStorage( + cluster_id="test_cluster_id", + storage=InMemoryStorage(), + ) + self.threaded_ray_installer = ThreadedRayInstaller( + head_node_ip="127.0.0.1", + instance_storage=self.instance_storage, + ray_installer=self.ray_installer, + ) + + def test_install_ray_on_new_node_version_mismatch(self): + self.base_provider.create_node({}, {TAG_RAY_NODE_KIND: "worker_nodes1"}, 1) + instance = Instance( + instance_id="0", + instance_type="worker_nodes1", + cloud_instance_id="0", + status=Instance.ALLOCATED, + ) + success, verison = self.instance_storage.upsert_instance(instance) + assert success + self.runner.respond_to_call("json .Config.Env", ["[]" for i in range(1)]) + + self.threaded_ray_installer._install_ray_on_single_node(instance) + instances, _ = self.instance_storage.get_instances( + instance_ids={instance.instance_id} + ) + assert instances[instance.instance_id].status == Instance.ALLOCATED + assert instances[instance.instance_id].version == verison + + @patch.object(RayInstaller, "install_ray") + def test_install_ray_on_new_node_install_failed(self, mock_method): + self.base_provider.create_node({}, {TAG_RAY_NODE_KIND: "worker_nodes1"}, 1) + instance = Instance( + instance_id="0", + instance_type="worker_nodes1", + cloud_instance_id="0", + status=Instance.ALLOCATED, + ) + success, verison = self.instance_storage.upsert_instance(instance) + assert success + instance.version = verison + + mock_method.return_value = False + self.threaded_ray_installer._install_retry_interval = 0 + self.threaded_ray_installer._max_install_attempts = 1 + self.threaded_ray_installer._install_ray_on_single_node(instance) + + 
instances, _ = self.instance_storage.get_instances( + instance_ids={instance.instance_id} + ) + assert instances[instance.instance_id].ray_status == Instance.RAY_INSTALL_FAILED + + def test_install_ray_on_new_nodes(self): + self.base_provider.create_node({}, {TAG_RAY_NODE_KIND: "worker_nodes1"}, 1) + instance = Instance( + instance_id="0", + instance_type="worker_nodes1", + cloud_instance_id="0", + status=Instance.ALLOCATED, + ) + success, verison = self.instance_storage.upsert_instance(instance) + assert success + instance.version = verison + self.runner.respond_to_call("json .Config.Env", ["[]" for i in range(1)]) + + self.threaded_ray_installer._install_ray_on_new_nodes(instance.instance_id) + self.threaded_ray_installer._ray_installation_executor.shutdown(wait=True) + instances, _ = self.instance_storage.get_instances( + instance_ids={instance.instance_id} + ) + assert instances[instance.instance_id].ray_status == Instance.RAY_RUNNING + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/autoscaler/v2/tests/util.py b/python/ray/autoscaler/v2/tests/util.py index 7b55790a40d0d..16bf4afd963ba 100644 --- a/python/ray/autoscaler/v2/tests/util.py +++ b/python/ray/autoscaler/v2/tests/util.py @@ -2,6 +2,7 @@ from ray.autoscaler.v2.schema import ResourceUsage from ray.core.generated import autoscaler_pb2 +from ray.core.generated.instance_manager_pb2 import Instance def get_cluster_resource_state(stub) -> autoscaler_pb2.ClusterResourceState: @@ -11,6 +12,28 @@ def get_cluster_resource_state(stub) -> autoscaler_pb2.ClusterResourceState: return stub.GetClusterResourceState(request).cluster_resource_state +class FakeCounter: + def dec(self, *args, **kwargs): + pass + + +def create_instance( + instance_id, + status=Instance.UNKNOWN, + version=0, + ray_status=Instance.RAY_STATUS_UNKOWN, + instance_type="worker_nodes1", +): + return Instance( + instance_id=instance_id, + status=status, + version=version, + instance_type=instance_type, + ray_status=ray_status, + timestamp_since_last_modified=1, + ) + + def report_autoscaling_state(stub, autoscaling_state: autoscaler_pb2.AutoscalingState): request = autoscaler_pb2.ReportAutoscalingStateRequest( autoscaling_state=autoscaling_state diff --git a/python/ray/tests/autoscaler_test_utils.py b/python/ray/tests/autoscaler_test_utils.py index c9af7e7582a11..8db4894c4577f 100644 --- a/python/ray/tests/autoscaler_test_utils.py +++ b/python/ray/tests/autoscaler_test_utils.py @@ -159,6 +159,7 @@ def __init__(self, cache_stopped=False, unique_ips=False): self.unique_ips = unique_ips self.fail_to_fetch_ip = False self.safe_to_scale_flag = True + self.partical_success_count = None # Many of these functions are called by node_launcher or updater in # different threads. This can be treated as a global lock for # everything. 
@@ -233,6 +234,8 @@ def create_node(self, node_config, tags, count, _skip_wait=False):
             return
 
         created_nodes = {}
+        if self.partical_success_count is not None:
+            count = min(count, self.partical_success_count)
         with self.lock:
             if self.cache_stopped:
                 for node in self.mock_nodes.values():
diff --git a/src/ray/protobuf/experimental/instance_manager.proto b/src/ray/protobuf/experimental/instance_manager.proto
index 3d7b4aaa5d778..9f07d2f189839 100644
--- a/src/ray/protobuf/experimental/instance_manager.proto
+++ b/src/ray/protobuf/experimental/instance_manager.proto
@@ -51,45 +51,37 @@ message GetAvailableInstanceTypesResponse {
 // period (by default 30 minutes).
 message Instance {
   enum InstanceStatus {
-    // The unspecified state - most likey it is queued.
-    INSTANCE_STATUS_UNSPECIFIED = 0;
-    // Instance is starting. The first state update received from the
-    // instance.
-    STARTING = 1;
-    // The instance is running - one of two states of a healthy instance.
-    RUNNING = 2;
-    // The instance is idle - one of two states of a healthy instance.
-    IDLE = 3;
-    // The instance is stopping - usually follows from the RUNNING, IDLE,
-    // PREEMPT_REQUEST or DRAIN_REQUEST state.
-    STOPPING = 4;
+    // The unspecified state
+    UNKNOWN = 0;
+    // The instance is queued to be allocated.
+    QUEUED = 1;
+    // The instance is requested to be allocated - follows from the QUEUED state.
+    REQUESTED = 2;
+    // The instance is allocated - follows from the REQUESTED state.
+    ALLOCATED = 3;
+    // The instance allocation failed - follows from the REQUESTED state.
+    ALLOCATION_FAILED = 4;
+    // The instance is stopping - follows from the ALLOCATED state.
+    STOPPING = 5;
     // The instance is stopped - follows from the STOPPING state.
-    STOPPED = 5;
-    // The instance is in a bad state - but it is still able to send updates.
-    FAILING = 6;
-    // The subscribe service moves instances to this state if they
-    // have been idle for too long. This allows the cluster manager to
-    // make a final decision on whether or not to commence a drain
-    // sequence for this instance.
-    DRAIN_CONFIRMATION_PENDING = 7;
-    // The instance should be drained, Ray should start draining process
-    // but could reject if failed to drain.
-    DRAIN_REQUEST = 8;
-    // The instance will be preempted by the instance manager, regardless
-    // of whether it is drainable or not.
-    PREEMPT_REQUEST = 9;
-    // An optional state that can be used to indicate that the instance
-    // is allocated from cloud provider, but ray hasn't been installed yet.
-    INSTANCE_ALLOCATED = 10;
-    // An optional state that can be used to indicate that the instance
-    // is currently installing Ray.
-    INSTALLING_RAY = 11;
-    // An optional state that can be used to indicate that the instance
-    // failed to allocate from cloud provider.
-    ALLOCATION_FAILED = 12;
-    // Node is deleted.
-    GARAGE_COLLECTED = 13;
+    STOPPED = 6;
+    // The instance record is deleted from the storage - follows from the STOPPED state.
+    GARBAGE_COLLECTED = 7;
   }
+
+  enum RayStatus {
+    // The unspecified state
+    RAY_STATUS_UNKOWN = 0;
+    // The instance is installing ray.
+    RAY_INSTALLING = 1;
+    // Ray installation failed - follows from the RAY_INSTALLING state.
+    RAY_INSTALL_FAILED = 2;
+    // Ray started and connected to gcs - follows from the RAY_INSTALLING state.
+    RAY_RUNNING = 3;
+    // Ray stopped - follows from the RAY_RUNNING state.
+    RAY_STOPPED = 4;
+  }
+
   // an unique id for the instance that's generated by the
   // instance manager. This may be optional if
   // the instance hasn't be started yet.
@@ -100,8 +92,8 @@ message Instance {
   string instance_type = 13;
   // The corresponding total resources on the node.
   map<string, double> total_resources = 14;
-  // timestamp of the last state changed.
-  int64 timestamp_since_last_state_change = 15;
+  // timestamp of the last time the instance was modified.
+  int64 timestamp_since_last_modified = 15;
   // the external id of the instance that's generated by
   // the cloud provider like AWS, GCP, etc.
   // Note this id can be reused by different instances.
@@ -112,6 +104,8 @@ message Instance {
   string external_ip = 18;
   // the monotonically increasing version number of the instance.
   int64 version = 19;
+  // the status of the ray process on the instance.
+  RayStatus ray_status = 20;
 }
 
 message UpdateInstanceManagerStateRequest {
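
The subscribers introduced in this PR are all attached to InstanceStorage via
add_status_change_subscriber. Below is a minimal wiring sketch assembled from
the test setup in this PR; MockProvider, InMemoryStorage, MockProcessRunner,
FakeCounter and test_ray_complex.yaml are test fixtures, so the production
autoscaler wiring may differ.

```python
from ray._private.test_utils import load_test_config
from ray.autoscaler._private.event_summarizer import EventSummarizer
from ray.autoscaler._private.node_launcher import BaseNodeLauncher
from ray.autoscaler._private.node_provider_availability_tracker import (
    NodeProviderAvailabilityTracker,
)
from ray.autoscaler.v2.instance_manager.config import NodeProviderConfig
from ray.autoscaler.v2.instance_manager.instance_storage import InstanceStorage
from ray.autoscaler.v2.instance_manager.node_provider import NodeProviderAdapter
from ray.autoscaler.v2.instance_manager.ray_installer import RayInstaller
from ray.autoscaler.v2.instance_manager.storage import InMemoryStorage
from ray.autoscaler.v2.instance_manager.subscribers.instance_launcher import (
    InstanceLauncher,
)
from ray.autoscaler.v2.instance_manager.subscribers.reconciler import InstanceReconciler
from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import (
    ThreadedRayInstaller,
)
from ray.autoscaler.v2.tests.util import FakeCounter
from ray.core.generated.instance_manager_pb2 import Instance
from ray.tests.autoscaler_test_utils import MockProcessRunner, MockProvider

# Node provider adapter backed by the mock cloud provider used in the tests.
base_provider = MockProvider()
config = NodeProviderConfig(load_test_config("test_ray_complex.yaml"))
node_launcher = BaseNodeLauncher(
    base_provider, FakeCounter(), EventSummarizer(), NodeProviderAvailabilityTracker()
)
node_provider = NodeProviderAdapter(base_provider, node_launcher, config)

# Shared instance storage; all subscribers observe its status-change events.
storage = InstanceStorage(cluster_id="test_cluster_id", storage=InMemoryStorage())

launcher = InstanceLauncher(instance_storage=storage, node_provider=node_provider)
installer = ThreadedRayInstaller(
    head_node_ip="127.0.0.1",
    instance_storage=storage,
    ray_installer=RayInstaller(base_provider, config, MockProcessRunner()),
)
reconciler = InstanceReconciler(instance_storage=storage, node_provider=node_provider)
for subscriber in (launcher, installer, reconciler):
    storage.add_status_change_subscriber(subscriber)

# Queuing a request: upserting an UNKNOWN instance notifies the launcher, which
# drives it QUEUED -> REQUESTED -> ALLOCATED; the installer and reconciler then
# react to the subsequent status / ray_status transitions.
storage.upsert_instance(
    Instance(instance_id="inst-1", instance_type="worker_nodes1", status=Instance.UNKNOWN)
)
```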