[autoscaler] Fix initialization artifacts #22570

Merged
Changes from all commits (22 commits)
10 changes: 0 additions & 10 deletions docker/kuberay-autoscaler/run_autoscaler.py
@@ -1,7 +1,6 @@
import argparse
import logging
import os
import time

import ray
from ray import ray_constants
@@ -74,15 +73,6 @@ def setup_logging() -> None:
args.cluster_name, args.cluster_namespace
)

# When the entrypoint code reaches here,
# the GCS might not have collected information on the head node itself.
# That can lead to an annoying artifact at the start of the autoscaler logs:
# a status message showing no nodes at all connected to the Ray cluster.
# Wait a bit to avoid that artifact.
# TODO (Dmitri): Fix StandardAutoscaler.summary() to avoid the issue
# and remove the sleep.
time.sleep(5)

Monitor(
address=f"{head_ip}:6379",
redis_password=args.redis_password,
5 changes: 4 additions & 1 deletion python/ray/autoscaler/_private/autoscaler.py
@@ -1239,7 +1239,7 @@ def kill_workers(self):
self.prom_metrics.stopped_nodes.inc()
logger.error("StandardAutoscaler: terminated {} node(s)".format(len(nodes)))

def summary(self):
def summary(self) -> Optional[AutoscalerSummary]:
"""Summarizes the active, pending, and failed node launches.

An active node is a node whose raylet is actively reporting heartbeats.
@@ -1250,6 +1250,8 @@ def summary(self):
Returns:
AutoscalerSummary: The summary.
"""
if not self.non_terminated_nodes:
return None
active_nodes = Counter()
pending_nodes = []
failed_nodes = []
@@ -1309,4 +1311,5 @@ def summary(self):
def info_string(self):
lm_summary = self.load_metrics.summary()
autoscaler_summary = self.summary()
assert autoscaler_summary
return "\n" + format_info_string(lm_summary, autoscaler_summary)
26 changes: 18 additions & 8 deletions python/ray/autoscaler/_private/commands.py
@@ -125,18 +125,28 @@ def try_reload_log_state(provider_config: Dict[str, Any], log_state: dict) -> None:

def debug_status(status, error) -> str:
"""Return a debug string for the autoscaler."""
if not status:
status = "No cluster status."
else:
if status:
status = status.decode("utf-8")
as_dict = json.loads(status)
time = datetime.datetime.fromtimestamp(as_dict["time"])
lm_summary = LoadMetricsSummary(**as_dict["load_metrics_report"])
autoscaler_summary = AutoscalerSummary(**as_dict["autoscaler_report"])
status = format_info_string(lm_summary, autoscaler_summary, time=time)
status_dict = json.loads(status)
lm_summary_dict = status_dict.get("load_metrics_report")
autoscaler_summary_dict = status_dict.get("autoscaler_report")
timestamp = status_dict.get("time")
if lm_summary_dict and autoscaler_summary_dict and timestamp:
lm_summary = LoadMetricsSummary(**lm_summary_dict)
autoscaler_summary = AutoscalerSummary(**autoscaler_summary_dict)
report_time = datetime.datetime.fromtimestamp(timestamp)
status = format_info_string(
lm_summary, autoscaler_summary, time=report_time
)
else:
status = "No cluster status."
else:
status = "No cluster status."

if error:
status += "\n"
status += error.decode("utf-8")

return status


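
The reworked debug_status() now tolerates an empty or partial status payload instead of assuming the load_metrics_report and autoscaler_report keys are always present. A rough sketch of the input shapes it accepts, assuming debug_status is imported from ray.autoscaler._private.commands and that status/error arrive as UTF-8 encoded bytes (as the decode() calls above imply); the payload values are made up for illustration:

import json

from ray.autoscaler._private.commands import debug_status

# No status at all -> "No cluster status."
print(debug_status(None, None))

# Status present but the reports are missing (e.g. very early in startup):
# the new .get() checks fall through to "No cluster status." instead of
# raising a KeyError.
partial = json.dumps({"time": 1645000000}).encode("utf-8")
print(debug_status(partial, None))

# Any error bytes are decoded and appended to the returned string.
print(debug_status(None, b"autoscaler error text"))
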
6 changes: 6 additions & 0 deletions python/ray/autoscaler/_private/load_metrics.py
@@ -79,6 +79,12 @@ def __init__(self):
self.resource_requests = []
self.cluster_full_of_actors_detected = False

def __bool__(self):
"""A load metrics instance is Falsey iff the autoscaler process
has not received a resource message from the GCS.
"""
return bool(self.raylet_id_by_ip)

def update(
self,
ip: str,
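
The new __bool__ lets callers check with plain truthiness whether any raylet has reported resources yet. A minimal sketch mirroring the test added in this PR; the raylet id bytes and resource dicts are placeholder values:

from ray.autoscaler._private.load_metrics import LoadMetrics

lm = LoadMetrics()
assert not lm  # no resource message received from the GCS yet

# After the first update() for a node, the instance becomes truthy.
lm.update("172.0.0.0", b"fake-raylet-id", {"CPU": 1}, {"CPU": 0}, {})
assert lm
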
18 changes: 14 additions & 4 deletions python/ray/autoscaler/_private/monitor.py
@@ -372,11 +372,21 @@ def _run(self):
"monitor_pid": os.getpid(),
}

# Process autoscaling actions
if self.autoscaler:
# Only used to update the load metrics for the autoscaler.
if self.autoscaler and not self.load_metrics:
# load_metrics is Falsey iff we haven't collected any
# resource messages from the GCS, which can happen at startup if
# the GCS hasn't yet received data from the Raylets.
# In this case, do not do an autoscaler update.
# Wait to get load metrics.
logger.info(
"Autoscaler has not yet received load metrics. Waiting."
)
elif self.autoscaler:
# Process autoscaling actions
self.autoscaler.update()
status["autoscaler_report"] = asdict(self.autoscaler.summary())
autoscaler_summary = self.autoscaler.summary()
if autoscaler_summary:
status["autoscaler_report"] = asdict(autoscaler_summary)

for msg in self.event_summarizer.summary():
# Need to prefix each line of the message for the lines to
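
Taken together with the LoadMetrics.__bool__ and Optional summary changes, the monitor loop now waits out the startup window instead of logging a status that shows no connected nodes. A toy simulation of that sequence, not real Monitor code, reusing the placeholder update() arguments from the LoadMetrics sketch above:

from ray.autoscaler._private.load_metrics import LoadMetrics

lm = LoadMetrics()
for tick in range(3):
    if not lm:
        # Mirrors the new branch above: no GCS resource message yet, so the
        # autoscaler update is skipped and we wait for the next tick.
        print(f"tick {tick}: waiting for load metrics")
        if tick == 1:
            # Pretend the GCS delivered its first resource message here.
            lm.update("172.0.0.0", b"fake-raylet-id", {"CPU": 1}, {"CPU": 0}, {})
    else:
        # In the real loop, autoscaler.update() runs here and a non-None
        # summary() is attached to the status payload.
        print(f"tick {tick}: autoscaler update would run")
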
7 changes: 7 additions & 0 deletions python/ray/tests/test_autoscaler.py
@@ -2117,6 +2117,7 @@ def testMaxFailures(self):
prom_metrics=mock_metrics,
)
autoscaler.update()
assert autoscaler.summary() is None
assert mock_metrics.update_loop_exceptions.inc.call_count == 1
autoscaler.update()
assert mock_metrics.update_loop_exceptions.inc.call_count == 2
@@ -2323,6 +2324,12 @@ def testScaleDownMaxWorkers(self):
node_type_counts[node_type] += 1
assert node_type_counts == {"m4.large": 2, "p2.xlarge": 6}

def testFalseyLoadMetrics(self):
lm = LoadMetrics()
assert not lm
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, {})
assert lm

def testScaleUpBasedOnLoad(self):
config = SMALL_CLUSTER.copy()
config["min_workers"] = 1