[autoscaler][observability] Observability into when/why nodes fail to launch (ray-project#27697)

This change adds launch failures to the "Recent failures" section of `ray status` when a node provider supplies structured error information. For node providers that don't supply this optional information, there is no change in behavior.

For reference, when trying to launch a node type that hits a quota issue, the output looks like the following. `InsufficientInstanceCapacity` is the standard AWS error code for this class of failure.

```
======== Autoscaler status: 2022-08-11 22:22:10.735647 ========
Node status
---------------------------------------------------------------
Healthy:
 1 cpu_4_ondemand
Pending:
 quota, 1 launching
Recent failures:
 quota: InsufficientInstanceCapacity (last_attempt: 22:22:00)

Resources
---------------------------------------------------------------
Usage:
 0.0/4.0 CPU
 0.00/9.079 GiB memory
 0.00/4.539 GiB object_store_memory

Demands:
 (no resource demands)
```

The cluster config used for the example above (the `quota` node type requests a `p4d.24xlarge`, which commonly runs into capacity limits):

```
available_node_types:
    cpu_4_ondemand:
        node_config:
            InstanceType: m4.xlarge
            ImageId: latest_dlami
        resources: {}
        min_workers: 0
        max_workers: 0
    quota:
        node_config:
            InstanceType: p4d.24xlarge
            ImageId: latest_dlami
        resources: {}
        min_workers: 1
        max_workers: 1
```
Co-authored-by: Alex <[email protected]>

Signed-off-by: Stefan van der Kleij <[email protected]>
Alex Wu authored and Stefan van der Kleij committed Aug 18, 2022
1 parent 0d7f060 commit dc1c078
Showing 13 changed files with 613 additions and 35 deletions.
17 changes: 14 additions & 3 deletions python/ray/autoscaler/_private/autoscaler.py

```diff
@@ -7,7 +7,7 @@
 import threading
 import time
 from collections import Counter, defaultdict, namedtuple
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from enum import Enum
 from typing import Any, Callable, Dict, FrozenSet, List, Optional, Set, Tuple, Union
@@ -35,6 +35,10 @@
     record_local_head_state_if_needed,
 )
 from ray.autoscaler._private.node_launcher import BaseNodeLauncher, NodeLauncher
+from ray.autoscaler._private.node_provider_availability_tracker import (
+    NodeAvailabilitySummary,
+    NodeProviderAvailabilityTracker,
+)
 from ray.autoscaler._private.node_tracker import NodeTracker
 from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
 from ray.autoscaler._private.providers import _get_node_provider
@@ -100,6 +104,9 @@ class AutoscalerSummary:
     pending_nodes: List[Tuple[NodeIP, NodeType, NodeStatus]]
     pending_launches: Dict[NodeType, int]
     failed_nodes: List[Tuple[NodeIP, NodeType]]
+    node_availability_summary: NodeAvailabilitySummary = field(
+        default_factory=lambda: NodeAvailabilitySummary({})
+    )


 class NonTerminatedNodes:
```
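The `default_factory` is what keeps deserialization of summaries from older clusters working: dataclasses reject mutable defaults, and a factory supplies a fresh empty `NodeAvailabilitySummary` whenever the field is omitted. A minimal sketch of the pattern with hypothetical names (not Ray code):

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Summary:
    failed_nodes: List[str]
    # A factory is evaluated once per instance; a bare `{}` default would be
    # rejected by @dataclass (mutable defaults would be shared across instances).
    availability: Dict[str, str] = field(default_factory=dict)


s = Summary(failed_nodes=[])  # older call sites need no new argument
s.availability["quota"] = "InsufficientInstanceCapacity"
```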
```diff
@@ -208,6 +215,7 @@ def read_fn():
         else:
             self.config_reader = config_reader

+        self.node_provider_availability_tracker = NodeProviderAvailabilityTracker()
         # Prefix each line of info string with cluster name if True
         self.prefix_cluster_info = prefix_cluster_info
         # Keep this before self.reset (self.provider needs to be created
@@ -295,9 +303,10 @@ def read_fn():
             self.foreground_node_launcher = BaseNodeLauncher(
                 provider=self.provider,
                 pending=self.pending_launches,
+                event_summarizer=self.event_summarizer,
+                node_provider_availability_tracker=self.node_provider_availability_tracker,  # noqa: E501 Flake and black disagree how to format this.
                 node_types=self.available_node_types,
                 prom_metrics=self.prom_metrics,
-                event_summarizer=self.event_summarizer,
             )
         else:
             self.launch_queue = queue.Queue()
@@ -308,9 +317,10 @@ def read_fn():
                 queue=self.launch_queue,
                 index=i,
                 pending=self.pending_launches,
+                event_summarizer=self.event_summarizer,
+                node_provider_availability_tracker=self.node_provider_availability_tracker,  # noqa: E501 Flake and black disagree how to format this.
                 node_types=self.available_node_types,
                 prom_metrics=self.prom_metrics,
-                event_summarizer=self.event_summarizer,
             )
             node_launcher.daemon = True
             node_launcher.start()
@@ -1424,6 +1434,7 @@ def summary(self) -> Optional[AutoscalerSummary]:
             pending_nodes=pending_nodes,
             pending_launches=pending_launches,
             failed_nodes=failed_nodes,
+            node_availability_summary=self.node_provider_availability_tracker.summary(),
         )

     def info_string(self):
```
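The tracker itself is defined in `node_provider_availability_tracker.py`, whose diff did not load in this view. Inferring only from the call sites in this commit, its surface looks roughly like the following sketch (details may differ):

```python
# Inferred sketch of the tracker surface, based only on the call sites in
# this diff; the real class lives in node_provider_availability_tracker.py.
class NodeProviderAvailabilityTrackerSketch:
    def __init__(self) -> None:
        # node_type -> (timestamp, NodeLaunchException | None)
        self._latest = {}

    def update_node_availability(self, node_type, timestamp, node_launch_exception):
        # Called after every launch attempt; None marks a success.
        self._latest[node_type] = (timestamp, node_launch_exception)

    def summary(self):
        # Feeds AutoscalerSummary.node_availability_summary, which
        # `ray status` renders under "Recent failures:".
        return dict(self._latest)
```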
20 changes: 16 additions & 4 deletions python/ray/autoscaler/_private/aws/node_provider.py

```diff
@@ -23,6 +23,7 @@
 from ray.autoscaler._private.cli_logger import cf, cli_logger
 from ray.autoscaler._private.constants import BOTO_CREATE_MAX_RETRIES, BOTO_MAX_RETRIES
 from ray.autoscaler._private.log_timer import LogTimer
+from ray.autoscaler.node_launch_exception import NodeLaunchException
 from ray.autoscaler.node_provider import NodeProvider
 from ray.autoscaler.tags import (
     TAG_RAY_CLUSTER_NAME,
@@ -448,7 +449,22 @@ def _create_node(self, node_config, tags, count):
                 )
                 break
             except botocore.exceptions.ClientError as exc:
+                # Launch failure may be due to instance type availability in
+                # the given AZ.
+                subnet_idx += 1
                 if attempt == max_tries:
+                    try:
+                        exc = NodeLaunchException(
+                            category=exc.response["Error"]["Code"],
+                            description=exc.response["Error"]["Message"],
+                            source_exception=exc,
+                        )
+                    except Exception:
+                        # In theory, all ClientErrors we expect to get should
+                        # have these fields, but just in case we can't parse
+                        # one, it's fine; just throw the original error.
+                        cli_logger.warning("Couldn't parse exception.", exc)
+                        pass
                     cli_logger.abort(
                         "Failed to launch instances. Max attempts exceeded.",
                         exc=exc,
@@ -458,10 +474,6 @@ def _create_node(self, node_config, tags, count):
                         "create_instances: Attempt failed with {}, retrying.", exc
                     )

-                # Launch failure may be due to instance type availability in
-                # the given AZ
-                subnet_idx += 1
-
         return created_nodes_dict

     def terminate_node(self, node_id):
```
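The `except` branch above leans on the parsed error document that `botocore.exceptions.ClientError` exposes via its `response` attribute. A sketch of the relevant shape (the message text is illustrative, not an exact AWS string):

```python
# Shape of exc.response for a capacity failure; only the "Error" block is
# consumed by the code above, and the message text here is illustrative.
response = {
    "Error": {
        "Code": "InsufficientInstanceCapacity",
        "Message": "We currently do not have sufficient capacity ...",
    },
    # ... request metadata elided ...
}

# The provider maps these two fields onto the structured exception:
#   category    <- exc.response["Error"]["Code"]
#   description <- exc.response["Error"]["Message"]
```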
14 changes: 13 additions & 1 deletion python/ray/autoscaler/_private/commands.py

```diff
@@ -43,6 +43,9 @@
 )
 from ray.autoscaler._private.event_system import CreateClusterEvent, global_event_system
 from ray.autoscaler._private.log_timer import LogTimer
+from ray.autoscaler._private.node_provider_availability_tracker import (
+    NodeAvailabilitySummary,
+)
 from ray.autoscaler._private.providers import (
     _NODE_PROVIDERS,
     _PROVIDER_PRETTY_NAMES,
@@ -132,7 +135,16 @@ def debug_status(status, error) -> str:
     timestamp = status_dict.get("time")
     if lm_summary_dict and autoscaler_summary_dict and timestamp:
         lm_summary = LoadMetricsSummary(**lm_summary_dict)
-        autoscaler_summary = AutoscalerSummary(**autoscaler_summary_dict)
+        node_availability_summary_dict = autoscaler_summary_dict.pop(
+            "node_availability_summary", {}
+        )
+        node_availability_summary = NodeAvailabilitySummary.from_fields(
+            **node_availability_summary_dict
+        )
+        autoscaler_summary = AutoscalerSummary(
+            node_availability_summary=node_availability_summary,
+            **autoscaler_summary_dict,
+        )
         report_time = datetime.datetime.fromtimestamp(timestamp)
         status = format_info_string(
             lm_summary, autoscaler_summary, time=report_time
```
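Popping the key with an empty-dict default is what keeps `ray status` compatible with status payloads from clusters that predate this field. A small sketch of the two cases (payload keys beyond the one consumed above are hypothetical):

```python
# Hypothetical payloads; only the popped key is taken from the real code.
new_payload = {
    "failed_nodes": [],
    "node_availability_summary": {"node_availabilities": {}},  # hypothetical inner keys
}
old_payload = {"failed_nodes": []}  # produced by a cluster without this feature

for payload in (new_payload, old_payload):
    availability_dict = payload.pop("node_availability_summary", {})
    # The remaining keys can now be splatted into AutoscalerSummary without
    # tripping an unexpected-keyword error; from_fields(**{}) must in turn
    # tolerate the empty dict and fall back to an empty summary.
    print(availability_dict)
```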
4 changes: 4 additions & 0 deletions python/ray/autoscaler/_private/constants.py

```diff
@@ -56,6 +56,10 @@ def env_integer(key, default):

 AUTOSCALER_MAX_FAILURES_DISPLAYED = 20

+AUTOSCALER_NODE_AVAILABILITY_MAX_STALENESS_S = env_integer(
+    "AUTOSCALER_NODE_AVAILABILITY_MAX_STALENESS_S", 30 * 60
+)
+
 # The maximum allowed resource demand vector size to guarantee the resource
 # demand scheduler bin packing algorithm takes a reasonable amount of time
 # to run.
```
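Like the other `env_integer` constants in this file, the staleness window defaults to 30 minutes and can be overridden through the environment. A sketch of that behavior, assuming `env_integer` simply int-casts the variable when present:

```python
import os

# Assumed shape of env_integer, consistent with how constants.py uses it:
def env_integer(key: str, default: int) -> int:
    if key in os.environ:
        return int(os.environ[key])
    return default

os.environ["AUTOSCALER_NODE_AVAILABILITY_MAX_STALENESS_S"] = "600"
print(env_integer("AUTOSCALER_NODE_AVAILABILITY_MAX_STALENESS_S", 30 * 60))  # -> 600
```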
36 changes: 32 additions & 4 deletions python/ray/autoscaler/_private/node_launcher.py

```diff
@@ -6,8 +6,12 @@
 import traceback
 from typing import Any, Dict, Optional

+from ray.autoscaler._private.node_provider_availability_tracker import (
+    NodeProviderAvailabilityTracker,
+)
 from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
 from ray.autoscaler._private.util import hash_launch_conf
+from ray.autoscaler.node_launch_exception import NodeLaunchException
 from ray.autoscaler.tags import (
     NODE_KIND_WORKER,
     STATUS_UNINITIALIZED,
@@ -39,21 +43,24 @@ def __init__(
         provider,
         pending,
         event_summarizer,
+        node_provider_availability_tracker: NodeProviderAvailabilityTracker,
         prom_metrics=None,
         node_types=None,
         index=None,
         *args,
         **kwargs,
     ):
         self.pending = pending
+        self.event_summarizer = event_summarizer
+        self.node_provider_availability_tracker = node_provider_availability_tracker
         self.prom_metrics = prom_metrics or AutoscalerPrometheusMetrics()
         self.provider = provider
         self.node_types = node_types
         self.index = str(index) if index is not None else ""
-        self.event_summarizer = event_summarizer

     def launch_node(self, config: Dict[str, Any], count: int, node_type: Optional[str]):
         self.log("Got {} nodes to launch.".format(count))
+        node_launch_start_time = time.time()
         try:
             self._launch_node(config, count, node_type)
         except Exception:
@@ -74,6 +81,12 @@ def launch_node(self, config: Dict[str, Any], count: int, node_type: Optional[str]):
                 interval_s=60,
             )
             logger.exception("Launch failed")
+        else:
+            self.node_provider_availability_tracker.update_node_availability(
+                node_type=node_type,
+                timestamp=node_launch_start_time,
+                node_launch_exception=None,
+            )
         finally:
             self.pending.dec(node_type, count)
             self.prom_metrics.pending_nodes.set(self.pending.value)
```
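Note that the tracker is updated on the success path too (`node_launch_exception=None`), presumably so a node type stops appearing under "Recent failures" once a launch finally succeeds. Schematically, both outcomes funnel into one call (sketch, not Ray code):

```python
# Schematic of the two launch_node() outcomes, using the tracker as
# sketched earlier; `error` is a NodeLaunchException on failure, None on success.
def record_outcome(tracker, node_type, started_at, error=None):
    tracker.update_node_availability(
        node_type=node_type,
        timestamp=started_at,
        node_launch_exception=error,
    )
```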
```diff
@@ -111,9 +124,22 @@ def _launch_node(
         node_tags[TAG_RAY_USER_NODE_TYPE] = node_type
         node_config.update(launch_config)
         launch_start_time = time.time()
-        self.provider.create_node_with_resources(
-            node_config, node_tags, count, resources
-        )
+        try:
+            self.provider.create_node_with_resources(
+                node_config, node_tags, count, resources
+            )
+        except NodeLaunchException as node_launch_exception:
+            self.node_provider_availability_tracker.update_node_availability(
+                node_type, launch_start_time, node_launch_exception
+            )
+            # Do some special handling if we have a structured error.
+            self.log(
+                f"Failed to launch {node_type}: "
+                f"({node_launch_exception.category}):"
+                f"{node_launch_exception.description}"
+            )
+            # Reraise to trigger the more general exception handling code.
+            raise node_launch_exception.source_exception
         launch_time = time.time() - launch_start_time
         for _ in range(count):
             # Note: when launching multiple nodes we observe the time it
@@ -140,6 +166,7 @@ def __init__(
         queue,
         pending,
         event_summarizer,
+        node_provider_availability_tracker,
         prom_metrics=None,
         node_types=None,
         index=None,
@@ -152,6 +179,7 @@ def __init__(
             provider=provider,
             pending=pending,
             event_summarizer=event_summarizer,
+            node_provider_availability_tracker=node_provider_availability_tracker,
             prom_metrics=prom_metrics,
             node_types=node_types,
             index=index,
```
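Putting the pieces together: the `NodeLaunchException` constructed in the AWS provider is caught here, recorded, logged, and re-raised via its `source_exception`. The class itself (defined in `python/ray/autoscaler/node_launch_exception.py`, whose diff did not load in this view) is implied by this usage to look roughly like:

```python
# Sketch consistent with the usage above; the real class may differ in detail.
class NodeLaunchException(Exception):
    def __init__(self, category: str, description: str, source_exception: Exception):
        # e.g. category="InsufficientInstanceCapacity",
        #      description=<the provider's human-readable message>
        self.category = category
        self.description = description
        self.source_exception = source_exception
        super().__init__(f"({category}): {description}")
```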
Diffs for the remaining 8 changed files (among them python/ray/autoscaler/_private/node_provider_availability_tracker.py and python/ray/autoscaler/node_launch_exception.py) were not loaded in this view.
