[autoscaler] Autoscaler metrics #16066
File: python/ray/autoscaler/_private/autoscaler.py
@@ -1,4 +1,5 @@
 from collections import defaultdict, namedtuple, Counter
+from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
 from typing import Any, Optional, Dict, List
 from urllib3.exceptions import MaxRetryError
 import copy

@@ -30,10 +31,9 @@
 from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \
     with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
     format_info_string
-from ray.autoscaler._private.constants import \
-    AUTOSCALER_MAX_NUM_FAILURES, AUTOSCALER_MAX_LAUNCH_BATCH, \
-    AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \
-    AUTOSCALER_HEARTBEAT_TIMEOUT_S
+from ray.autoscaler._private.constants import AUTOSCALER_MAX_NUM_FAILURES, \
+    AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \
+    AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S
 from six.moves import queue

 logger = logging.getLogger(__name__)

@@ -75,13 +75,19 @@ def __init__(self,
                  process_runner=subprocess,
                  update_interval_s=AUTOSCALER_UPDATE_INTERVAL_S,
                  prefix_cluster_info=False,
-                 event_summarizer=None):
+                 event_summarizer=None,
+                 prom_metrics=None):
         self.config_path = config_path
         # Prefix each line of info string with cluster name if True
         self.prefix_cluster_info = prefix_cluster_info
         # Keep this before self.reset (self.provider needs to be created
         # exactly once).
         self.provider = None
+        # Keep this before self.reset (if an exception occurs in reset,
+        # prom_metrics must already be instantiated so the exception
+        # counter can be incremented).
+        self.prom_metrics = prom_metrics or \
+            AutoscalerPrometheusMetrics()
         self.resource_demand_scheduler = None
         self.reset(errors_fatal=True)
         self.head_node_ip = load_metrics.local_ip

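The AutoscalerPrometheusMetrics container threaded through this diff is defined in prom_metrics.py, which is not among the hunks shown here. As a rough sketch of what such a container might look like, assembled from the attribute names used at the call sites in this PR (the exported metric names, help strings, and namespace below are assumptions, not the PR's actual definitions):

    # Hypothetical sketch, not the PR's prom_metrics.py. Attribute names
    # mirror the call sites in this diff; everything else is assumed.
    from prometheus_client import (CollectorRegistry, Counter, Gauge,
                                   Histogram)

    class AutoscalerPrometheusMetrics:
        def __init__(self, registry=None):
            # Dedicated registry so the monitor serves only these metrics.
            self.registry = registry or CollectorRegistry()
            self.pending_nodes = Gauge(
                "pending_nodes", "Number of nodes pending launch.",
                namespace="autoscaler", registry=self.registry)
            self.running_nodes = Gauge(
                "running_nodes", "Number of worker nodes running.",
                namespace="autoscaler", registry=self.registry)
            self.started_nodes = Counter(
                "started_nodes", "Number of nodes started.",
                namespace="autoscaler", registry=self.registry)
            self.stopped_nodes = Counter(
                "stopped_nodes", "Number of nodes stopped.",
                namespace="autoscaler", registry=self.registry)
            self.worker_startup_time = Histogram(
                "worker_startup_time_seconds", "Worker startup time.",
                namespace="autoscaler", registry=self.registry)
            self.update_loop_exceptions = Counter(
                "update_loop_exceptions",
                "Exceptions raised in the autoscaler update loop.",
                namespace="autoscaler", registry=self.registry)
            self.node_launch_exceptions = Counter(
                "node_launch_exceptions",
                "Exceptions raised while launching nodes.",
                namespace="autoscaler", registry=self.registry)
            self.reset_exceptions = Counter(
                "reset_exceptions", "Exceptions raised in reset().",
                namespace="autoscaler", registry=self.registry)
            self.config_validation_exceptions = Counter(
                "config_validation_exceptions",
                "Exceptions raised while validating the config.",
                namespace="autoscaler", registry=self.registry)
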
@@ -113,7 +119,7 @@ def __init__(self,
                 index=i,
                 pending=self.pending_launches,
                 node_types=self.available_node_types,
-            )
+                prom_metrics=self.prom_metrics)
             node_launcher.daemon = True
             node_launcher.start()

@@ -131,14 +137,14 @@ def __init__(self,
         for local_path in self.config["file_mounts"].values():
             assert os.path.exists(local_path)

         logger.info("StandardAutoscaler: {}".format(self.config))

     def update(self):
         try:
             self.reset(errors_fatal=False)
             self._update()
         except Exception as e:
+            self.prom_metrics.update_loop_exceptions.inc()
             logger.exception("StandardAutoscaler: "
                              "Error during autoscaling.")
             # Don't abort the autoscaler if the K8s API server is down.

@@ -217,6 +223,7 @@ def _update(self):
             self.provider.terminate_nodes(nodes_to_terminate)
             for node in nodes_to_terminate:
                 self.node_tracker.untrack(node)
+                self.prom_metrics.stopped_nodes.inc()
             nodes = self.workers()

         # Terminate nodes if there are too many

@@ -237,6 +244,7 @@ def _update(self):
             self.provider.terminate_nodes(nodes_to_terminate)
             for node in nodes_to_terminate:
                 self.node_tracker.untrack(node)
+                self.prom_metrics.stopped_nodes.inc()
             nodes = self.workers()

         to_launch = self.resource_demand_scheduler.get_nodes_to_launch(

@@ -470,6 +478,7 @@ def reset(self, errors_fatal=False):
         try:
             validate_config(new_config)
         except Exception as e:
+            self.prom_metrics.config_validation_exceptions.inc()
             logger.debug(
                 "Cluster config validation failed. The version of "
                 "the ray CLI you launched this cluster with may "

@@ -533,6 +542,7 @@ def reset(self, errors_fatal=False):
                 upscaling_speed)

         except Exception as e:
+            self.prom_metrics.reset_exceptions.inc()
             if errors_fatal:
                 raise e
             else:

@@ -725,6 +735,7 @@ def launch_new_node(self, count: int, node_type: Optional[str]) -> None:
             quantity=count,
             aggregate=operator.add)
         self.pending_launches.inc(node_type, count)
+        self.prom_metrics.pending_nodes.set(self.pending_launches.value)
         config = copy.deepcopy(self.config)
         # Split into individual launch requests of the max batch size.
         while count > 0:

@@ -736,8 +747,11 @@ def all_workers(self):
         return self.workers() + self.unmanaged_workers()

     def workers(self):
-        return self.provider.non_terminated_nodes(
+        nodes = self.provider.non_terminated_nodes(
             tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
+        # Update running nodes gauge whenever we check workers
+        self.prom_metrics.running_nodes.set(len(nodes))
+        return nodes

     def unmanaged_workers(self):
         return self.provider.non_terminated_nodes(

[Review comment] This is just workers; it does not include the head node, right?

@@ -750,6 +764,7 @@ def kill_workers(self):
         self.provider.terminate_nodes(nodes)
         for node in nodes:
             self.node_tracker.untrack(node)
+            self.prom_metrics.stopped_nodes.inc()
         logger.error("StandardAutoscaler: terminated {} node(s)".format(
             len(nodes)))

File: python/ray/autoscaler/_private/monitor.py
@@ -13,13 +13,16 @@
 from typing import Optional

 import grpc
+import prometheus_client

 import ray
 from ray.autoscaler._private.autoscaler import StandardAutoscaler
 from ray.autoscaler._private.commands import teardown_cluster
-from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S
+from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S, \
+    AUTOSCALER_METRIC_PORT
 from ray.autoscaler._private.event_summarizer import EventSummarizer
 from ray.autoscaler._private.load_metrics import LoadMetrics
+from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
 from ray.autoscaler._private.constants import \
     AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
 from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS, \

@@ -92,13 +95,16 @@ def __init__(self,
                  autoscaling_config,
                  redis_password=None,
                  prefix_cluster_info=False,
+                 monitor_ip=None,
                  stop_event: Optional[Event] = None):
         # Initialize the Redis clients.
         ray.state.state._initialize_global_state(
             redis_address, redis_password=redis_password)
         self.redis = ray._private.services.create_redis_client(
             redis_address, password=redis_password)
+        if monitor_ip:
+            self.redis.set("AutoscalerMetricsAddress",
+                           f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}")
         (ip, port) = redis_address.split(":")
         self.gcs_client = connect_to_gcs(ip, int(port), redis_password)
         # Initialize the gcs stub for getting all node resource usage.

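Publishing the address under the AutoscalerMetricsAddress key lets other components discover where to scrape. A minimal sketch of such a consumer, assuming a standard redis-py client and placeholder connection values (prometheus_client serves the metrics under /metrics):

    # Hypothetical consumer of the AutoscalerMetricsAddress key; the
    # host, port, and password here are placeholders.
    import urllib.request

    import redis

    client = redis.Redis(host="127.0.0.1", port=6379, password=None)
    addr = client.get("AutoscalerMetricsAddress")  # e.g. b"10.0.0.5:<port>"
    if addr:
        url = "http://{}/metrics".format(addr.decode())
        body = urllib.request.urlopen(url).read().decode()
        print(body)  # Prometheus text exposition format
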
@@ -126,6 +132,16 @@ def __init__(self,
         self.autoscaling_config = autoscaling_config
         self.autoscaler = None

+        self.prom_metrics = AutoscalerPrometheusMetrics()
+        try:
+            logger.info("Starting autoscaler metrics server on port {}".format(
+                AUTOSCALER_METRIC_PORT))
+            prometheus_client.start_http_server(
+                AUTOSCALER_METRIC_PORT, registry=self.prom_metrics.registry)
+        except Exception:
+            logger.exception(
+                "An exception occurred while starting the metrics server.")
+
         logger.info("Monitor: Started")

     def __del__(self):

[Review comment] Would it be ok to skip this try-except block if […]? The Kubernetes Operator currently runs multiple monitor processes at the same IP, which is why we're skipping this support for k8s right now.

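For context, prometheus_client.start_http_server starts a background thread serving whichever registry it is handed, which is why the monitor passes the dedicated self.prom_metrics.registry rather than exposing the global default registry. A standalone sketch with a placeholder port and metric:

    # Standalone illustration; the port and counter are placeholders,
    # not part of this PR.
    import time

    from prometheus_client import (CollectorRegistry, Counter,
                                   start_http_server)

    registry = CollectorRegistry()
    errors = Counter("example_errors", "An example counter.",
                     registry=registry)

    start_http_server(9999, registry=registry)  # serves /metrics on :9999
    errors.inc()
    time.sleep(60)  # keep the process alive long enough to scrape it
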
@@ -137,7 +153,8 @@ def _initialize_autoscaler(self):
             self.autoscaling_config,
             self.load_metrics,
             prefix_cluster_info=self.prefix_cluster_info,
-            event_summarizer=self.event_summarizer)
+            event_summarizer=self.event_summarizer,
+            prom_metrics=self.prom_metrics)

     def update_load_metrics(self):
         """Fetches resource usage data from GCS and updates load metrics."""

@@ -359,6 +376,12 @@ def run(self):
         default=ray_constants.LOGGING_ROTATE_BACKUP_COUNT,
         help="Specify the backup count of rotated log file, default is "
         f"{ray_constants.LOGGING_ROTATE_BACKUP_COUNT}.")
+    parser.add_argument(
+        "--monitor-ip",
+        required=False,
+        type=str,
+        default=None,
+        help="The IP address of the machine hosting the monitor process.")
     args = parser.parse_args()
     setup_component_logger(
         logging_level=args.logging_level,

@@ -381,6 +404,7 @@ def run(self):
     monitor = Monitor(
         args.redis_address,
         autoscaling_config,
-        redis_password=args.redis_password)
+        redis_password=args.redis_password,
+        monitor_ip=args.monitor_ip)

     monitor.run()

File: python/ray/autoscaler/_private/node_launcher.py
@@ -2,11 +2,13 @@
 import copy
 import logging
 import threading
+import time

 from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_STATUS,
                                  TAG_RAY_NODE_KIND, TAG_RAY_NODE_NAME,
                                  TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED,
                                  NODE_KIND_WORKER)
+from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
 from ray.autoscaler._private.util import hash_launch_conf

 logger = logging.getLogger(__name__)

@@ -19,12 +21,14 @@ def __init__(self,
                  provider,
                  queue,
                  pending,
+                 prom_metrics=None,
                  node_types=None,
                  index=None,
                  *args,
                  **kwargs):
         self.queue = queue
         self.pending = pending
+        self.prom_metrics = prom_metrics or AutoscalerPrometheusMetrics()
         self.provider = provider
         self.node_types = node_types
         self.index = str(index) if index is not None else ""

@@ -57,7 +61,16 @@ def _launch_node(self, config: Dict[str, Any], count: int,
         if node_type:
             node_tags[TAG_RAY_USER_NODE_TYPE] = node_type
         node_config.update(launch_config)
+        launch_start_time = time.time()
         self.provider.create_node(node_config, node_tags, count)
+        startup_time = time.time() - launch_start_time
+        for _ in range(count):
+            # Note: when launching multiple nodes we observe the time it
+            # took all nodes to start up for each node. For example, if 4
+            # nodes were launched in 25 seconds, we would observe the 25
+            # second startup time 4 times.
+            self.prom_metrics.worker_startup_time.observe(startup_time)
+        self.prom_metrics.started_nodes.inc(count)
         after = self.provider.non_terminated_nodes(tag_filters=worker_filter)
         if set(after).issubset(before):
             self.log("No new nodes reported after node creation.")

[Review comment] I don't think this reflects startup time. For most (all?) providers, create_node returns before the node has actually started up. Not sure how easy that is to measure.

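The note in the added code describes a weighting choice: a batch launch contributes one observation per node, each with the whole batch's elapsed time. A small illustration of what that does to the histogram, using a placeholder metric name:

    # If 4 nodes launch in one 25-second batch, the histogram records
    # four 25-second samples: its _count grows by 4 and _sum by 100.
    from prometheus_client import Histogram

    startup_time = Histogram("example_worker_startup_time_seconds",
                             "Per-node startup time, batch-weighted.")

    batch_seconds = 25.0
    for _ in range(4):
        startup_time.observe(batch_seconds)

This weights the average startup time by batch size rather than by launch request.
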
@@ -69,9 +82,11 @@ def run(self):
             try:
                 self._launch_node(config, count, node_type)
             except Exception:
+                self.prom_metrics.node_launch_exceptions.inc()
                 logger.exception("Launch failed")
             finally:
                 self.pending.dec(node_type, count)
+                self.prom_metrics.pending_nodes.set(self.pending.value)

     def log(self, statement):
         prefix = "NodeLauncher{}:".format(self.index)

[Review comment] We should also increment stopped_nodes here:
self.prom_metrics.stopped_nodes.inc()
![Screen Shot 2021-05-31 at 3 06 04 PM](https://user-images.githubusercontent.com/21353794/120245351-bb7a5d00-c221-11eb-9422-ea38f4705148.png)