From e83794ff9acd466903723f9f99513056a3da9519 Mon Sep 17 00:00:00 2001
From: Chen Shen
Date: Wed, 3 May 2023 22:12:20 -0700
Subject: [PATCH] [autoscaler v2][3/n] introducing instance storage (#34979)

Why are these changes needed?

This is part of the stack of PRs that introduces the new node_provider for
autoscaler v2.

Stack of PRs:
#34976
#34977
#34979 <- this PR
#34983
#34985

This PR introduces instance storage, a wrapper around the storage that
allows us to store and update the state of instances. It also allows users
to subscribe to instance state changes, which enables the follow-up
reconciler to reconcile the instance state with the cloud provider.
---
 BUILD.bazel                                   |   1 +
 python/ray/autoscaler/v2/BUILD                |   8 +
 .../v2/instance_manager/instance_storage.py   | 210 +++++++++++++
 .../v2/tests/test_instance_storage.py         | 290 ++++++++++++++++++
 4 files changed, 509 insertions(+)
 create mode 100644 python/ray/autoscaler/v2/instance_manager/instance_storage.py
 create mode 100644 python/ray/autoscaler/v2/tests/test_instance_storage.py

diff --git a/BUILD.bazel b/BUILD.bazel
index b7cb26e07bd03..32a453b9a086e 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -2883,6 +2883,7 @@ filegroup(
         "//src/ray/protobuf:event_py_proto",
         "//src/ray/protobuf:gcs_py_proto",
         "//src/ray/protobuf:gcs_service_py_proto",
+        "//src/ray/protobuf:instance_manager_py_proto",
         "//src/ray/protobuf:job_agent_py_proto",
         "//src/ray/protobuf:monitor_py_proto",
         "//src/ray/protobuf:node_manager_py_proto",
diff --git a/python/ray/autoscaler/v2/BUILD b/python/ray/autoscaler/v2/BUILD
index bef608e8d148e..723d585ebc70b 100644
--- a/python/ray/autoscaler/v2/BUILD
+++ b/python/ray/autoscaler/v2/BUILD
@@ -5,6 +5,14 @@
 # --------------------------------------------------------------------
 load("//bazel:python.bzl", "py_test_module_list")
 
+py_test(
+    name = "test_instance_storage",
+    size = "small",
+    srcs = ["tests/test_instance_storage.py"],
+    tags = ["team:core"],
+    deps = ["//:ray_lib",],
+)
+
 py_test(
     name = "test_storage",
     size = "small",
diff --git a/python/ray/autoscaler/v2/instance_manager/instance_storage.py b/python/ray/autoscaler/v2/instance_manager/instance_storage.py
new file mode 100644
index 0000000000000..990cb1f00733d
--- /dev/null
+++ b/python/ray/autoscaler/v2/instance_manager/instance_storage.py
@@ -0,0 +1,210 @@
+import logging
+from abc import ABCMeta, abstractmethod
+from dataclasses import dataclass
+from typing import Dict, List, Optional, Set, Tuple
+
+from ray.autoscaler.v2.instance_manager.storage import Storage, StoreStatus
+from ray.core.generated.instance_manager_pb2 import Instance
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class InstanceUpdateEvent:
+    """Notifies the status change of an instance."""
+
+    instance_id: str
+    new_status: int
+
+
+class InstanceUpdatedSuscriber(metaclass=ABCMeta):
+    """Subscribers to instance status changes."""
+
+    @abstractmethod
+    def notify(self, events: List[InstanceUpdateEvent]) -> None:
+        """Handle the events emitted after a successful storage mutation."""
+
+
+class InstanceStorage:
+    """Instance storage stores the states of instances in the storage. It also
+    allows users to subscribe to instance status changes to trigger reconciliation
+    with the cloud provider.
+
+    Args:
+        cluster_id: The id of the cluster; it is part of the storage table name.
+        storage: The underlying storage backend.
+        status_change_subscriber: Optional subscriber that is notified after
+            every successful upsert or delete.
+    """
+
+    def __init__(
+        self,
+        cluster_id: str,
+        storage: Storage,
+        status_change_subscriber: Optional[InstanceUpdatedSuscriber] = None,
+    ) -> None:
+        self._storage = storage
+        self._cluster_id = cluster_id
+        self._table_name = f"instance_table@{cluster_id}"
+        self._status_change_subscriber = status_change_subscriber
+
+    def batch_upsert_instances(
+        self,
+        updates: List[Instance],
+        expected_storage_version: Optional[int] = None,
+    ) -> StoreStatus:
+        """Upsert instances into the storage. If the instance already exists,
+        it will be updated. Otherwise, it will be inserted. If the
+        expected_storage_version is specified, the update will fail if the
+        current storage version does not match the expected version.
+
+        Note the version field of each instance in `updates` is reset to 0 in
+        place; it is populated from the storage entry's version on read.
+
+        Args:
+            updates: A list of instances to be upserted.
+            expected_storage_version: The expected storage version.
+
+        Returns:
+            StoreStatus: A tuple of (success, storage_version).
+        """
+        mutations = {}
+        version = self._storage.get_version()
+        # handle version mismatch; `is not None` so that an expected version of 0
+        if expected_storage_version is not None and expected_storage_version != version:
+            return StoreStatus(False, version)
+
+        for instance in updates:
+            # the instance version is set to 0, it will be
+            # populated by the storage entry's version on read
+            instance.version = 0
+            mutations[instance.instance_id] = instance.SerializeToString()
+
+        result, version = self._storage.batch_update(
+            self._table_name, mutations, [], expected_storage_version
+        )
+
+        if result and self._status_change_subscriber:
+            self._status_change_subscriber.notify(
+                [
+                    InstanceUpdateEvent(
+                        instance_id=instance.instance_id,
+                        new_status=instance.status,
+                    )
+                    for instance in updates
+                ],
+            )
+
+        return StoreStatus(result, version)
+
+    def upsert_instance(
+        self,
+        instance: Instance,
+        expected_instance_version: Optional[int] = None,
+        expected_storage_verison: Optional[int] = None,
+    ) -> StoreStatus:
+        """Upsert an instance in the storage.
+        If the expected_instance_version is specified, the update will fail
+        if the current instance version does not match the expected version.
+        Similarly, if the expected storage version is
+        specified, the update will fail if the current storage version does not
+        match the expected version.
+
+        Note the version field of the passed-in instance is reset to 0 in place;
+        it is populated from the storage entry's version on read.
+
+        Args:
+            instance: The instance to be updated.
+            expected_instance_version: The expected instance version.
+            expected_storage_verison: The expected storage version (name sic).
+
+        Returns:
+            StoreStatus: A tuple of (success, storage_version).
+        """
+        # the instance version is set to 0, it will be
+        # populated by the storage entry's version on read
+        instance.version = 0
+        result, version = self._storage.update(
+            self._table_name,
+            key=instance.instance_id,
+            value=instance.SerializeToString(),
+            expected_entry_version=expected_instance_version,
+            expected_storage_version=expected_storage_verison,
+            insert_only=False,
+        )
+
+        if result and self._status_change_subscriber:
+            self._status_change_subscriber.notify(
+                [
+                    InstanceUpdateEvent(
+                        instance_id=instance.instance_id,
+                        new_status=instance.status,
+                    )
+                ],
+            )
+
+        return StoreStatus(result, version)
+
+    def get_instances(
+        self,
+        instance_ids: Optional[List[str]] = None,
+        status_filter: Optional[Set[int]] = None,
+    ) -> Tuple[Dict[str, Instance], int]:
+        """Get instances from the storage.
+
+        Args:
+            instance_ids: A list of instance ids to be retrieved. If empty or
+                None, all instances will be retrieved.
+            status_filter: If non-empty, only instances whose status is in this
+                set are returned.
+
+        Returns:
+            Tuple[Dict[str, Instance], int]: A tuple of (instances, version).
+                The instances is a dictionary of (instance_id, instance) pairs.
+        """
+        instance_ids = instance_ids or []
+        status_filter = status_filter or set()
+        pairs, version = self._storage.get(self._table_name, instance_ids)
+        instances = {}
+        for instance_id, (instance_data, entry_version) in pairs.items():
+            instance = Instance()
+            instance.ParseFromString(instance_data)
+            instance.version = entry_version
+            if status_filter and instance.status not in status_filter:
+                continue
+            instances[instance_id] = instance
+        return instances, version
+
+    def batch_delete_instances(
+        self, instance_ids: List[str], expected_storage_version: Optional[int] = None
+    ) -> StoreStatus:
+        """Delete instances from the storage. If the expected_storage_version is
+        specified, the update will fail if the current storage version does not
+        match the expected version.
+
+        Args:
+            instance_ids: A list of ids of the instances to be deleted.
+            expected_storage_version: The expected storage version.
+
+        Returns:
+            StoreStatus: A tuple of (success, storage_version).
+        """
+        version = self._storage.get_version()
+        if expected_storage_version is not None and expected_storage_version != version:
+            return StoreStatus(False, version)
+
+        result, version = self._storage.batch_update(
+            self._table_name, {}, instance_ids, expected_storage_version
+        )
+
+        if result and self._status_change_subscriber:
+            self._status_change_subscriber.notify(
+                [
+                    InstanceUpdateEvent(
+                        instance_id=instance_id,
+                        new_status=Instance.GARAGE_COLLECTED,
+                    )
+                    for instance_id in instance_ids
+                ],
+            )
+        return StoreStatus(result, version)
diff --git a/python/ray/autoscaler/v2/tests/test_instance_storage.py b/python/ray/autoscaler/v2/tests/test_instance_storage.py
new file mode 100644
index 0000000000000..881520ea6cf44
--- /dev/null
+++ b/python/ray/autoscaler/v2/tests/test_instance_storage.py
@@ -0,0 +1,290 @@
+# coding: utf-8
+import copy
+import os
+import sys
+
+import pytest  # noqa
+
+from ray.autoscaler.v2.instance_manager.instance_storage import (
+    InstanceStorage,
+    InstanceUpdatedSuscriber,
+    InstanceUpdateEvent,
+)
+from ray.autoscaler.v2.instance_manager.storage import InMemoryStorage
+from ray.core.generated.instance_manager_pb2 import Instance
+
+
+class DummySubscriber(InstanceUpdatedSuscriber):
+    def __init__(self):
+        self.events = []
+
+    def notify(self, events):
+        self.events.extend(events)
+
+
+def create_instance(
+    instance_id, status=Instance.INSTANCE_STATUS_UNSPECIFIED, version=0
+):
+    return Instance(instance_id=instance_id, status=status, version=version)
+
+
+def test_upsert():
+    subscriber = DummySubscriber()
+
+    storage = InstanceStorage(
+        cluster_id="test_cluster",
+        storage=InMemoryStorage(),
+        status_change_subscriber=subscriber,
+    )
+    instance1 = create_instance("instance1")
+    instance2 = create_instance("instance2")
+    instance3 = create_instance("instance3")
+
+    assert (True, 1) == storage.batch_upsert_instances(
+        [instance1, instance2],
+        expected_storage_version=None,
+    )
+
+    assert subscriber.events == [
+        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+    ]
+
+    instance1.version = 1
+    instance2.version = 1
+    entries, storage_version = storage.get_instances()
+
+    assert storage_version == 1
+    assert entries == {
+        "instance1": instance1,
+        "instance2": instance2,
+    }
+
+    assert (False, 1) == storage.batch_upsert_instances(
+        [create_instance("instance1"), create_instance("instance2")],
+        expected_storage_version=0,
+    )
+
+    assert subscriber.events == [
+        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+    ]
+
+    instance2.status = Instance.IDLE
+    assert (True, 2) == storage.batch_upsert_instances(
+        [instance3, instance2],
+        expected_storage_version=1,
+    )
+
+    instance1.version = 1
+    instance2.version = 2
+    instance3.version = 2
+    entries, storage_version = storage.get_instances()
+
+    assert storage_version == 2
+    assert entries == {
+        "instance1": instance1,
+        "instance2": instance2,
+        "instance3": instance3,
+    }
+
+    assert subscriber.events == [
+        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance3", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance2", Instance.IDLE),
+    ]
+
+
+def test_update():
+    subscriber = DummySubscriber()
+
+    storage = InstanceStorage(
+        cluster_id="test_cluster",
+        storage=InMemoryStorage(),
+        status_change_subscriber=subscriber,
+    )
+    instance1 = create_instance("instance1")
+    instance2 = create_instance("instance2")
+
+    assert (True, 1) == storage.upsert_instance(instance=instance1)
+    assert subscriber.events == [
+        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
+    ]
+    assert (True, 2) == storage.upsert_instance(instance=instance2)
+
+    assert subscriber.events == [
+        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+    ]
+
+    assert (
+        {
+            "instance1": create_instance("instance1", version=1),
+            "instance2": create_instance("instance2", version=2),
+        },
+        2,
+    ) == storage.get_instances()
+
+    # failed because instance version is not correct
+    assert (False, 2) == storage.upsert_instance(
+        instance=instance1,
+        expected_instance_version=0,
+    )
+
+    # failed because storage version is not correct
+    assert (False, 2) == storage.upsert_instance(
+        instance=instance1,
+        expected_storage_verison=0,
+    )
+
+    assert subscriber.events == [
+        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+    ]
+
+    assert (True, 3) == storage.upsert_instance(
+        instance=instance2,
+        expected_storage_verison=2,
+    )
+
+    assert (
+        {
+            "instance1": create_instance("instance1", version=1),
+            "instance2": create_instance("instance2", version=3),
+        },
+        3,
+    ) == storage.get_instances()
+
+    assert subscriber.events == [
+        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+    ]
+
+    assert (True, 4) == storage.upsert_instance(
+        instance=instance1,
+        expected_instance_version=1,
+    )
+
+    assert (
+        {
+            "instance1": create_instance("instance1", version=4),
+            "instance2": create_instance("instance2", version=3),
+        },
+        4,
+    ) == storage.get_instances()
+
+    assert subscriber.events == [
+        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
+    ]
+
+
+def test_delete():
+    subscriber = DummySubscriber()
+
+    storage = InstanceStorage(
+        cluster_id="test_cluster",
+        storage=InMemoryStorage(),
+        status_change_subscriber=subscriber,
+    )
+    instance1 = create_instance("instance1")
+    instance2 = create_instance("instance2")
+    instance3 = create_instance("instance3")
+
+    assert (True, 1) == storage.batch_upsert_instances(
+        [instance1, instance2, instance3],
+        expected_storage_version=None,
+    )
+
+    assert (False, 1) == storage.batch_delete_instances(
+        instance_ids=["instance1"], expected_storage_version=0
+    )
+    assert (True, 2) == storage.batch_delete_instances(instance_ids=["instance1"])
+
+    assert subscriber.events == [
+        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance3", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance1", Instance.GARAGE_COLLECTED),
+    ]
+
+    assert (
+        {
+            "instance2": create_instance("instance2", version=1),
+            "instance3": create_instance("instance3", version=1),
+        },
+        2,
+    ) == storage.get_instances()
+
+    assert (True, 3) == storage.batch_delete_instances(
+        instance_ids=["instance2"], expected_storage_version=2
+    )
+
+    assert (
+        {
+            "instance3": create_instance("instance3", version=1),
+        },
+        3,
+    ) == storage.get_instances()
+
+    assert subscriber.events == [
+        InstanceUpdateEvent("instance1", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance2", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance3", Instance.INSTANCE_STATUS_UNSPECIFIED),
+        InstanceUpdateEvent("instance1", Instance.GARAGE_COLLECTED),
+        InstanceUpdateEvent("instance2", Instance.GARAGE_COLLECTED),
+    ]
+
+
+def test_get_instances():
+    storage = InstanceStorage(
+        cluster_id="test_cluster",
+        storage=InMemoryStorage(),
+    )
+    instance1 = create_instance("instance1", version=1)
+    instance2 = create_instance("instance2", status=Instance.RUNNING, version=1)
+    instance3 = create_instance("instance3", status=Instance.IDLE, version=1)
+
+    assert (True, 1) == storage.batch_upsert_instances(
+        [copy.deepcopy(instance1), copy.deepcopy(instance2), copy.deepcopy(instance3)],
+        expected_storage_version=None,
+    )
+
+    assert (
+        {
+            "instance1": instance1,
+            "instance2": instance2,
+            "instance3": instance3,
+        },
+        1,
+    ) == storage.get_instances()
+
+    assert (
+        {
+            "instance1": instance1,
+            "instance2": instance2,
+        },
+        1,
+    ) == storage.get_instances(instance_ids=["instance1", "instance2"])
+
+    assert ({"instance2": instance2}, 1) == storage.get_instances(
+        instance_ids=["instance1", "instance2"], status_filter={Instance.RUNNING}
+    )
+
+    assert (
+        {
+            "instance2": instance2,
+        },
+        1,
+    ) == storage.get_instances(status_filter={Instance.RUNNING})
+
+
+if __name__ == "__main__":
+    if os.environ.get("PARALLEL_CI"):
+        sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
+    else:
+        sys.exit(pytest.main(["-sv", __file__]))