diff --git a/python/ray/_private/metrics_agent.py b/python/ray/_private/metrics_agent.py
index 0ecc3514208f8..0cc674401c902 100644
--- a/python/ray/_private/metrics_agent.py
+++ b/python/ray/_private/metrics_agent.py
@@ -21,6 +21,7 @@ from opencensus.tags import tag_value as tag_value_module
 
 import ray
 
+from ray._private import services
 import ray._private.prometheus_exporter as prometheus_exporter
 from ray.core.generated.metrics_pb2 import Metric
 
@@ -185,18 +186,25 @@ class PrometheusServiceDiscoveryWriter(threading.Thread):
     def __init__(self, redis_address, redis_password, temp_dir):
         ray.state.state._initialize_global_state(
             redis_address=redis_address, redis_password=redis_password)
+        self.redis_address = redis_address
+        self.redis_password = redis_password
         self.temp_dir = temp_dir
         self.default_service_discovery_flush_period = 5
         super().__init__()
 
     def get_file_discovery_content(self):
-        """Return the content for Prometheus serivce discovery."""
+        """Return the content for Prometheus service discovery."""
         nodes = ray.nodes()
         metrics_export_addresses = [
             "{}:{}".format(node["NodeManagerAddress"],
                            node["MetricsExportPort"]) for node in nodes
             if node["alive"] is True
         ]
+        redis_client = services.create_redis_client(self.redis_address,
+                                                    self.redis_password)
+        autoscaler_addr = redis_client.get("AutoscalerMetricsAddress")
+        if autoscaler_addr:
+            metrics_export_addresses.append(autoscaler_addr.decode("utf-8"))
         return json.dumps([{
             "labels": {
                 "job": "ray"
diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py
index d49c73a0825c6..f2f7d6fa774e9 100644
--- a/python/ray/_private/services.py
+++ b/python/ray/_private/services.py
@@ -1834,7 +1834,8 @@ def start_monitor(redis_address,
                   redis_password=None,
                   fate_share=None,
                   max_bytes=0,
-                  backup_count=0):
+                  backup_count=0,
+                  monitor_ip=None):
     """Run a process to monitor the other processes.
 
     Args:
@@ -1850,7 +1851,8 @@ def start_monitor(redis_address,
             RotatingFileHandler's maxBytes.
         backup_count (int): Log rotation parameter. Corresponding to
             RotatingFileHandler's backupCount.
-
+        monitor_ip (str): IP address of the machine that the monitor will be
+            run on. Can be excluded, but required for autoscaler metrics.
     Returns:
         ProcessInfo for the process that was started.
""" @@ -1865,6 +1867,8 @@ def start_monitor(redis_address, command.append("--autoscaling-config=" + str(autoscaling_config)) if redis_password: command.append("--redis-password=" + redis_password) + if monitor_ip: + command.append("--monitor-ip=" + monitor_ip) process_info = start_ray_process( command, ray_constants.PROCESS_TYPE_MONITOR, diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 392d328b1c5af..97f37e6d39f5b 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -1,4 +1,5 @@ from collections import defaultdict, namedtuple, Counter +from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics from typing import Any, Optional, Dict, List from urllib3.exceptions import MaxRetryError import copy @@ -30,10 +31,9 @@ from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \ with_head_node_ip, hash_launch_conf, hash_runtime_conf, \ format_info_string -from ray.autoscaler._private.constants import \ - AUTOSCALER_MAX_NUM_FAILURES, AUTOSCALER_MAX_LAUNCH_BATCH, \ - AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \ - AUTOSCALER_HEARTBEAT_TIMEOUT_S +from ray.autoscaler._private.constants import AUTOSCALER_MAX_NUM_FAILURES, \ + AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \ + AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S from six.moves import queue logger = logging.getLogger(__name__) @@ -75,13 +75,19 @@ def __init__(self, process_runner=subprocess, update_interval_s=AUTOSCALER_UPDATE_INTERVAL_S, prefix_cluster_info=False, - event_summarizer=None): + event_summarizer=None, + prom_metrics=None): self.config_path = config_path # Prefix each line of info string with cluster name if True self.prefix_cluster_info = prefix_cluster_info # Keep this before self.reset (self.provider needs to be created # exactly once). self.provider = None + # Keep this before self.reset (if an exception occurs in reset + # then prom_metrics must be instantitiated to increment the + # exception counter) + self.prom_metrics = prom_metrics or \ + AutoscalerPrometheusMetrics() self.resource_demand_scheduler = None self.reset(errors_fatal=True) self.head_node_ip = load_metrics.local_ip @@ -113,7 +119,7 @@ def __init__(self, index=i, pending=self.pending_launches, node_types=self.available_node_types, - ) + prom_metrics=self.prom_metrics) node_launcher.daemon = True node_launcher.start() @@ -131,7 +137,6 @@ def __init__(self, for local_path in self.config["file_mounts"].values(): assert os.path.exists(local_path) - logger.info("StandardAutoscaler: {}".format(self.config)) def update(self): @@ -139,6 +144,7 @@ def update(self): self.reset(errors_fatal=False) self._update() except Exception as e: + self.prom_metrics.update_loop_exceptions.inc() logger.exception("StandardAutoscaler: " "Error during autoscaling.") # Don't abort the autoscaler if the K8s API server is down. 
@@ -217,6 +223,7 @@ def _update(self):
             self.provider.terminate_nodes(nodes_to_terminate)
             for node in nodes_to_terminate:
                 self.node_tracker.untrack(node)
+                self.prom_metrics.stopped_nodes.inc()
             nodes = self.workers()
 
         # Terminate nodes if there are too many
@@ -237,6 +244,7 @@ def _update(self):
             self.provider.terminate_nodes(nodes_to_terminate)
             for node in nodes_to_terminate:
                 self.node_tracker.untrack(node)
+                self.prom_metrics.stopped_nodes.inc()
             nodes = self.workers()
 
         to_launch = self.resource_demand_scheduler.get_nodes_to_launch(
@@ -295,6 +303,8 @@ def _update(self):
                     " Failed to update node."
                     " Node has already been terminated.")
         if nodes_to_terminate:
+            self.prom_metrics.stopped_nodes.inc(
+                len(nodes_to_terminate))
             self.provider.terminate_nodes(nodes_to_terminate)
             nodes = self.workers()
 
@@ -470,6 +480,7 @@ def reset(self, errors_fatal=False):
         try:
             validate_config(new_config)
         except Exception as e:
+            self.prom_metrics.config_validation_exceptions.inc()
             logger.debug(
                 "Cluster config validation failed. The version of "
                 "the ray CLI you launched this cluster with may "
@@ -533,6 +544,7 @@ def reset(self, errors_fatal=False):
                 upscaling_speed)
 
         except Exception as e:
+            self.prom_metrics.reset_exceptions.inc()
             if errors_fatal:
                 raise e
             else:
@@ -725,6 +737,7 @@ def launch_new_node(self, count: int, node_type: Optional[str]) -> None:
             quantity=count,
             aggregate=operator.add)
         self.pending_launches.inc(node_type, count)
+        self.prom_metrics.pending_nodes.set(self.pending_launches.value)
         config = copy.deepcopy(self.config)
         # Split into individual launch requests of the max batch size.
         while count > 0:
@@ -736,8 +749,11 @@ def all_workers(self):
         return self.workers() + self.unmanaged_workers()
 
     def workers(self):
-        return self.provider.non_terminated_nodes(
+        nodes = self.provider.non_terminated_nodes(
             tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
+        # Update running nodes gauge whenever we check workers
+        self.prom_metrics.running_workers.set(len(nodes))
+        return nodes
 
     def unmanaged_workers(self):
         return self.provider.non_terminated_nodes(
@@ -750,6 +766,7 @@ def kill_workers(self):
             self.provider.terminate_nodes(nodes)
             for node in nodes:
                 self.node_tracker.untrack(node)
+                self.prom_metrics.stopped_nodes.inc()
         logger.error("StandardAutoscaler: terminated {} node(s)".format(
             len(nodes)))
 
diff --git a/python/ray/autoscaler/_private/constants.py b/python/ray/autoscaler/_private/constants.py
index e7010f1e9f1c1..c5556c7acced8 100644
--- a/python/ray/autoscaler/_private/constants.py
+++ b/python/ray/autoscaler/_private/constants.py
@@ -61,6 +61,9 @@ def env_integer(key, default):
 # to run.
 AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE = 1000
 
+# Port on which autoscaler prometheus metrics will be exported
+AUTOSCALER_METRIC_PORT = env_integer("AUTOSCALER_METRIC_PORT", 44217)
+
 # Max number of retries to AWS (default is 5, time increases exponentially)
 BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12)
 # Max number of retries to create an EC2 node (retry different subnet)
diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py
index b17a4aeb4139e..cc0d74d1154e5 100644
--- a/python/ray/autoscaler/_private/monitor.py
+++ b/python/ray/autoscaler/_private/monitor.py
@@ -13,13 +13,16 @@ from typing import Optional
 
 import grpc
+import prometheus_client
 
 import ray
 from ray.autoscaler._private.autoscaler import StandardAutoscaler
 from ray.autoscaler._private.commands import teardown_cluster
-from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S
+from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S, \
+    AUTOSCALER_METRIC_PORT
 from ray.autoscaler._private.event_summarizer import EventSummarizer
 from ray.autoscaler._private.load_metrics import LoadMetrics
+from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
 from ray.autoscaler._private.constants import \
     AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
 from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS, \
@@ -92,13 +95,16 @@ def __init__(self,
                  autoscaling_config,
                  redis_password=None,
                  prefix_cluster_info=False,
+                 monitor_ip=None,
                  stop_event: Optional[Event] = None):
         # Initialize the Redis clients.
         ray.state.state._initialize_global_state(
             redis_address, redis_password=redis_password)
         self.redis = ray._private.services.create_redis_client(
             redis_address, password=redis_password)
-
+        if monitor_ip:
+            self.redis.set("AutoscalerMetricsAddress",
+                           f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}")
         (ip, port) = redis_address.split(":")
         self.gcs_client = connect_to_gcs(ip, int(port), redis_password)
         # Initialize the gcs stub for getting all node resource usage.
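The hunk above is one half of a small handshake: the monitor publishes `<monitor_ip>:AUTOSCALER_METRIC_PORT` under the `AutoscalerMetricsAddress` Redis key, and `PrometheusServiceDiscoveryWriter` (first file in this diff) reads that key and appends the address to the Prometheus `file_sd` targets. A hedged sketch of the consumer side, assuming only the `redis` package; the host, port, password, and example addresses are placeholders rather than values from this change.

```python
# Sketch of the discovery handshake introduced above: the monitor writes its
# metrics address under the "AutoscalerMetricsAddress" Redis key, and the
# service-discovery writer folds it into the Prometheus file_sd JSON.
# Host/port/password and the example addresses are placeholders.
import json

import redis

client = redis.StrictRedis(host="127.0.0.1", port=6379, password=None)

addresses = ["10.0.0.2:55661", "10.0.0.3:55661"]  # per-node metrics agents
autoscaler_addr = client.get("AutoscalerMetricsAddress")
if autoscaler_addr:  # bytes, e.g. b"10.0.0.1:44217"
    addresses.append(autoscaler_addr.decode("utf-8"))

# Same shape that Prometheus expects from a file_sd_configs target file.
print(json.dumps([{"labels": {"job": "ray"}, "targets": addresses}], indent=2))
```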
@@ -126,6 +132,22 @@ def __init__(self,
         self.autoscaling_config = autoscaling_config
         self.autoscaler = None
 
+        self.prom_metrics = AutoscalerPrometheusMetrics()
+        # Only start the metrics server when a monitor_ip is passed in;
+        # otherwise skip it so behavior stays identical to before metrics
+        # were introduced.
+        if monitor_ip:
+            try:
+                logger.info(
+                    "Starting autoscaler metrics server on port {}".format(
+                        AUTOSCALER_METRIC_PORT))
+                prometheus_client.start_http_server(
+                    AUTOSCALER_METRIC_PORT,
+                    registry=self.prom_metrics.registry)
+            except Exception:
+                logger.exception(
+                    "An exception occurred while starting the metrics server.")
+
         logger.info("Monitor: Started")
 
     def __del__(self):
@@ -137,7 +159,8 @@ def _initialize_autoscaler(self):
             self.autoscaling_config,
             self.load_metrics,
             prefix_cluster_info=self.prefix_cluster_info,
-            event_summarizer=self.event_summarizer)
+            event_summarizer=self.event_summarizer,
+            prom_metrics=self.prom_metrics)
 
     def update_load_metrics(self):
         """Fetches resource usage data from GCS and updates load metrics."""
@@ -359,6 +382,12 @@ def run(self):
         default=ray_constants.LOGGING_ROTATE_BACKUP_COUNT,
         help="Specify the backup count of rotated log file, default is "
         f"{ray_constants.LOGGING_ROTATE_BACKUP_COUNT}.")
+    parser.add_argument(
+        "--monitor-ip",
+        required=False,
+        type=str,
+        default=None,
+        help="The IP address of the machine hosting the monitor process.")
     args = parser.parse_args()
     setup_component_logger(
         logging_level=args.logging_level,
@@ -381,6 +410,7 @@ def run(self):
     monitor = Monitor(
         args.redis_address,
         autoscaling_config,
-        redis_password=args.redis_password)
+        redis_password=args.redis_password,
+        monitor_ip=args.monitor_ip)
 
     monitor.run()
diff --git a/python/ray/autoscaler/_private/node_launcher.py b/python/ray/autoscaler/_private/node_launcher.py
index dc3a8b32953d1..a913f219c4357 100644
--- a/python/ray/autoscaler/_private/node_launcher.py
+++ b/python/ray/autoscaler/_private/node_launcher.py
@@ -7,6 +7,7 @@
     TAG_RAY_NODE_KIND, TAG_RAY_NODE_NAME, TAG_RAY_USER_NODE_TYPE,
     STATUS_UNINITIALIZED, NODE_KIND_WORKER)
+from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
 from ray.autoscaler._private.util import hash_launch_conf
 
 logger = logging.getLogger(__name__)
@@ -19,12 +20,14 @@ def __init__(self,
                  provider,
                  queue,
                  pending,
+                 prom_metrics=None,
                  node_types=None,
                  index=None,
                  *args,
                  **kwargs):
         self.queue = queue
         self.pending = pending
+        self.prom_metrics = prom_metrics or AutoscalerPrometheusMetrics()
         self.provider = provider
         self.node_types = node_types
         self.index = str(index) if index is not None else ""
@@ -58,6 +61,7 @@ def _launch_node(self, config: Dict[str, Any], count: int,
             node_tags[TAG_RAY_USER_NODE_TYPE] = node_type
             node_config.update(launch_config)
         self.provider.create_node(node_config, node_tags, count)
+        self.prom_metrics.started_nodes.inc(count)
         after = self.provider.non_terminated_nodes(tag_filters=worker_filter)
         if set(after).issubset(before):
             self.log("No new nodes reported after node creation.")
@@ -69,9 +73,11 @@ def run(self):
             try:
                 self._launch_node(config, count, node_type)
             except Exception:
+                self.prom_metrics.node_launch_exceptions.inc()
                 logger.exception("Launch failed")
             finally:
                 self.pending.dec(node_type, count)
+                self.prom_metrics.pending_nodes.set(self.pending.value)
 
     def log(self, statement):
         prefix = "NodeLauncher{}:".format(self.index)
diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py
new file mode 100644
index 0000000000000..07992bfb29dae
--- /dev/null
+++ b/python/ray/autoscaler/_private/prom_metrics.py
@@ -0,0 +1,63 @@
+from prometheus_client import (
+    CollectorRegistry,
+    Counter,
+    Gauge,
+)
+
+
+# The metrics in this class should be kept in sync with
+# python/ray/tests/test_metrics_agent.py
+class AutoscalerPrometheusMetrics:
+    def __init__(self, registry: CollectorRegistry = None):
+        self.registry: CollectorRegistry = registry or \
+            CollectorRegistry(auto_describe=True)
+        self.pending_nodes: Gauge = Gauge(
+            "pending_nodes",
+            "Number of nodes pending to be started.",
+            unit="nodes",
+            namespace="autoscaler",
+            registry=self.registry)
+        self.started_nodes: Counter = Counter(
+            "started_nodes",
+            "Number of nodes started.",
+            unit="nodes",
+            namespace="autoscaler",
+            registry=self.registry)
+        self.stopped_nodes: Counter = Counter(
+            "stopped_nodes",
+            "Number of nodes stopped.",
+            unit="nodes",
+            namespace="autoscaler",
+            registry=self.registry)
+        self.running_workers: Gauge = Gauge(
+            "running_workers",
+            "Number of worker nodes running.",
+            unit="nodes",
+            namespace="autoscaler",
+            registry=self.registry)
+        self.update_loop_exceptions: Counter = Counter(
+            "update_loop_exceptions",
+            "Number of exceptions raised in the update loop of the "
+            "autoscaler.",
+            unit="exceptions",
+            namespace="autoscaler",
+            registry=self.registry)
+        self.node_launch_exceptions: Counter = Counter(
+            "node_launch_exceptions",
+            "Number of exceptions raised while launching nodes.",
+            unit="exceptions",
+            namespace="autoscaler",
+            registry=self.registry)
+        self.reset_exceptions: Counter = Counter(
+            "reset_exceptions",
+            "Number of exceptions raised while resetting the autoscaler.",
+            unit="exceptions",
+            namespace="autoscaler",
+            registry=self.registry)
+        self.config_validation_exceptions: Counter = Counter(
+            "config_validation_exceptions",
+            "Number of exceptions raised while validating the config "
+            "during a reset.",
+            unit="exceptions",
+            namespace="autoscaler",
+            registry=self.registry)
diff --git a/python/ray/node.py b/python/ray/node.py
index ed473c3386a95..e7abeef911d02 100644
--- a/python/ray/node.py
+++ b/python/ray/node.py
@@ -830,7 +830,8 @@ def start_monitor(self):
             redis_password=self._ray_params.redis_password,
             fate_share=self.kernel_fate_share,
             max_bytes=self.max_bytes,
-            backup_count=self.backup_count)
+            backup_count=self.backup_count,
+            monitor_ip=self._node_ip_address)
         assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes
         self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info]
diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py
index 23d95abcb4edc..829b86c76fabb 100644
--- a/python/ray/tests/test_autoscaler.py
+++ b/python/ray/tests/test_autoscaler.py
@@ -19,6 +19,7 @@
 from ray.autoscaler.sdk import get_docker_host_mount_location
 from ray.autoscaler._private.load_metrics import LoadMetrics
 from ray.autoscaler._private.autoscaler import StandardAutoscaler
+from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
 from ray.autoscaler._private.providers import (
     _NODE_PROVIDERS, _clear_provider_cache, _DEFAULT_CONFIGS)
 from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS, \
@@ -160,6 +161,7 @@ def __init__(self, cache_stopped=False, unique_ips=False):
         self.mock_nodes = {}
         self.next_id = 0
         self.throw = False
+        self.error_creates = False
         self.fail_creates = False
         self.ready_to_create = threading.Event()
         self.ready_to_create.set()
@@ -212,6 +214,8 @@ def external_ip(self, node_id):
         return self.mock_nodes[node_id].external_ip
 
     def create_node(self, node_config, tags, count, _skip_wait=False):
+        if self.error_creates:
+            raise Exception
         if not _skip_wait:
             self.ready_to_create.wait()
         if self.fail_creates:
@@ -878,18 +882,28 @@ def testScaleUp(self):
         config_path = self.write_config(SMALL_CLUSTER)
         self.provider = MockProvider()
         runner = MockProcessRunner()
+        mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
         autoscaler = StandardAutoscaler(
             config_path,
             LoadMetrics(),
             max_failures=0,
             process_runner=runner,
-            update_interval_s=0)
+            update_interval_s=0,
+            prom_metrics=mock_metrics)
         assert len(self.provider.non_terminated_nodes({})) == 0
         autoscaler.update()
         self.waitForNodes(2)
+
+        # started_nodes metric should have been incremented by 2
+        assert mock_metrics.started_nodes.inc.call_count == 1
+        mock_metrics.started_nodes.inc.assert_called_with(2)
+
         autoscaler.update()
         self.waitForNodes(2)
 
+        # running_workers metric should be set to 2
+        mock_metrics.running_workers.set.assert_called_with(2)
+
     def testTerminateOutdatedNodesGracefully(self):
         config = SMALL_CLUSTER.copy()
         config["min_workers"] = 5
@@ -903,12 +917,14 @@ def testTerminateOutdatedNodesGracefully(self):
         }, 10)
         runner = MockProcessRunner()
         runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)])
+        mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
         autoscaler = StandardAutoscaler(
             config_path,
             LoadMetrics(),
             max_failures=0,
             process_runner=runner,
-            update_interval_s=0)
+            update_interval_s=0,
+            prom_metrics=mock_metrics)
         self.waitForNodes(10)
 
         # Gradually scales down to meet target size, never going too low
@@ -925,6 +941,8 @@ def testTerminateOutdatedNodesGracefully(self):
         events = autoscaler.event_summarizer.summary()
         assert ("Removing 10 nodes of type "
                 "ray-legacy-worker-node-type (outdated)." in events), events
+        assert mock_metrics.stopped_nodes.inc.call_count == 10
+        mock_metrics.started_nodes.inc.assert_called_with(5)
 
     def testDynamicScaling(self):
         config_path = self.write_config(SMALL_CLUSTER)
@@ -938,6 +956,7 @@ def testDynamicScaling(self):
             TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
         }, 1)
         lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {})
+        mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
         autoscaler = StandardAutoscaler(
             config_path,
             lm,
@@ -945,7 +964,8 @@ def testDynamicScaling(self):
             max_concurrent_launches=5,
             max_failures=0,
             process_runner=runner,
-            update_interval_s=0)
+            update_interval_s=0,
+            prom_metrics=mock_metrics)
         self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
         autoscaler.update()
         self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
@@ -977,6 +997,8 @@ def testDynamicScaling(self):
         events = autoscaler.event_summarizer.summary()
         assert ("Removing 1 nodes of type "
                 "ray-legacy-worker-node-type (max workers)." in events), events
+        assert mock_metrics.stopped_nodes.inc.call_count == 1
+        mock_metrics.running_workers.set.assert_called_with(10)
 
     def testInitialWorkers(self):
         """initial_workers is deprecated, this tests that it is ignored."""
@@ -1281,6 +1303,7 @@ def testDelayedLaunchWithMinWorkers(self):
         self.provider = MockProvider()
         runner = MockProcessRunner()
         runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)])
+        mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
         autoscaler = StandardAutoscaler(
             config_path,
             LoadMetrics(),
@@ -1288,7 +1311,8 @@ def testDelayedLaunchWithMinWorkers(self):
             max_concurrent_launches=8,
             max_failures=0,
             process_runner=runner,
-            update_interval_s=0)
+            update_interval_s=0,
+            prom_metrics=mock_metrics)
         assert len(self.provider.non_terminated_nodes({})) == 0
 
         # update() should launch a wave of 5 nodes (max_launch_batch)
@@ -1300,6 +1324,7 @@ def testDelayedLaunchWithMinWorkers(self):
         waiters = rtc1._cond._waiters
         self.waitFor(lambda: len(waiters) == 2)
         assert autoscaler.pending_launches.value == 10
+        mock_metrics.pending_nodes.set.assert_called_with(10)
         assert len(self.provider.non_terminated_nodes({})) == 0
         autoscaler.update()
         self.waitForNodes(0)  # Nodes are not added on top of pending.
@@ -1308,9 +1333,11 @@ def testDelayedLaunchWithMinWorkers(self):
         assert len(self.provider.non_terminated_nodes({})) == 10
         self.waitForNodes(10)
         assert autoscaler.pending_launches.value == 0
+        mock_metrics.pending_nodes.set.assert_called_with(0)
         autoscaler.update()
         self.waitForNodes(10)
         assert autoscaler.pending_launches.value == 0
+        mock_metrics.pending_nodes.set.assert_called_with(0)
 
     def testUpdateThrottling(self):
         config_path = self.write_config(SMALL_CLUSTER)
@@ -1368,6 +1395,7 @@ def testIgnoresCorruptedConfig(self):
         }, 1)
         lm = LoadMetrics()
         lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {})
+        mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
         autoscaler = StandardAutoscaler(
             config_path,
             lm,
@@ -1375,7 +1403,8 @@ def testIgnoresCorruptedConfig(self):
             max_concurrent_launches=10,
             process_runner=runner,
             max_failures=0,
-            update_interval_s=0)
+            update_interval_s=0,
+            prom_metrics=mock_metrics)
         autoscaler.update()
         self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
 
@@ -1383,6 +1412,8 @@ def testIgnoresCorruptedConfig(self):
         self.write_config("asdf", call_prepare_config=False)
         for _ in range(10):
             autoscaler.update()
+        # config validation exceptions metric should be incremented 10 times
+        assert mock_metrics.config_validation_exceptions.inc.call_count == 10
         time.sleep(0.1)
         assert autoscaler.pending_launches.value == 0
         assert len(
@@ -1409,14 +1440,18 @@ def testMaxFailures(self):
         self.provider = MockProvider()
         self.provider.throw = True
         runner = MockProcessRunner()
+        mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
         autoscaler = StandardAutoscaler(
             config_path,
             LoadMetrics(),
             max_failures=2,
             process_runner=runner,
-            update_interval_s=0)
+            update_interval_s=0,
+            prom_metrics=mock_metrics)
         autoscaler.update()
+        assert mock_metrics.update_loop_exceptions.inc.call_count == 1
         autoscaler.update()
+        assert mock_metrics.update_loop_exceptions.inc.call_count == 2
         with pytest.raises(Exception):
             autoscaler.update()
 
@@ -2270,12 +2305,14 @@ def testNodeTerminatedDuringUpdate(self):
         self.provider = MockProvider()
         runner = MockProcessRunner()
         lm = LoadMetrics()
+        mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
         autoscaler = StandardAutoscaler(
             config_path,
             lm,
             max_failures=0,
             process_runner=runner,
-            update_interval_s=0)
+            update_interval_s=0,
+            prom_metrics=mock_metrics)
 
         # Scale up to two up-to-date workers
         autoscaler.update()
@@ -2353,6 +2390,30 @@ def terminate_worker_zero():
         assert set(autoscaler.workers()) == {2, 3},\
             "Unexpected node_ids"
+        assert len(mock_metrics.stopped_nodes.mock_calls) == 1
+
+    def testProviderException(self):
+        config_path = self.write_config(SMALL_CLUSTER)
+        self.provider = MockProvider()
+        self.provider.error_creates = True
+        runner = MockProcessRunner()
+        mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
+        autoscaler = StandardAutoscaler(
+            config_path,
+            LoadMetrics(),
+            max_failures=0,
+            process_runner=runner,
+            update_interval_s=0,
+            prom_metrics=mock_metrics)
+        autoscaler.update()
+
+        def exceptions_incremented():
+            return mock_metrics.node_launch_exceptions.inc.call_count == 1
+
+        self.waitFor(
+            exceptions_incremented,
+            fail_msg="Expected to see a node launch exception")
+
 
 if __name__ == "__main__":
     import sys
diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py
index 6026d711069c8..c2dc8ce36c177 100644
--- a/python/ray/tests/test_metrics_agent.py
+++ b/python/ray/tests/test_metrics_agent.py
@@ -9,6 +9,7 @@
 import pytest
 
 import ray
+from ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT
 from ray.ray_constants import PROMETHEUS_SERVICE_DISCOVERY_FILE
 from ray._private.metrics_agent import PrometheusServiceDiscoveryWriter
 from ray.util.metrics import Counter, Histogram, Gauge
@@ -48,6 +49,16 @@
     "ray_outbound_heartbeat_size_kb_sum",
 ]
 
+# This list of metrics should be kept in sync with
+# ray/python/ray/autoscaler/_private/prom_metrics.py
+_AUTOSCALER_METRICS = [
+    "autoscaler_config_validation_exceptions",
+    "autoscaler_node_launch_exceptions", "autoscaler_pending_nodes",
+    "autoscaler_reset_exceptions", "autoscaler_running_workers",
+    "autoscaler_started_nodes", "autoscaler_stopped_nodes",
+    "autoscaler_update_loop_exceptions"
+]
+
 
 @pytest.fixture
 def _setup_cluster_for_test(ray_start_cluster):
@@ -94,8 +105,9 @@ async def ping(self):
         metrics_export_port = node_info["MetricsExportPort"]
         addr = node_info["NodeManagerAddress"]
         prom_addresses.append(f"{addr}:{metrics_export_port}")
-
-    yield prom_addresses
+    autoscaler_export_addr = "{}:{}".format(cluster.head_node.node_ip_address,
+                                            AUTOSCALER_METRIC_PORT)
+    yield prom_addresses, autoscaler_export_addr
 
     ray.get(worker_should_exit.send.remote())
     ray.get(obj_refs)
@@ -107,7 +119,7 @@ async def ping(self):
 def test_metrics_export_end_to_end(_setup_cluster_for_test):
     TEST_TIMEOUT_S = 20
 
-    prom_addresses = _setup_cluster_for_test
+    prom_addresses, autoscaler_export_addr = _setup_cluster_for_test
 
     def test_cases():
         components_dict, metric_names, metric_samples = fetch_prometheus(
@@ -165,6 +177,16 @@ def test_cases():
         assert hist_count == 1
         assert hist_sum == 1.5
 
+        # Autoscaler metrics
+        _, autoscaler_metric_names, _ = fetch_prometheus(
+            [autoscaler_export_addr])
+        for metric in _AUTOSCALER_METRICS:
+            # Metric name should appear with some suffix (_count, _total,
+            # etc...) in the list of all names
+            assert any(name.startswith(metric) for name in
+                       autoscaler_metric_names), \
+                f"{metric} not in {autoscaler_metric_names}"
+
     def wrap_test_case_for_retry():
         try:
             test_cases()
@@ -197,10 +219,14 @@ def test_prometheus_file_based_service_discovery(ray_start_cluster):
         redis_address, ray.ray_constants.REDIS_DEFAULT_PASSWORD, "/tmp/ray")
 
     def get_metrics_export_address_from_node(nodes):
-        return [
+        node_export_addrs = [
            "{}:{}".format(node.node_ip_address, node.metrics_export_port)
            for node in nodes
        ]
+        # monitor should be run on head node for `ray_start_cluster` fixture
+        autoscaler_export_addr = "{}:{}".format(
+            cluster.head_node.node_ip_address, AUTOSCALER_METRIC_PORT)
+        return node_export_addrs + [autoscaler_export_addr]
 
     loaded_json_data = json.loads(writer.get_file_discovery_content())[0]
     assert (set(get_metrics_export_address_from_node(nodes)) == set(
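Outside the test suite, the new endpoint can also be checked by scraping it directly once a head node is up. A hedged sketch using `requests` and `prometheus_client`'s text parser; the head-node IP is a placeholder, and 44217 is simply the `AUTOSCALER_METRIC_PORT` default introduced in this change.

```python
# Sketch: scrape the autoscaler metrics endpoint and list exported families.
# Assumes a running head node; the IP below is a placeholder.
import requests
from prometheus_client.parser import text_string_to_metric_families

resp = requests.get("http://10.0.0.1:44217/metrics", timeout=5)
resp.raise_for_status()

for family in text_string_to_metric_families(resp.text):
    # Family names should carry the "autoscaler_" namespace, e.g.
    # autoscaler_pending_nodes, autoscaler_started_nodes, ...
    print(family.name, [(s.name, s.value) for s in family.samples])
```

This mirrors what the end-to-end test asserts: every name in `_AUTOSCALER_METRICS` should appear, possibly with a version-dependent suffix such as `_total`.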