[autoscaler v2][5/n] introducing reconciler #34985

Merged 48 commits on Jul 25, 2023
Changes from all commits
Commits
20eccce
update
scv119 May 2, 2023
c02bd56
update
scv119 May 2, 2023
a14af6c
update
scv119 May 15, 2023
2c9337d
Merge remote-tracking branch 'upstream/master' into autoscaler/instan…
scv119 May 31, 2023
a690fed
update
scv119 May 31, 2023
ab4ca16
revert unintentional changes
scv119 May 31, 2023
278c260
update
scv119 Jun 5, 2023
235f8fd
Merge remote-tracking branch 'upstream/master' into autoscaler/instan…
scv119 Jun 5, 2023
22ad7f9
update
scv119 Jun 5, 2023
1e07b9b
update
scv119 Jun 5, 2023
acdccfb
update
scv119 Jun 5, 2023
78e9e91
update
scv119 Jun 5, 2023
712f267
update
scv119 Jun 5, 2023
9ee5147
update
scv119 Jun 5, 2023
2eafc08
update
scv119 Jun 5, 2023
5f5207e
update
scv119 Jun 5, 2023
67d4f35
update
scv119 Jun 5, 2023
907198d
update
scv119 Jun 5, 2023
eae577b
update
scv119 Jun 5, 2023
9d13bb9
update
scv119 Jun 5, 2023
39e9550
update
scv119 Jun 5, 2023
4c3acf1
update
scv119 Jun 5, 2023
0de8b80
Merge remote-tracking branch 'upstream/master' into autoscaler/instan…
scv119 Jun 12, 2023
d6fb4d0
update
scv119 Jun 12, 2023
deb7b7a
update
scv119 Jun 12, 2023
b0ebefc
update
scv119 Jun 12, 2023
d588ade
Merge remote-tracking branch 'upstream/master' into autoscaler/instan…
scv119 Jun 21, 2023
f9048f7
update
scv119 Jun 22, 2023
1f4ec43
update
scv119 Jun 26, 2023
ef4ab5e
update
scv119 Jun 26, 2023
b69d591
update
scv119 Jun 26, 2023
ca65cea
update
scv119 Jun 26, 2023
549aea3
update
scv119 Jun 26, 2023
3851b43
update
scv119 Jun 26, 2023
da1cbdb
update
scv119 Jun 26, 2023
846565b
update
scv119 Jul 5, 2023
bddfe53
Merge remote-tracking branch 'upstream/master' into autoscaler/instan…
scv119 Jul 5, 2023
1077fe3
update
scv119 Jul 5, 2023
a2f6fb0
update
scv119 Jul 5, 2023
d966e32
update
scv119 Jul 5, 2023
a394496
update
scv119 Jul 5, 2023
a1a57b1
Merge remote-tracking branch 'upstream/master' into autoscaler/instan…
scv119 Jul 18, 2023
3650e86
rebase
scv119 Jul 18, 2023
5ee8998
update
scv119 Jul 20, 2023
bc001ac
Merge remote-tracking branch 'upstream/master' into autoscaler/instan…
scv119 Jul 20, 2023
2d08c42
Merge branch 'master' into autoscaler/instance_manager
scv119 Jul 21, 2023
e6184f0
Merge branch 'master' into autoscaler/instance_manager
scv119 Jul 22, 2023
d7f9894
lint
scv119 Jul 24, 2023
23 changes: 23 additions & 0 deletions python/ray/autoscaler/v2/BUILD
@@ -54,6 +54,29 @@ py_test(
deps = ["//:ray_lib",],
)

py_test(
name = "test_instance_launcher",
size = "small",
srcs = ["tests/test_instance_launcher.py"],
tags = ["team:core"],
deps = ["//:ray_lib",],
)

py_test(
name = "test_reconciler",
size = "small",
srcs = ["tests/test_reconciler.py"],
tags = ["team:core"],
deps = ["//:ray_lib",],
)

py_test(
name = "test_threaded_ray_installer",
size = "small",
srcs = ["tests/test_threaded_ray_installer.py"],
tags = ["team:core"],
deps = ["//:ray_lib",],
)

py_test(
name = "test_sdk",
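For reference, the three new test targets follow the existing layout in this BUILD file; assuming the standard Bazel workflow, each can be run individually, for example bazel test //python/ray/autoscaler/v2:test_reconciler, and likewise for test_instance_launcher and test_threaded_ray_installer.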
Empty file.
88 changes: 57 additions & 31 deletions python/ray/autoscaler/v2/instance_manager/instance_storage.py
@@ -1,4 +1,6 @@
import copy
import logging
import time
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple
@@ -15,6 +17,7 @@ class InstanceUpdateEvent:

instance_id: str
new_status: int
new_ray_status: int = Instance.RAY_STATUS_UNKOWN


class InstanceUpdatedSuscriber(metaclass=ABCMeta):
@@ -39,7 +42,12 @@ def __init__(
self._storage = storage
self._cluster_id = cluster_id
self._table_name = f"instance_table@{cluster_id}"
self._status_change_subscriber = status_change_subscriber
self._status_change_subscribers = []
if status_change_subscriber:
self._status_change_subscribers.append(status_change_subscriber)

def add_status_change_subscriber(self, subscriber: InstanceUpdatedSuscriber):
self._status_change_subscribers.append(subscriber)

def batch_upsert_instances(
self,
@@ -68,25 +76,29 @@ def batch_upsert_instances(
return StoreStatus(False, version)

for instance in updates:
instance = copy.deepcopy(instance)
# the instance version is set to 0, it will be
# populated by the storage entry's version on read
instance.version = 0
instance.timestamp_since_last_modified = int(time.time())
mutations[instance.instance_id] = instance.SerializeToString()

result, version = self._storage.batch_update(
self._table_name, mutations, {}, expected_storage_version
)

if result and self._status_change_subscriber:
self._status_change_subscriber.notify(
[
InstanceUpdateEvent(
instance_id=instance.instance_id,
new_status=instance.status,
)
for instance in updates
],
)
if result:
for subscriber in self._status_change_subscribers:
subscriber.notify(
[
InstanceUpdateEvent(
instance_id=instance.instance_id,
new_status=instance.status,
new_ray_status=instance.ray_status,
)
for instance in updates
],
)

return StoreStatus(result, version)

@@ -114,9 +126,11 @@ def upsert_instance(
Returns:
StoreStatus: A tuple of (success, storage_version).
"""
instance = copy.deepcopy(instance)
# the instance version is set to 0, it will be
# populated by the storage entry's version on read
instance.version = 0
instance.timestamp_since_last_modified = int(time.time())
result, version = self._storage.update(
self._table_name,
key=instance.instance_id,
@@ -126,26 +140,34 @@ def upsert_instance(
insert_only=False,
)

if result and self._status_change_subscriber:
self._status_change_subscriber.notify(
[
InstanceUpdateEvent(
instance_id=instance.instance_id,
new_status=instance.status,
)
],
)
if result:
for subscriber in self._status_change_subscribers:
subscriber.notify(
[
InstanceUpdateEvent(
instance_id=instance.instance_id,
new_status=instance.status,
new_ray_status=instance.ray_status,
)
],
)

return StoreStatus(result, version)

def get_instances(
self, instance_ids: List[str] = None, status_filter: Set[int] = None
self,
instance_ids: List[str] = None,
status_filter: Set[int] = None,
ray_status_filter: Set[int] = None,
) -> Tuple[Dict[str, Instance], int]:
"""Get instances from the storage.

Args:
instance_ids: A list of instance ids to be retrieved. If empty, all
instances will be retrieved.
status_filter: Only instances with the specified status will be returned.
ray_status_filter: Only instances with the specified ray status will
be returned.

Returns:
Tuple[Dict[str, Instance], int]: A tuple of (instances, version).
@@ -161,6 +183,8 @@ def get_instances(
instance.version = entry_version
if status_filter and instance.status not in status_filter:
continue
if ray_status_filter and instance.ray_status not in ray_status_filter:
continue
instances[instance_id] = instance
return instances, version

@@ -186,14 +210,16 @@ def batch_delete_instances(
self._table_name, {}, instance_ids, expected_storage_version
)

if result[0] and self._status_change_subscriber:
self._status_change_subscriber.notify(
[
InstanceUpdateEvent(
instance_id=instance_id,
new_status=Instance.GARAGE_COLLECTED,
)
for instance_id in instance_ids
],
)
if result[0]:
for subscriber in self._status_change_subscribers:
subscriber.notify(
[
InstanceUpdateEvent(
instance_id=instance_id,
new_status=Instance.GARBAGE_COLLECTED,
new_ray_status=Instance.RAY_STATUS_UNKOWN,
)
for instance_id in instance_ids
],
)
return result
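As a side note on how the new subscriber plumbing in instance_storage.py is meant to be used: the sketch below registers an extra subscriber after construction via add_status_change_subscriber and receives InstanceUpdateEvent objects (now carrying new_ray_status in addition to new_status). It is a minimal sketch, not part of this PR; the concrete Storage backend, the keyword-argument names of the InstanceStorage constructor, and the cluster id are assumptions for illustration.

from typing import List

from ray.autoscaler.v2.instance_manager.instance_storage import (
    InstanceStorage,
    InstanceUpdatedSuscriber,
    InstanceUpdateEvent,
)


class LoggingSubscriber(InstanceUpdatedSuscriber):
    """Toy subscriber that records every update event it is notified about."""

    def __init__(self) -> None:
        self.events: List[InstanceUpdateEvent] = []

    def notify(self, events: List[InstanceUpdateEvent]) -> None:
        # Called by InstanceStorage after a successful upsert or delete.
        self.events.extend(events)


def build_storage(storage_backend) -> InstanceStorage:
    # `storage_backend` stands in for whatever Storage implementation backs
    # the table; its concrete type is not visible in this diff.
    instance_storage = InstanceStorage(
        cluster_id="my-cluster",  # hypothetical cluster id
        storage=storage_backend,
    )
    # New in this PR: more subscribers can be added after construction,
    # instead of passing a single one to __init__.
    instance_storage.add_status_change_subscriber(LoggingSubscriber())
    return instance_storage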
4 changes: 2 additions & 2 deletions python/ray/autoscaler/v2/instance_manager/node_provider.py
@@ -118,11 +118,11 @@ def _get_instance(self, cloud_instance_id: str) -> Instance:
instance = Instance()
instance.cloud_instance_id = cloud_instance_id
if self._provider.is_running(cloud_instance_id):
instance.status = Instance.STARTING
instance.status = Instance.ALLOCATED
elif self._provider.is_terminated(cloud_instance_id):
instance.status = Instance.STOPPED
else:
instance.status = Instance.INSTANCE_STATUS_UNSPECIFIED
instance.status = Instance.UNKNOWN
instance.internal_ip = self._provider.internal_ip(cloud_instance_id)
instance.external_ip = self._provider.external_ip(cloud_instance_id)
instance.instance_type = self._provider.node_tags(cloud_instance_id)[
Empty file.
@@ -0,0 +1,162 @@
import logging
import math
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import List

from ray.autoscaler._private.constants import (
AUTOSCALER_MAX_CONCURRENT_LAUNCHES,
AUTOSCALER_MAX_LAUNCH_BATCH,
)
from ray.autoscaler.v2.instance_manager.instance_storage import (
InstanceStorage,
InstanceUpdatedSuscriber,
InstanceUpdateEvent,
)
from ray.autoscaler.v2.instance_manager.node_provider import NodeProvider
from ray.core.generated.instance_manager_pb2 import Instance

logger = logging.getLogger(__name__)


class InstanceLauncher(InstanceUpdatedSuscriber):
"""InstanceLauncher is responsible for provisioning new instances."""

def __init__(
self,
instance_storage: InstanceStorage,
node_provider: NodeProvider,
max_concurrent_requests: int = math.ceil(
AUTOSCALER_MAX_CONCURRENT_LAUNCHES / float(AUTOSCALER_MAX_LAUNCH_BATCH)
),
max_instances_per_request: int = AUTOSCALER_MAX_LAUNCH_BATCH,
) -> None:
self._instance_storage = instance_storage
self._node_provider = node_provider
self._max_concurrent_requests = max_concurrent_requests
self._max_instances_per_request = max_instances_per_request
self._executor = ThreadPoolExecutor(max_workers=1)
self._launch_instance_executor = ThreadPoolExecutor(
max_workers=self._max_concurrent_requests
)

def notify(self, events: List[InstanceUpdateEvent]) -> None:
# TODO: we should do reconciliation based on events.
has_new_request = any(
[event.new_status == Instance.UNKNOWN for event in events]
)
if has_new_request:
self._executor.submit(self._may_launch_new_instances)

def _may_launch_new_instances(self):
new_instances, _ = self._instance_storage.get_instances(
status_filter={Instance.UNKNOWN}
)

if not new_instances:
logger.debug("No instances to launch")
return

queued_instances = []
for instance in new_instances.values():
instance.status = Instance.QUEUED
success, version = self._instance_storage.upsert_instance(
instance, expected_instance_version=instance.version
)
if success:
instance.version = version
queued_instances.append(instance)
else:
logger.error(f"Failed to update {instance} QUEUED")

instances_by_type = defaultdict(list)
for instance in queued_instances:
instances_by_type[instance.instance_type].append(instance)

for instance_type, instances in instances_by_type.items():
for i in range(0, len(instances), self._max_instances_per_request):
self._launch_instance_executor.submit(
self._launch_new_instances_by_type,
instance_type,
instances[
i : min(
i + self._max_instances_per_request,
len(instances),
)
],
)

def _launch_new_instances_by_type(
self, instance_type: str, instances: List[Instance]
) -> int:
"""Launches instances of the given type.

Args:
instance_type: type of instance to launch.
instances: list of instances to launch. These instances should
have been marked as QUEUED with instance_type set.
Returns:
num of instances launched.
"""
logger.info(f"Launching {len(instances)} instances of type {instance_type}")
instances_selected = []
for instance in instances:
instance.status = Instance.REQUESTED
result, version = self._instance_storage.upsert_instance(
instance, expected_instance_version=instance.version
)
if not result:
logger.warn(f"Failed to update instance {instance}")
continue
instance.version = version
instances_selected.append(instance)

if not instances_selected:
return 0

# TODO: idempotency token.
created_cloud_instances = self._node_provider.create_nodes(
instance_type, len(instances_selected)
)

assert len(created_cloud_instances) <= len(instances_selected)

instances_launched = 0
while created_cloud_instances and instances_selected:
cloud_instance = created_cloud_instances.pop()
instance = instances_selected.pop()
instance.cloud_instance_id = cloud_instance.cloud_instance_id
instance.internal_ip = cloud_instance.internal_ip
instance.external_ip = cloud_instance.external_ip
instance.status = Instance.ALLOCATED
instance.ray_status = Instance.RAY_STATUS_UNKOWN

# update instance status into the storage
result, _ = self._instance_storage.upsert_instance(
instance, expected_instance_version=instance.version
)

if not result:
# TODO: this could only happen when the request is canceled.
logger.warn(f"Failed to update instance {instance}")
# push the cloud instance back
created_cloud_instances.append(cloud_instance)
continue

instances_launched += 1

if created_cloud_instances:
# instances are leaked, we probably need to terminate them
for instance in created_cloud_instances:
self._node_provider.terminate_node(instance.cloud_instance_id)

if instances_selected:
# instance creation failed, we need to mark them as allocation failed.
for instance in instances_selected:
instance.status = Instance.ALLOCATION_FAILED
Review comment (Contributor): Where could we surface the reason for the failure? Are there explicit errors?

# TODO: add more information about the failure.
Review comment (Contributor): Great details, I'm interested in the failure information. Please let me know if it's already in the pipeline.

  1. Is this failure reason more about node_provider launch failures? If not, can we add that information? For example, an EC2 instance for worker_node_1 failing to launch with a capacity exception.
  2. Do we have plans to support an API for retrieving this from instance_storage? For example, autoscaler_v2.get_failed_instances_info(instance_ids: List[str]).

Review comment (Contributor): Re 1: yes, I think this is the case where fewer instances than selected could be started by the node_provider. As for surfacing the specific error info here, maybe we should do so via metrics/events or, as you mentioned in 2, through APIs to retrieve it.

cc @scv119

result, _ = self._instance_storage.upsert_instance(
instance, expected_instance_version=instance.version
)
# TODO: this could only happen when the request is canceled.
return instances_launched
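To place the launcher in context, here is a minimal sketch, not part of this PR, of how it would be attached to the instance storage so that storage updates drive provisioning. The constructor keyword arguments come from the __init__ shown above; the import path of InstanceLauncher is not visible in this excerpt and is assumed to sit next to instance_storage.py.

from ray.autoscaler.v2.instance_manager.instance_storage import InstanceStorage
from ray.autoscaler.v2.instance_manager.node_provider import NodeProvider

# Assumed module path; the new file's location is not shown in this excerpt.
from ray.autoscaler.v2.instance_manager.instance_launcher import InstanceLauncher


def attach_launcher(
    instance_storage: InstanceStorage, node_provider: NodeProvider
) -> InstanceLauncher:
    """Register the launcher so instance-storage updates drive provisioning."""
    launcher = InstanceLauncher(
        instance_storage=instance_storage,
        node_provider=node_provider,
    )
    # Any update that leaves an instance in UNKNOWN status triggers notify(),
    # which schedules _may_launch_new_instances(); instances then move through
    # UNKNOWN -> QUEUED -> REQUESTED -> ALLOCATED, or end in ALLOCATION_FAILED
    # when the node provider returns fewer cloud instances than requested.
    instance_storage.add_status_change_subscriber(launcher)
    return launcher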