From 46ea81bf6e97f195d0dd843750b162a9ef5d82e7 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Mon, 24 May 2021 11:39:34 -0700 Subject: [PATCH 01/32] initial --- python/ray/autoscaler/_private/autoscaler.py | 4 ++ python/ray/autoscaler/_private/constants.py | 2 + .../kubernetes/operator_configs/operator.yaml | 49 +++++++++++++++++++ 3 files changed, 55 insertions(+) create mode 100644 python/ray/autoscaler/kubernetes/operator_configs/operator.yaml diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 392d328b1c5af..907d0a8f5946d 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -1,4 +1,5 @@ from collections import defaultdict, namedtuple, Counter +from python.ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT from typing import Any, Optional, Dict, List from urllib3.exceptions import MaxRetryError import copy @@ -6,6 +7,7 @@ import math import operator import os +import prometheus_client import subprocess import threading import time @@ -38,6 +40,8 @@ logger = logging.getLogger(__name__) +prometheus_client.start_http_server(AUTOSCALER_METRIC_PORT) + # Tuple of modified fields for the given node_id returned by should_update # that will be passed into a NodeUpdaterThread. UpdateInstructions = namedtuple( diff --git a/python/ray/autoscaler/_private/constants.py b/python/ray/autoscaler/_private/constants.py index e7010f1e9f1c1..5d9390dc2ff38 100644 --- a/python/ray/autoscaler/_private/constants.py +++ b/python/ray/autoscaler/_private/constants.py @@ -61,6 +61,8 @@ def env_integer(key, default): # to run. AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE = 1000 +AUTOSCALER_METRIC_PORT = env_integer("AUTOSCALER_METRIC_PORT", 44217) + # Max number of retries to AWS (default is 5, time increases exponentially) BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12) # Max number of retries to create an EC2 node (retry different subnet) diff --git a/python/ray/autoscaler/kubernetes/operator_configs/operator.yaml b/python/ray/autoscaler/kubernetes/operator_configs/operator.yaml new file mode 100644 index 0000000000000..3c37f554aa8b1 --- /dev/null +++ b/python/ray/autoscaler/kubernetes/operator_configs/operator.yaml @@ -0,0 +1,49 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: ray-operator-serviceaccount +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: ray-operator-role +rules: +- apiGroups: ["", "cluster.ray.io"] + resources: ["rayclusters", "rayclusters/finalizers", "rayclusters/status", "pods", "pods/exec", "services"] + verbs: ["get", "watch", "list", "create", "delete", "patch", "update"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: ray-operator-rolebinding +subjects: +- kind: ServiceAccount + name: ray-operator-serviceaccount +roleRef: + kind: Role + name: ray-operator-role + apiGroup: rbac.authorization.k8s.io +--- +apiVersion: v1 +kind: Pod +metadata: + name: ray-operator-pod +spec: + serviceAccountName: ray-operator-serviceaccount + containers: + - name: ray + imagePullPolicy: Always + image: rayproject/ray:nightly + command: ["ray-operator"] + env: + - name: RAY_OPERATOR_POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + resources: + requests: + cpu: 1 + memory: 1Gi + limits: + memory: 2Gi From 7c7cd1a1856d3269c9d7130ec0421c99c94c8fd3 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Mon, 24 May 2021 14:19:23 -0700 Subject: [PATCH 02/32] sanity check --- python/ray/autoscaler/_private/autoscaler.py | 53 ++++++++++++++++--- .../ray/autoscaler/_private/node_launcher.py | 8 +++ 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 907d0a8f5946d..1f8a2dfe87e59 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -1,5 +1,4 @@ from collections import defaultdict, namedtuple, Counter -from python.ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT from typing import Any, Optional, Dict, List from urllib3.exceptions import MaxRetryError import copy @@ -34,14 +33,12 @@ format_info_string from ray.autoscaler._private.constants import \ AUTOSCALER_MAX_NUM_FAILURES, AUTOSCALER_MAX_LAUNCH_BATCH, \ - AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \ - AUTOSCALER_HEARTBEAT_TIMEOUT_S + AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_METRIC_PORT, \ + AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S from six.moves import queue logger = logging.getLogger(__name__) -prometheus_client.start_http_server(AUTOSCALER_METRIC_PORT) - # Tuple of modified fields for the given node_id returned by should_update # that will be passed into a NodeUpdaterThread. UpdateInstructions = namedtuple( @@ -52,6 +49,41 @@ "AutoscalerSummary", ["active_nodes", "pending_nodes", "pending_launches", "failed_nodes"]) +# Metrics +AUTOSCALER_METRIC_REGISTRY = prometheus_client.CollectorRegistry() +prometheus_client.start_http_server( + AUTOSCALER_METRIC_PORT, registry=AUTOSCALER_METRIC_REGISTRY) +WORKER_STARTUP_TIME_HISTOGRAM = prometheus_client.Histogram( + "worker_startup_time_seconds", + "Worker startup time", + unit="seconds", + namespace="autoscaler", + registry=AUTOSCALER_METRIC_REGISTRY) +NODES_STARTED_COUNTER = prometheus_client.Counter( + "started_nodes", + "Number of nodes started", + unit="nodes", + namespace="autoscaler", + registry=AUTOSCALER_METRIC_REGISTRY) +NODES_STOPPED_COUNTER = prometheus_client.Counter( + "stopped_nodes", + "Number of nodes stopped", + unit="nodes", + namespace="autoscaler", + registry=AUTOSCALER_METRIC_REGISTRY) +NUM_EXCEPTIONS_COUNTER = prometheus_client.Counter( + "exceptions", + "Number of exceptions", + unit="exceptions", + namespace="autoscaler", + registry=AUTOSCALER_METRIC_REGISTRY) +NODES_RUNNING_GAUGE = prometheus_client.Gauge( + "running_nodes", + "Number of nodes running", + unit="nodes", + namespace="autoscaler", + registry=AUTOSCALER_METRIC_REGISTRY) + class StandardAutoscaler: """The autoscaling control loop for a Ray cluster. @@ -116,6 +148,7 @@ def __init__(self, queue=self.launch_queue, index=i, pending=self.pending_launches, + startup_histogram=WORKER_STARTUP_TIME_HISTOGRAM, node_types=self.available_node_types, ) node_launcher.daemon = True @@ -166,6 +199,7 @@ def _update(self): self.last_update_time = now nodes = self.workers() + NODES_RUNNING_GAUGE.set(len(nodes)) self.load_metrics.prune_active_ips([ self.provider.internal_ip(node_id) @@ -221,7 +255,9 @@ def _update(self): self.provider.terminate_nodes(nodes_to_terminate) for node in nodes_to_terminate: self.node_tracker.untrack(node) + NODES_STOPPED_COUNTER.inc() nodes = self.workers() + NODES_RUNNING_GAUGE.set(len(nodes)) # Terminate nodes if there are too many nodes_to_terminate = [] @@ -241,7 +277,9 @@ def _update(self): self.provider.terminate_nodes(nodes_to_terminate) for node in nodes_to_terminate: self.node_tracker.untrack(node) + NODES_STOPPED_COUNTER.inc() nodes = self.workers() + NODES_RUNNING_GAUGE.set(len(nodes)) to_launch = self.resource_demand_scheduler.get_nodes_to_launch( self.provider.non_terminated_nodes(tag_filters={}), @@ -253,9 +291,11 @@ def _update(self): ensure_min_cluster_size=self.load_metrics.get_resource_requests()) for node_type, count in to_launch.items(): self.launch_new_node(count, node_type=node_type) + NODES_STARTED_COUNTER.inc() if to_launch: nodes = self.workers() + NODES_RUNNING_GAUGE.set(len(nodes)) # Process any completed updates completed_nodes = [] @@ -740,8 +780,9 @@ def all_workers(self): return self.workers() + self.unmanaged_workers() def workers(self): - return self.provider.non_terminated_nodes( + nodes = self.provider.non_terminated_nodes( tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + return nodes def unmanaged_workers(self): return self.provider.non_terminated_nodes( diff --git a/python/ray/autoscaler/_private/node_launcher.py b/python/ray/autoscaler/_private/node_launcher.py index dc3a8b32953d1..6a969d7e77722 100644 --- a/python/ray/autoscaler/_private/node_launcher.py +++ b/python/ray/autoscaler/_private/node_launcher.py @@ -2,6 +2,7 @@ import copy import logging import threading +import time from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND, TAG_RAY_NODE_NAME, @@ -19,6 +20,7 @@ def __init__(self, provider, queue, pending, + startup_histogram=None, node_types=None, index=None, *args, @@ -26,6 +28,7 @@ def __init__(self, self.queue = queue self.pending = pending self.provider = provider + self.startup_histogram = startup_histogram self.node_types = node_types self.index = str(index) if index is not None else "" super(NodeLauncher, self).__init__(*args, **kwargs) @@ -57,7 +60,12 @@ def _launch_node(self, config: Dict[str, Any], count: int, if node_type: node_tags[TAG_RAY_USER_NODE_TYPE] = node_type node_config.update(launch_config) + launch_start_time = time.time() self.provider.create_node(node_config, node_tags, count) + startup_time = time.time() - launch_start_time + if self.startup_histogram: + for _ in range(count): + self.startup_histogram.observe(startup_time) after = self.provider.non_terminated_nodes(tag_filters=worker_filter) if set(after).issubset(before): self.log("No new nodes reported after node creation.") From f170974bdada3b7c18c9a35998ebb734c43dfd06 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Tue, 25 May 2021 13:06:05 -0700 Subject: [PATCH 03/32] lint and more --- python/ray/_private/metrics_agent.py | 2 + python/ray/autoscaler/_private/autoscaler.py | 69 +++++++------------ python/ray/autoscaler/_private/constants.py | 1 + .../ray/autoscaler/_private/node_launcher.py | 12 ++-- .../ray/autoscaler/_private/prom_metrics.py | 33 +++++++++ 5 files changed, 66 insertions(+), 51 deletions(-) create mode 100644 python/ray/autoscaler/_private/prom_metrics.py diff --git a/python/ray/_private/metrics_agent.py b/python/ray/_private/metrics_agent.py index 0ecc3514208f8..0970369eb7a61 100644 --- a/python/ray/_private/metrics_agent.py +++ b/python/ray/_private/metrics_agent.py @@ -197,6 +197,8 @@ def get_file_discovery_content(self): node["MetricsExportPort"]) for node in nodes if node["alive"] is True ] + # TODO(ckw): how to get autoscaler ip? + # autoscaler_export_addr = "{}:{}".format("????", AUTOSCALER_METRIC_PO return json.dumps([{ "labels": { "job": "ray" diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 1f8a2dfe87e59..b0335df310ec3 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -31,10 +31,13 @@ from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \ with_head_node_ip, hash_launch_conf, hash_runtime_conf, \ format_info_string -from ray.autoscaler._private.constants import \ - AUTOSCALER_MAX_NUM_FAILURES, AUTOSCALER_MAX_LAUNCH_BATCH, \ - AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_METRIC_PORT, \ - AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S +from ray.autoscaler._private.constants import AUTOSCALER_MAX_NUM_FAILURES, \ + AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \ + AUTOSCALER_METRIC_PORT, AUTOSCALER_UPDATE_INTERVAL_S, \ + AUTOSCALER_HEARTBEAT_TIMEOUT_S +from ray.autoscaler._private.prom_metrics import \ + AUTOSCALER_EXCEPTIONS_COUNTER, AUTOSCALER_STOPPED_NODES_COUNTER, \ + AUTOSCALER_METRIC_REGISTRY, AUTOSCALER_RUNNING_NODES_GAUGE from six.moves import queue logger = logging.getLogger(__name__) @@ -49,40 +52,9 @@ "AutoscalerSummary", ["active_nodes", "pending_nodes", "pending_launches", "failed_nodes"]) -# Metrics -AUTOSCALER_METRIC_REGISTRY = prometheus_client.CollectorRegistry() -prometheus_client.start_http_server( - AUTOSCALER_METRIC_PORT, registry=AUTOSCALER_METRIC_REGISTRY) -WORKER_STARTUP_TIME_HISTOGRAM = prometheus_client.Histogram( - "worker_startup_time_seconds", - "Worker startup time", - unit="seconds", - namespace="autoscaler", - registry=AUTOSCALER_METRIC_REGISTRY) -NODES_STARTED_COUNTER = prometheus_client.Counter( - "started_nodes", - "Number of nodes started", - unit="nodes", - namespace="autoscaler", - registry=AUTOSCALER_METRIC_REGISTRY) -NODES_STOPPED_COUNTER = prometheus_client.Counter( - "stopped_nodes", - "Number of nodes stopped", - unit="nodes", - namespace="autoscaler", - registry=AUTOSCALER_METRIC_REGISTRY) -NUM_EXCEPTIONS_COUNTER = prometheus_client.Counter( - "exceptions", - "Number of exceptions", - unit="exceptions", - namespace="autoscaler", - registry=AUTOSCALER_METRIC_REGISTRY) -NODES_RUNNING_GAUGE = prometheus_client.Gauge( - "running_nodes", - "Number of nodes running", - unit="nodes", - namespace="autoscaler", - registry=AUTOSCALER_METRIC_REGISTRY) +# Prevent multiple servers from starting if multiple autoscaler's +# are instantiated for some reaso +_metric_server_started = False class StandardAutoscaler: @@ -148,7 +120,6 @@ def __init__(self, queue=self.launch_queue, index=i, pending=self.pending_launches, - startup_histogram=WORKER_STARTUP_TIME_HISTOGRAM, node_types=self.available_node_types, ) node_launcher.daemon = True @@ -169,6 +140,12 @@ def __init__(self, for local_path in self.config["file_mounts"].values(): assert os.path.exists(local_path) + global _metric_server_started + if not _metric_server_started: + _metric_server_started = True + prometheus_client.start_http_server( + AUTOSCALER_METRIC_PORT, registry=AUTOSCALER_METRIC_REGISTRY) + logger.info("StandardAutoscaler: {}".format(self.config)) def update(self): @@ -176,6 +153,7 @@ def update(self): self.reset(errors_fatal=False) self._update() except Exception as e: + AUTOSCALER_EXCEPTIONS_COUNTER.inc() logger.exception("StandardAutoscaler: " "Error during autoscaling.") # Don't abort the autoscaler if the K8s API server is down. @@ -199,7 +177,6 @@ def _update(self): self.last_update_time = now nodes = self.workers() - NODES_RUNNING_GAUGE.set(len(nodes)) self.load_metrics.prune_active_ips([ self.provider.internal_ip(node_id) @@ -255,9 +232,8 @@ def _update(self): self.provider.terminate_nodes(nodes_to_terminate) for node in nodes_to_terminate: self.node_tracker.untrack(node) - NODES_STOPPED_COUNTER.inc() + AUTOSCALER_STOPPED_NODES_COUNTER.inc() nodes = self.workers() - NODES_RUNNING_GAUGE.set(len(nodes)) # Terminate nodes if there are too many nodes_to_terminate = [] @@ -277,9 +253,8 @@ def _update(self): self.provider.terminate_nodes(nodes_to_terminate) for node in nodes_to_terminate: self.node_tracker.untrack(node) - NODES_STOPPED_COUNTER.inc() + AUTOSCALER_STOPPED_NODES_COUNTER.inc() nodes = self.workers() - NODES_RUNNING_GAUGE.set(len(nodes)) to_launch = self.resource_demand_scheduler.get_nodes_to_launch( self.provider.non_terminated_nodes(tag_filters={}), @@ -291,11 +266,9 @@ def _update(self): ensure_min_cluster_size=self.load_metrics.get_resource_requests()) for node_type, count in to_launch.items(): self.launch_new_node(count, node_type=node_type) - NODES_STARTED_COUNTER.inc() if to_launch: nodes = self.workers() - NODES_RUNNING_GAUGE.set(len(nodes)) # Process any completed updates completed_nodes = [] @@ -514,6 +487,7 @@ def reset(self, errors_fatal=False): try: validate_config(new_config) except Exception as e: + AUTOSCALER_EXCEPTIONS_COUNTER.inc() logger.debug( "Cluster config validation failed. The version of " "the ray CLI you launched this cluster with may " @@ -577,6 +551,7 @@ def reset(self, errors_fatal=False): upscaling_speed) except Exception as e: + AUTOSCALER_EXCEPTIONS_COUNTER.inc() if errors_fatal: raise e else: @@ -782,6 +757,8 @@ def all_workers(self): def workers(self): nodes = self.provider.non_terminated_nodes( tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + # Update running nodes gauge whenever we check workers + AUTOSCALER_RUNNING_NODES_GAUGE.set(len(nodes)) return nodes def unmanaged_workers(self): diff --git a/python/ray/autoscaler/_private/constants.py b/python/ray/autoscaler/_private/constants.py index 5d9390dc2ff38..c5556c7acced8 100644 --- a/python/ray/autoscaler/_private/constants.py +++ b/python/ray/autoscaler/_private/constants.py @@ -61,6 +61,7 @@ def env_integer(key, default): # to run. AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE = 1000 +# Port that autoscaler prometheus metrics will be exported to AUTOSCALER_METRIC_PORT = env_integer("AUTOSCALER_METRIC_PORT", 44217) # Max number of retries to AWS (default is 5, time increases exponentially) diff --git a/python/ray/autoscaler/_private/node_launcher.py b/python/ray/autoscaler/_private/node_launcher.py index 6a969d7e77722..ffb59677666ee 100644 --- a/python/ray/autoscaler/_private/node_launcher.py +++ b/python/ray/autoscaler/_private/node_launcher.py @@ -9,6 +9,9 @@ TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED, NODE_KIND_WORKER) from ray.autoscaler._private.util import hash_launch_conf +from ray.autoscaler._private.prom_metrics import ( + AUTOSCALER_EXCEPTIONS_COUNTER, AUTOSCALER_WORKER_STARTUP_TIME_HISTOGRAM, + AUTOSCALER_STARTED_NODES_COUNTER) logger = logging.getLogger(__name__) @@ -20,7 +23,6 @@ def __init__(self, provider, queue, pending, - startup_histogram=None, node_types=None, index=None, *args, @@ -28,7 +30,6 @@ def __init__(self, self.queue = queue self.pending = pending self.provider = provider - self.startup_histogram = startup_histogram self.node_types = node_types self.index = str(index) if index is not None else "" super(NodeLauncher, self).__init__(*args, **kwargs) @@ -63,9 +64,9 @@ def _launch_node(self, config: Dict[str, Any], count: int, launch_start_time = time.time() self.provider.create_node(node_config, node_tags, count) startup_time = time.time() - launch_start_time - if self.startup_histogram: - for _ in range(count): - self.startup_histogram.observe(startup_time) + for _ in range(count): + AUTOSCALER_WORKER_STARTUP_TIME_HISTOGRAM.observe(startup_time) + AUTOSCALER_STARTED_NODES_COUNTER.inc(count) after = self.provider.non_terminated_nodes(tag_filters=worker_filter) if set(after).issubset(before): self.log("No new nodes reported after node creation.") @@ -77,6 +78,7 @@ def run(self): try: self._launch_node(config, count, node_type) except Exception: + AUTOSCALER_EXCEPTIONS_COUNTER.inc() logger.exception("Launch failed") finally: self.pending.dec(node_type, count) diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py new file mode 100644 index 0000000000000..56da79f8ba18c --- /dev/null +++ b/python/ray/autoscaler/_private/prom_metrics.py @@ -0,0 +1,33 @@ +import prometheus_client + +AUTOSCALER_METRIC_REGISTRY = prometheus_client.CollectorRegistry() +AUTOSCALER_WORKER_STARTUP_TIME_HISTOGRAM = prometheus_client.Histogram( + "worker_startup_time_seconds", + "Worker startup time", + unit="seconds", + namespace="autoscaler", + registry=AUTOSCALER_METRIC_REGISTRY) +AUTOSCALER_STARTED_NODES_COUNTER = prometheus_client.Counter( + "started_nodes", + "Number of nodes started", + unit="nodes", + namespace="autoscaler", + registry=AUTOSCALER_METRIC_REGISTRY) +AUTOSCALER_STOPPED_NODES_COUNTER = prometheus_client.Counter( + "stopped_nodes", + "Number of nodes stopped", + unit="nodes", + namespace="autoscaler", + registry=AUTOSCALER_METRIC_REGISTRY) +AUTOSCALER_RUNNING_NODES_GAUGE = prometheus_client.Gauge( + "running_nodes", + "Number of nodes running", + unit="nodes", + namespace="autoscaler", + registry=AUTOSCALER_METRIC_REGISTRY) +AUTOSCALER_EXCEPTIONS_COUNTER = prometheus_client.Counter( + "exceptions", + "Number of exceptions", + unit="exceptions", + namespace="autoscaler", + registry=AUTOSCALER_METRIC_REGISTRY) From 4150bf0a113dbe0d6bcfbdb649298017a7e5d8e7 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Tue, 25 May 2021 13:11:50 -0700 Subject: [PATCH 04/32] remove extra file? --- python/ray/autoscaler/_private/autoscaler.py | 4 +- .../kubernetes/operator_configs/operator.yaml | 49 ------------------- 2 files changed, 2 insertions(+), 51 deletions(-) delete mode 100644 python/ray/autoscaler/kubernetes/operator_configs/operator.yaml diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index b0335df310ec3..0296848b5879d 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -52,8 +52,8 @@ "AutoscalerSummary", ["active_nodes", "pending_nodes", "pending_launches", "failed_nodes"]) -# Prevent multiple servers from starting if multiple autoscaler's -# are instantiated for some reaso +# Prevent multiple metric servers from starting if multiple autoscaler's +# are instantiated _metric_server_started = False diff --git a/python/ray/autoscaler/kubernetes/operator_configs/operator.yaml b/python/ray/autoscaler/kubernetes/operator_configs/operator.yaml deleted file mode 100644 index 3c37f554aa8b1..0000000000000 --- a/python/ray/autoscaler/kubernetes/operator_configs/operator.yaml +++ /dev/null @@ -1,49 +0,0 @@ ---- -apiVersion: v1 -kind: ServiceAccount -metadata: - name: ray-operator-serviceaccount ---- -kind: Role -apiVersion: rbac.authorization.k8s.io/v1 -metadata: - name: ray-operator-role -rules: -- apiGroups: ["", "cluster.ray.io"] - resources: ["rayclusters", "rayclusters/finalizers", "rayclusters/status", "pods", "pods/exec", "services"] - verbs: ["get", "watch", "list", "create", "delete", "patch", "update"] ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: RoleBinding -metadata: - name: ray-operator-rolebinding -subjects: -- kind: ServiceAccount - name: ray-operator-serviceaccount -roleRef: - kind: Role - name: ray-operator-role - apiGroup: rbac.authorization.k8s.io ---- -apiVersion: v1 -kind: Pod -metadata: - name: ray-operator-pod -spec: - serviceAccountName: ray-operator-serviceaccount - containers: - - name: ray - imagePullPolicy: Always - image: rayproject/ray:nightly - command: ["ray-operator"] - env: - - name: RAY_OPERATOR_POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - resources: - requests: - cpu: 1 - memory: 1Gi - limits: - memory: 2Gi From 771b30428c5f1e3ba978f187791b0e4d933579fc Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Wed, 26 May 2021 13:51:51 -0700 Subject: [PATCH 05/32] format --- python/ray/autoscaler/_private/autoscaler.py | 42 +++++++---- .../ray/autoscaler/_private/node_launcher.py | 19 +++-- .../ray/autoscaler/_private/prom_metrics.py | 75 +++++++++++-------- 3 files changed, 82 insertions(+), 54 deletions(-) diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 0296848b5879d..a77559d78026b 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -1,4 +1,5 @@ from collections import defaultdict, namedtuple, Counter +from ray.autoscaler._private.prom_metrics import DEFAULT_AUTOSCALER_METRICS from typing import Any, Optional, Dict, List from urllib3.exceptions import MaxRetryError import copy @@ -35,9 +36,6 @@ AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \ AUTOSCALER_METRIC_PORT, AUTOSCALER_UPDATE_INTERVAL_S, \ AUTOSCALER_HEARTBEAT_TIMEOUT_S -from ray.autoscaler._private.prom_metrics import \ - AUTOSCALER_EXCEPTIONS_COUNTER, AUTOSCALER_STOPPED_NODES_COUNTER, \ - AUTOSCALER_METRIC_REGISTRY, AUTOSCALER_RUNNING_NODES_GAUGE from six.moves import queue logger = logging.getLogger(__name__) @@ -52,8 +50,8 @@ "AutoscalerSummary", ["active_nodes", "pending_nodes", "pending_launches", "failed_nodes"]) -# Prevent multiple metric servers from starting if multiple autoscaler's -# are instantiated +# Prevent multiple metric servers from starting if multiple autoscalers +# are instantiated, for example when testing _metric_server_started = False @@ -83,13 +81,18 @@ def __init__(self, process_runner=subprocess, update_interval_s=AUTOSCALER_UPDATE_INTERVAL_S, prefix_cluster_info=False, - event_summarizer=None): + event_summarizer=None, + prom_metrics=None): self.config_path = config_path # Prefix each line of info string with cluster name if True self.prefix_cluster_info = prefix_cluster_info # Keep this before self.reset (self.provider needs to be created # exactly once). self.provider = None + # Keep this before self.reset (if an exception occurs in reset + # then prom_metrics must be instantitiated to increment the + # exception counter) + self.prom_metrics = prom_metrics or DEFAULT_AUTOSCALER_METRICS self.resource_demand_scheduler = None self.reset(errors_fatal=True) self.head_node_ip = load_metrics.local_ip @@ -142,9 +145,18 @@ def __init__(self, global _metric_server_started if not _metric_server_started: - _metric_server_started = True - prometheus_client.start_http_server( - AUTOSCALER_METRIC_PORT, registry=AUTOSCALER_METRIC_REGISTRY) + try: + logger.info( + "Starting autoscaler metrics server on port {}".format( + AUTOSCALER_METRIC_PORT)) + prometheus_client.start_http_server( + AUTOSCALER_METRIC_PORT, + registry=self.prom_metrics.registry) + _metric_server_started = True + except Exception: + logger.error( + "An error occurred while starting the metrics server.") + raise Exception logger.info("StandardAutoscaler: {}".format(self.config)) @@ -153,7 +165,7 @@ def update(self): self.reset(errors_fatal=False) self._update() except Exception as e: - AUTOSCALER_EXCEPTIONS_COUNTER.inc() + self.prom_metrics.exceptions.inc() logger.exception("StandardAutoscaler: " "Error during autoscaling.") # Don't abort the autoscaler if the K8s API server is down. @@ -232,7 +244,7 @@ def _update(self): self.provider.terminate_nodes(nodes_to_terminate) for node in nodes_to_terminate: self.node_tracker.untrack(node) - AUTOSCALER_STOPPED_NODES_COUNTER.inc() + self.prom_metrics.stopped_nodes.inc() nodes = self.workers() # Terminate nodes if there are too many @@ -253,7 +265,7 @@ def _update(self): self.provider.terminate_nodes(nodes_to_terminate) for node in nodes_to_terminate: self.node_tracker.untrack(node) - AUTOSCALER_STOPPED_NODES_COUNTER.inc() + self.prom_metrics.stopped_nodes.inc() nodes = self.workers() to_launch = self.resource_demand_scheduler.get_nodes_to_launch( @@ -487,7 +499,7 @@ def reset(self, errors_fatal=False): try: validate_config(new_config) except Exception as e: - AUTOSCALER_EXCEPTIONS_COUNTER.inc() + self.prom_metrics.exceptions.inc() logger.debug( "Cluster config validation failed. The version of " "the ray CLI you launched this cluster with may " @@ -551,7 +563,7 @@ def reset(self, errors_fatal=False): upscaling_speed) except Exception as e: - AUTOSCALER_EXCEPTIONS_COUNTER.inc() + self.prom_metrics.exceptions.inc() if errors_fatal: raise e else: @@ -758,7 +770,7 @@ def workers(self): nodes = self.provider.non_terminated_nodes( tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) # Update running nodes gauge whenever we check workers - AUTOSCALER_RUNNING_NODES_GAUGE.set(len(nodes)) + self.prom_metrics.running_nodes.set(len(nodes)) return nodes def unmanaged_workers(self): diff --git a/python/ray/autoscaler/_private/node_launcher.py b/python/ray/autoscaler/_private/node_launcher.py index ffb59677666ee..c0f31d9d65ac6 100644 --- a/python/ray/autoscaler/_private/node_launcher.py +++ b/python/ray/autoscaler/_private/node_launcher.py @@ -9,9 +9,6 @@ TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED, NODE_KIND_WORKER) from ray.autoscaler._private.util import hash_launch_conf -from ray.autoscaler._private.prom_metrics import ( - AUTOSCALER_EXCEPTIONS_COUNTER, AUTOSCALER_WORKER_STARTUP_TIME_HISTOGRAM, - AUTOSCALER_STARTED_NODES_COUNTER) logger = logging.getLogger(__name__) @@ -23,12 +20,14 @@ def __init__(self, provider, queue, pending, + prom_metrics=None, node_types=None, index=None, *args, **kwargs): self.queue = queue self.pending = pending + self.prom_metrics = prom_metrics self.provider = provider self.node_types = node_types self.index = str(index) if index is not None else "" @@ -64,9 +63,14 @@ def _launch_node(self, config: Dict[str, Any], count: int, launch_start_time = time.time() self.provider.create_node(node_config, node_tags, count) startup_time = time.time() - launch_start_time - for _ in range(count): - AUTOSCALER_WORKER_STARTUP_TIME_HISTOGRAM.observe(startup_time) - AUTOSCALER_STARTED_NODES_COUNTER.inc(count) + if self.prom_metrics: + for _ in range(count): + # Note: when launching multiple nodes we observe the time it + # took all nodes to start up for each node. For example, if 4 + # nodes were launched in 25 seconds, we would observe the 25 + # second startup time 4 times. + self.prom_metrics.startup_time.observe(startup_time) + self.prom_metrics.started_nodes.inc(count) after = self.provider.non_terminated_nodes(tag_filters=worker_filter) if set(after).issubset(before): self.log("No new nodes reported after node creation.") @@ -78,7 +82,8 @@ def run(self): try: self._launch_node(config, count, node_type) except Exception: - AUTOSCALER_EXCEPTIONS_COUNTER.inc() + if self.prom_metrics: + self.prom_metrics.exceptions.inc() logger.exception("Launch failed") finally: self.pending.dec(node_type, count) diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py index 56da79f8ba18c..a0a4765a583f3 100644 --- a/python/ray/autoscaler/_private/prom_metrics.py +++ b/python/ray/autoscaler/_private/prom_metrics.py @@ -1,33 +1,44 @@ -import prometheus_client +from prometheus_client import ( + CollectorRegistry, + Counter, + Gauge, + Histogram, +) -AUTOSCALER_METRIC_REGISTRY = prometheus_client.CollectorRegistry() -AUTOSCALER_WORKER_STARTUP_TIME_HISTOGRAM = prometheus_client.Histogram( - "worker_startup_time_seconds", - "Worker startup time", - unit="seconds", - namespace="autoscaler", - registry=AUTOSCALER_METRIC_REGISTRY) -AUTOSCALER_STARTED_NODES_COUNTER = prometheus_client.Counter( - "started_nodes", - "Number of nodes started", - unit="nodes", - namespace="autoscaler", - registry=AUTOSCALER_METRIC_REGISTRY) -AUTOSCALER_STOPPED_NODES_COUNTER = prometheus_client.Counter( - "stopped_nodes", - "Number of nodes stopped", - unit="nodes", - namespace="autoscaler", - registry=AUTOSCALER_METRIC_REGISTRY) -AUTOSCALER_RUNNING_NODES_GAUGE = prometheus_client.Gauge( - "running_nodes", - "Number of nodes running", - unit="nodes", - namespace="autoscaler", - registry=AUTOSCALER_METRIC_REGISTRY) -AUTOSCALER_EXCEPTIONS_COUNTER = prometheus_client.Counter( - "exceptions", - "Number of exceptions", - unit="exceptions", - namespace="autoscaler", - registry=AUTOSCALER_METRIC_REGISTRY) + +class AutoscalerPrometheusMetrics: + def __init__(self, registry: CollectorRegistry): + self.registry: CollectorRegistry = registry + self.worker_startup_time: Histogram = Histogram( + "worker_startup_time_seconds", + "Worker startup time", + unit="seconds", + namespace="autoscaler", + registry=self.registry) + self.started_nodes: Counter = Counter( + "started_nodes", + "Number of nodes started", + unit="nodes", + namespace="autoscaler", + registry=self.registry) + self.stopped_nodes: Counter = Counter( + "stopped_nodes", + "Number of nodes stopped", + unit="nodes", + namespace="autoscaler", + registry=self.registry) + self.running_nodes: Gauge = Gauge( + "running_nodes", + "Number of nodes running", + unit="nodes", + namespace="autoscaler", + registry=self.registry) + self.exceptions: Counter = Counter( + "exceptions", + "Number of exceptions", + unit="exceptions", + namespace="autoscaler", + registry=self.registry) + + +DEFAULT_AUTOSCALER_METRICS = AutoscalerPrometheusMetrics(CollectorRegistry()) From 3f11bc66747917cb7c235f11496d3cb5a4258f5d Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Wed, 26 May 2021 17:38:07 -0700 Subject: [PATCH 06/32] store ip of machine running monitor process --- python/ray/_private/metrics_agent.py | 11 ++++++++++- python/ray/_private/services.py | 8 ++++++-- python/ray/autoscaler/_private/monitor.py | 13 +++++++++++-- python/ray/autoscaler/_private/prom_metrics.py | 3 ++- python/ray/node.py | 4 +++- 5 files changed, 32 insertions(+), 7 deletions(-) diff --git a/python/ray/_private/metrics_agent.py b/python/ray/_private/metrics_agent.py index 0970369eb7a61..9d6e7a3a5ecc8 100644 --- a/python/ray/_private/metrics_agent.py +++ b/python/ray/_private/metrics_agent.py @@ -21,8 +21,10 @@ from opencensus.tags import tag_value as tag_value_module import ray +from ray._private import services import ray._private.prometheus_exporter as prometheus_exporter +from ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT from ray.core.generated.metrics_pb2 import Metric logger = logging.getLogger(__name__) @@ -185,6 +187,8 @@ class PrometheusServiceDiscoveryWriter(threading.Thread): def __init__(self, redis_address, redis_password, temp_dir): ray.state.state._initialize_global_state( redis_address=redis_address, redis_password=redis_password) + self.redis_address = redis_address + self.redis_password = redis_password self.temp_dir = temp_dir self.default_service_discovery_flush_period = 5 super().__init__() @@ -198,7 +202,12 @@ def get_file_discovery_content(self): if node["alive"] is True ] # TODO(ckw): how to get autoscaler ip? - # autoscaler_export_addr = "{}:{}".format("????", AUTOSCALER_METRIC_PO + redis_client = services.create_redis_client(self.redis_address, + self.redis_password) + monitor_ip = redis_client.get("autoscaler_ip") + if monitor_ip: + autoscaler_export_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}" + metrics_export_addresses.append(autoscaler_export_addr) return json.dumps([{ "labels": { "job": "ray" diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index d49c73a0825c6..9f0c624b7793f 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -1834,7 +1834,8 @@ def start_monitor(redis_address, redis_password=None, fate_share=None, max_bytes=0, - backup_count=0): + backup_count=0, + monitor_ip=None): """Run a process to monitor the other processes. Args: @@ -1850,7 +1851,8 @@ def start_monitor(redis_address, RotatingFileHandler's maxBytes. backup_count (int): Log rotation parameter. Corresponding to RotatingFileHandler's backupCount. - + monitor_ip (str): ip address of the machine that the monitor will be + run on Returns: ProcessInfo for the process that was started. """ @@ -1865,6 +1867,8 @@ def start_monitor(redis_address, command.append("--autoscaling-config=" + str(autoscaling_config)) if redis_password: command.append("--redis-password=" + redis_password) + if monitor_ip: + command.append("--monitor-ip=" + monitor_ip) process_info = start_ray_process( command, ray_constants.PROCESS_TYPE_MONITOR, diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index b17a4aeb4139e..097630267de75 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -92,13 +92,15 @@ def __init__(self, autoscaling_config, redis_password=None, prefix_cluster_info=False, + monitor_ip=None, stop_event: Optional[Event] = None): # Initialize the Redis clients. ray.state.state._initialize_global_state( redis_address, redis_password=redis_password) self.redis = ray._private.services.create_redis_client( redis_address, password=redis_password) - + if monitor_ip: + self.redis.put("monitor_ip", monitor_ip) (ip, port) = redis_address.split(":") self.gcs_client = connect_to_gcs(ip, int(port), redis_password) # Initialize the gcs stub for getting all node resource usage. @@ -359,6 +361,12 @@ def run(self): default=ray_constants.LOGGING_ROTATE_BACKUP_COUNT, help="Specify the backup count of rotated log file, default is " f"{ray_constants.LOGGING_ROTATE_BACKUP_COUNT}.") + parser.add_argument( + "--monitor-ip", + required=False, + type=str, + default="", + help="The IP address of the machine hosting the monitor process") args = parser.parse_args() setup_component_logger( logging_level=args.logging_level, @@ -381,6 +389,7 @@ def run(self): monitor = Monitor( args.redis_address, autoscaling_config, - redis_password=args.redis_password) + redis_password=args.redis_password, + monitor_ip=args.monitor_ip) monitor.run() diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py index a0a4765a583f3..5bd2a0deb4922 100644 --- a/python/ray/autoscaler/_private/prom_metrics.py +++ b/python/ray/autoscaler/_private/prom_metrics.py @@ -41,4 +41,5 @@ def __init__(self, registry: CollectorRegistry): registry=self.registry) -DEFAULT_AUTOSCALER_METRICS = AutoscalerPrometheusMetrics(CollectorRegistry()) +DEFAULT_AUTOSCALER_METRICS = AutoscalerPrometheusMetrics( + CollectorRegistry(auto_describe=True)) diff --git a/python/ray/node.py b/python/ray/node.py index ed473c3386a95..c575f74ca4145 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -830,7 +830,9 @@ def start_monitor(self): redis_password=self._ray_params.redis_password, fate_share=self.kernel_fate_share, max_bytes=self.max_bytes, - backup_count=self.backup_count) + backup_count=self.backup_count, + monitor_ip=self._node_ip_address, + ) assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info] From eb9edb18d82375809a366e1b022d0e1f7d2a6f35 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Wed, 26 May 2021 21:37:15 -0700 Subject: [PATCH 07/32] autoscaler_ip -> monitor_ip --- python/ray/_private/metrics_agent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/_private/metrics_agent.py b/python/ray/_private/metrics_agent.py index 9d6e7a3a5ecc8..6595dac2b60fa 100644 --- a/python/ray/_private/metrics_agent.py +++ b/python/ray/_private/metrics_agent.py @@ -201,10 +201,10 @@ def get_file_discovery_content(self): node["MetricsExportPort"]) for node in nodes if node["alive"] is True ] - # TODO(ckw): how to get autoscaler ip? + # TODO(ckw): store monitor_ip in RayCluster scenario redis_client = services.create_redis_client(self.redis_address, self.redis_password) - monitor_ip = redis_client.get("autoscaler_ip") + monitor_ip = redis_client.get("monitor_ip") if monitor_ip: autoscaler_export_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}" metrics_export_addresses.append(autoscaler_export_addr) From b4130a849e42444aead83ea3885331742233c93f Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 08:41:47 -0700 Subject: [PATCH 08/32] lint --- python/ray/_private/metrics_agent.py | 4 ++-- python/ray/autoscaler/_private/autoscaler.py | 2 +- python/ray/autoscaler/_private/monitor.py | 4 +++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/python/ray/_private/metrics_agent.py b/python/ray/_private/metrics_agent.py index 6595dac2b60fa..50bdbf2e8784b 100644 --- a/python/ray/_private/metrics_agent.py +++ b/python/ray/_private/metrics_agent.py @@ -194,7 +194,7 @@ def __init__(self, redis_address, redis_password, temp_dir): super().__init__() def get_file_discovery_content(self): - """Return the content for Prometheus serivce discovery.""" + """Return the content for Prometheus service discovery.""" nodes = ray.nodes() metrics_export_addresses = [ "{}:{}".format(node["NodeManagerAddress"], @@ -204,7 +204,7 @@ def get_file_discovery_content(self): # TODO(ckw): store monitor_ip in RayCluster scenario redis_client = services.create_redis_client(self.redis_address, self.redis_password) - monitor_ip = redis_client.get("monitor_ip") + monitor_ip = redis_client.get("autoscaler_metrics_address") if monitor_ip: autoscaler_export_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}" metrics_export_addresses.append(autoscaler_export_addr) diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index a77559d78026b..504eb7e5eacdc 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -124,7 +124,7 @@ def __init__(self, index=i, pending=self.pending_launches, node_types=self.available_node_types, - ) + prom_metrics=self.prom_metrics) node_launcher.daemon = True node_launcher.start() diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index 097630267de75..d2bfde4fa98fb 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -4,6 +4,7 @@ import logging import logging.handlers import os +from python.ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT import sys import signal import time @@ -100,7 +101,8 @@ def __init__(self, self.redis = ray._private.services.create_redis_client( redis_address, password=redis_password) if monitor_ip: - self.redis.put("monitor_ip", monitor_ip) + self.redis.put("autoscaler_metrics_port", + f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}") (ip, port) = redis_address.split(":") self.gcs_client = connect_to_gcs(ip, int(port), redis_password) # Initialize the gcs stub for getting all node resource usage. From 09a8998c2023ed7c8fed37bcb9246b0aded6e5a6 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 09:14:30 -0700 Subject: [PATCH 09/32] add worker startup time buckets --- python/ray/autoscaler/_private/prom_metrics.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py index 5bd2a0deb4922..33bb16b732360 100644 --- a/python/ray/autoscaler/_private/prom_metrics.py +++ b/python/ray/autoscaler/_private/prom_metrics.py @@ -9,12 +9,15 @@ class AutoscalerPrometheusMetrics: def __init__(self, registry: CollectorRegistry): self.registry: CollectorRegistry = registry + # Buckets: 30 seconds, 1 minute, 2 minutes, 4 minutes + # 8 minutes, 15 minutes, 30 minutes, 1 hour self.worker_startup_time: Histogram = Histogram( "worker_startup_time_seconds", "Worker startup time", unit="seconds", namespace="autoscaler", - registry=self.registry) + registry=self.registry, + buckets=[30, 60, 120, 240, 480, 900, 1800, 3600]) self.started_nodes: Counter = Counter( "started_nodes", "Number of nodes started", From 849c1baf85f527544010fc8dda32257e063ddb57 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 09:22:17 -0700 Subject: [PATCH 10/32] better descriptions --- python/ray/autoscaler/_private/prom_metrics.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py index 33bb16b732360..5128d4d4a5359 100644 --- a/python/ray/autoscaler/_private/prom_metrics.py +++ b/python/ray/autoscaler/_private/prom_metrics.py @@ -10,35 +10,35 @@ class AutoscalerPrometheusMetrics: def __init__(self, registry: CollectorRegistry): self.registry: CollectorRegistry = registry # Buckets: 30 seconds, 1 minute, 2 minutes, 4 minutes - # 8 minutes, 15 minutes, 30 minutes, 1 hour + # 8 minutes, 15 minutes, 30 minutes, 1 hour self.worker_startup_time: Histogram = Histogram( "worker_startup_time_seconds", - "Worker startup time", + "Worker startup time.", unit="seconds", namespace="autoscaler", registry=self.registry, buckets=[30, 60, 120, 240, 480, 900, 1800, 3600]) self.started_nodes: Counter = Counter( "started_nodes", - "Number of nodes started", + "Number of nodes started.", unit="nodes", namespace="autoscaler", registry=self.registry) self.stopped_nodes: Counter = Counter( "stopped_nodes", - "Number of nodes stopped", + "Number of nodes stopped.", unit="nodes", namespace="autoscaler", registry=self.registry) self.running_nodes: Gauge = Gauge( "running_nodes", - "Number of nodes running", + "Number of nodes running.", unit="nodes", namespace="autoscaler", registry=self.registry) self.exceptions: Counter = Counter( "exceptions", - "Number of exceptions", + "Number of exceptions raised during execution of autoscaler.", unit="exceptions", namespace="autoscaler", registry=self.registry) From 6f58b812764c3e1e0b454168a8b67f784a80792a Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 11:12:07 -0700 Subject: [PATCH 11/32] more lint --- python/ray/_private/metrics_agent.py | 8 +++----- python/ray/autoscaler/_private/monitor.py | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/python/ray/_private/metrics_agent.py b/python/ray/_private/metrics_agent.py index 50bdbf2e8784b..118f2385cc5ba 100644 --- a/python/ray/_private/metrics_agent.py +++ b/python/ray/_private/metrics_agent.py @@ -24,7 +24,6 @@ from ray._private import services import ray._private.prometheus_exporter as prometheus_exporter -from ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT from ray.core.generated.metrics_pb2 import Metric logger = logging.getLogger(__name__) @@ -204,10 +203,9 @@ def get_file_discovery_content(self): # TODO(ckw): store monitor_ip in RayCluster scenario redis_client = services.create_redis_client(self.redis_address, self.redis_password) - monitor_ip = redis_client.get("autoscaler_metrics_address") - if monitor_ip: - autoscaler_export_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}" - metrics_export_addresses.append(autoscaler_export_addr) + autoscaler_addr = redis_client.get("autoscaler_metrics_address") + if autoscaler_addr: + metrics_export_addresses.append(autoscaler_addr) return json.dumps([{ "labels": { "job": "ray" diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index d2bfde4fa98fb..21ff60cf27b0f 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -367,7 +367,7 @@ def run(self): "--monitor-ip", required=False, type=str, - default="", + default=None, help="The IP address of the machine hosting the monitor process") args = parser.parse_args() setup_component_logger( From 9050bab4c6d7951496c52c9bc9eb5e8730ad128e Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 11:16:09 -0700 Subject: [PATCH 12/32] propogate exception when starting prom http --- python/ray/autoscaler/_private/autoscaler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 504eb7e5eacdc..49ca846d69bb1 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -153,10 +153,10 @@ def __init__(self, AUTOSCALER_METRIC_PORT, registry=self.prom_metrics.registry) _metric_server_started = True - except Exception: + except Exception as e: logger.error( "An error occurred while starting the metrics server.") - raise Exception + raise e logger.info("StandardAutoscaler: {}".format(self.config)) From 7be69c1ec592292310d1d978cad37b6c728c2ab4 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 11:49:12 -0700 Subject: [PATCH 13/32] lint --- python/ray/autoscaler/_private/monitor.py | 6 +++--- python/ray/node.py | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index 21ff60cf27b0f..3acbb1dca4615 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -4,7 +4,6 @@ import logging import logging.handlers import os -from python.ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT import sys import signal import time @@ -18,7 +17,8 @@ import ray from ray.autoscaler._private.autoscaler import StandardAutoscaler from ray.autoscaler._private.commands import teardown_cluster -from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S +from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S, \ + AUTOSCALER_METRIC_PORT from ray.autoscaler._private.event_summarizer import EventSummarizer from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.constants import \ @@ -101,7 +101,7 @@ def __init__(self, self.redis = ray._private.services.create_redis_client( redis_address, password=redis_password) if monitor_ip: - self.redis.put("autoscaler_metrics_port", + self.redis.set("autoscaler_metrics_port", f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}") (ip, port) = redis_address.split(":") self.gcs_client = connect_to_gcs(ip, int(port), redis_password) diff --git a/python/ray/node.py b/python/ray/node.py index c575f74ca4145..e7abeef911d02 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -831,8 +831,7 @@ def start_monitor(self): fate_share=self.kernel_fate_share, max_bytes=self.max_bytes, backup_count=self.backup_count, - monitor_ip=self._node_ip_address, - ) + monitor_ip=self._node_ip_address) assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info] From 8f682b4e6a52a438042157ae12b8b86f62b9214d Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 12:35:28 -0700 Subject: [PATCH 14/32] fix redis set/get --- python/ray/_private/metrics_agent.py | 4 ++-- python/ray/autoscaler/_private/monitor.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/_private/metrics_agent.py b/python/ray/_private/metrics_agent.py index 118f2385cc5ba..82171c7d48160 100644 --- a/python/ray/_private/metrics_agent.py +++ b/python/ray/_private/metrics_agent.py @@ -203,9 +203,9 @@ def get_file_discovery_content(self): # TODO(ckw): store monitor_ip in RayCluster scenario redis_client = services.create_redis_client(self.redis_address, self.redis_password) - autoscaler_addr = redis_client.get("autoscaler_metrics_address") + autoscaler_addr = redis_client.get("AutoscalerMetricsAddress") if autoscaler_addr: - metrics_export_addresses.append(autoscaler_addr) + metrics_export_addresses.append(autoscaler_addr.decode("utf-8")) return json.dumps([{ "labels": { "job": "ray" diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index 3acbb1dca4615..066790caf6111 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -101,7 +101,7 @@ def __init__(self, self.redis = ray._private.services.create_redis_client( redis_address, password=redis_password) if monitor_ip: - self.redis.set("autoscaler_metrics_port", + self.redis.set("AutoscalerMetricsAddress", f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}") (ip, port) = redis_address.split(":") self.gcs_client = connect_to_gcs(ip, int(port), redis_password) From 51f7a5cce113fd4043a85599ea3c88ea59270cf3 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 14:24:25 -0700 Subject: [PATCH 15/32] move start_http to monitor.py --- python/ray/autoscaler/_private/autoscaler.py | 29 +++---------------- python/ray/autoscaler/_private/monitor.py | 15 +++++++++- .../ray/autoscaler/_private/node_launcher.py | 2 +- .../ray/autoscaler/_private/prom_metrics.py | 9 ++---- 4 files changed, 22 insertions(+), 33 deletions(-) diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 49ca846d69bb1..c5c9c1fc89f69 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -1,5 +1,5 @@ from collections import defaultdict, namedtuple, Counter -from ray.autoscaler._private.prom_metrics import DEFAULT_AUTOSCALER_METRICS +from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics from typing import Any, Optional, Dict, List from urllib3.exceptions import MaxRetryError import copy @@ -7,7 +7,6 @@ import math import operator import os -import prometheus_client import subprocess import threading import time @@ -34,8 +33,7 @@ format_info_string from ray.autoscaler._private.constants import AUTOSCALER_MAX_NUM_FAILURES, \ AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \ - AUTOSCALER_METRIC_PORT, AUTOSCALER_UPDATE_INTERVAL_S, \ - AUTOSCALER_HEARTBEAT_TIMEOUT_S + AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S from six.moves import queue logger = logging.getLogger(__name__) @@ -50,10 +48,6 @@ "AutoscalerSummary", ["active_nodes", "pending_nodes", "pending_launches", "failed_nodes"]) -# Prevent multiple metric servers from starting if multiple autoscalers -# are instantiated, for example when testing -_metric_server_started = False - class StandardAutoscaler: """The autoscaling control loop for a Ray cluster. @@ -92,7 +86,8 @@ def __init__(self, # Keep this before self.reset (if an exception occurs in reset # then prom_metrics must be instantitiated to increment the # exception counter) - self.prom_metrics = prom_metrics or DEFAULT_AUTOSCALER_METRICS + self.prom_metrics = prom_metrics or \ + AutoscalerPrometheusMetrics() self.resource_demand_scheduler = None self.reset(errors_fatal=True) self.head_node_ip = load_metrics.local_ip @@ -142,22 +137,6 @@ def __init__(self, for local_path in self.config["file_mounts"].values(): assert os.path.exists(local_path) - - global _metric_server_started - if not _metric_server_started: - try: - logger.info( - "Starting autoscaler metrics server on port {}".format( - AUTOSCALER_METRIC_PORT)) - prometheus_client.start_http_server( - AUTOSCALER_METRIC_PORT, - registry=self.prom_metrics.registry) - _metric_server_started = True - except Exception as e: - logger.error( - "An error occurred while starting the metrics server.") - raise e - logger.info("StandardAutoscaler: {}".format(self.config)) def update(self): diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index 066790caf6111..3832c3e1d8ff7 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -13,6 +13,7 @@ from typing import Optional import grpc +import prometheus_client import ray from ray.autoscaler._private.autoscaler import StandardAutoscaler @@ -21,6 +22,7 @@ AUTOSCALER_METRIC_PORT from ray.autoscaler._private.event_summarizer import EventSummarizer from ray.autoscaler._private.load_metrics import LoadMetrics +from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics from ray.autoscaler._private.constants import \ AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS, \ @@ -130,6 +132,16 @@ def __init__(self, self.autoscaling_config = autoscaling_config self.autoscaler = None + self.prom_metrics = AutoscalerPrometheusMetrics() + try: + logger.info("Starting autoscaler metrics server on port {}".format( + AUTOSCALER_METRIC_PORT)) + prometheus_client.start_http_server( + AUTOSCALER_METRIC_PORT, registry=self.prom_metrics.registry) + except Exception: + logger.exception( + "An exception occurred while starting the metrics server.") + logger.info("Monitor: Started") def __del__(self): @@ -141,7 +153,8 @@ def _initialize_autoscaler(self): self.autoscaling_config, self.load_metrics, prefix_cluster_info=self.prefix_cluster_info, - event_summarizer=self.event_summarizer) + event_summarizer=self.event_summarizer, + prom_metrics=self.prom_metrics) def update_load_metrics(self): """Fetches resource usage data from GCS and updates load metrics.""" diff --git a/python/ray/autoscaler/_private/node_launcher.py b/python/ray/autoscaler/_private/node_launcher.py index c0f31d9d65ac6..4068ac152cd01 100644 --- a/python/ray/autoscaler/_private/node_launcher.py +++ b/python/ray/autoscaler/_private/node_launcher.py @@ -69,7 +69,7 @@ def _launch_node(self, config: Dict[str, Any], count: int, # took all nodes to start up for each node. For example, if 4 # nodes were launched in 25 seconds, we would observe the 25 # second startup time 4 times. - self.prom_metrics.startup_time.observe(startup_time) + self.prom_metrics.worker_startup_time.observe(startup_time) self.prom_metrics.started_nodes.inc(count) after = self.provider.non_terminated_nodes(tag_filters=worker_filter) if set(after).issubset(before): diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py index 5128d4d4a5359..52f600a08a8c3 100644 --- a/python/ray/autoscaler/_private/prom_metrics.py +++ b/python/ray/autoscaler/_private/prom_metrics.py @@ -7,8 +7,9 @@ class AutoscalerPrometheusMetrics: - def __init__(self, registry: CollectorRegistry): - self.registry: CollectorRegistry = registry + def __init__(self, registry: CollectorRegistry = None): + self.registry: CollectorRegistry = registry or \ + CollectorRegistry(auto_describe=True) # Buckets: 30 seconds, 1 minute, 2 minutes, 4 minutes # 8 minutes, 15 minutes, 30 minutes, 1 hour self.worker_startup_time: Histogram = Histogram( @@ -42,7 +43,3 @@ def __init__(self, registry: CollectorRegistry): unit="exceptions", namespace="autoscaler", registry=self.registry) - - -DEFAULT_AUTOSCALER_METRICS = AutoscalerPrometheusMetrics( - CollectorRegistry(auto_describe=True)) From 4259ecdc09e0e66543d672cac4b569114461c96c Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 15:01:36 -0700 Subject: [PATCH 16/32] break up exception types and add pending_nodes metric --- python/ray/autoscaler/_private/autoscaler.py | 8 +++-- .../ray/autoscaler/_private/node_launcher.py | 3 +- .../ray/autoscaler/_private/prom_metrics.py | 32 +++++++++++++++++-- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index c5c9c1fc89f69..6aa123c61f051 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -144,7 +144,7 @@ def update(self): self.reset(errors_fatal=False) self._update() except Exception as e: - self.prom_metrics.exceptions.inc() + self.prom_metrics.update_loop_exceptions.inc() logger.exception("StandardAutoscaler: " "Error during autoscaling.") # Don't abort the autoscaler if the K8s API server is down. @@ -478,7 +478,7 @@ def reset(self, errors_fatal=False): try: validate_config(new_config) except Exception as e: - self.prom_metrics.exceptions.inc() + self.prom_metrics.config_validation_exceptions().inc() logger.debug( "Cluster config validation failed. The version of " "the ray CLI you launched this cluster with may " @@ -542,7 +542,7 @@ def reset(self, errors_fatal=False): upscaling_speed) except Exception as e: - self.prom_metrics.exceptions.inc() + self.prom_metrics.reset_exceptions.inc() if errors_fatal: raise e else: @@ -735,6 +735,7 @@ def launch_new_node(self, count: int, node_type: Optional[str]) -> None: quantity=count, aggregate=operator.add) self.pending_launches.inc(node_type, count) + self.prom_metrics.set(self.pending_launches.value) config = copy.deepcopy(self.config) # Split into individual launch requests of the max batch size. while count > 0: @@ -763,6 +764,7 @@ def kill_workers(self): self.provider.terminate_nodes(nodes) for node in nodes: self.node_tracker.untrack(node) + self.prom_metrics.stopped_nodes.inc() logger.error("StandardAutoscaler: terminated {} node(s)".format( len(nodes))) diff --git a/python/ray/autoscaler/_private/node_launcher.py b/python/ray/autoscaler/_private/node_launcher.py index 4068ac152cd01..7e48960986a0a 100644 --- a/python/ray/autoscaler/_private/node_launcher.py +++ b/python/ray/autoscaler/_private/node_launcher.py @@ -83,10 +83,11 @@ def run(self): self._launch_node(config, count, node_type) except Exception: if self.prom_metrics: - self.prom_metrics.exceptions.inc() + self.prom_metrics.node_launch_exceptions.inc() logger.exception("Launch failed") finally: self.pending.dec(node_type, count) + self.pending_nodes.set(self.pending.value) def log(self, statement): prefix = "NodeLauncher{}:".format(self.index) diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py index 52f600a08a8c3..1747ceba2c4bc 100644 --- a/python/ray/autoscaler/_private/prom_metrics.py +++ b/python/ray/autoscaler/_private/prom_metrics.py @@ -19,6 +19,12 @@ def __init__(self, registry: CollectorRegistry = None): namespace="autoscaler", registry=self.registry, buckets=[30, 60, 120, 240, 480, 900, 1800, 3600]) + self.pending_nodes: Gauge = Gauge( + "pending_nodes", + "Number of nodes pending to be started.", + unit="nodes", + namespace="autoscaler", + registry=self.registry) self.started_nodes: Counter = Counter( "started_nodes", "Number of nodes started.", @@ -37,9 +43,29 @@ def __init__(self, registry: CollectorRegistry = None): unit="nodes", namespace="autoscaler", registry=self.registry) - self.exceptions: Counter = Counter( - "exceptions", - "Number of exceptions raised during execution of autoscaler.", + self.update_loop_exceptions: Counter = Counter( + "update_loop_exceptions", + "Number of exceptions raised in the update loop of th e" + "autoscaler.", + unit="exceptions", + namespace="autoscaler", + registry=self.registry) + self.node_launch_exceptions: Counter = Counter( + "node_launch_exceptions", + "Number of exceptions raised while launching nodes.", + unit="exceptions", + namespace="autoscaler", + registry=self.registry) + self.reset_exceptions: Counter = Counter( + "reset_exceptions", + "Number of exceptions raised while resetting the autoscaler.", + unit="exceptions", + namespace="autoscaler", + registry=self.registry) + self.config_validation_exceptions: Counter = Counter( + "config_validation_exceptions", + "Number of exceptions raised while validating the config " + "during a reset", unit="exceptions", namespace="autoscaler", registry=self.registry) From bb9896e94f81cf45a7ef501a6eed688c571ae292 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 15:13:07 -0700 Subject: [PATCH 17/32] Adjust buckets, fix test_autoscaler failures --- python/ray/_private/metrics_agent.py | 1 - python/ray/autoscaler/_private/autoscaler.py | 4 ++-- python/ray/autoscaler/_private/node_launcher.py | 2 +- python/ray/autoscaler/_private/prom_metrics.py | 9 +++++---- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/python/ray/_private/metrics_agent.py b/python/ray/_private/metrics_agent.py index 82171c7d48160..0cc674401c902 100644 --- a/python/ray/_private/metrics_agent.py +++ b/python/ray/_private/metrics_agent.py @@ -200,7 +200,6 @@ def get_file_discovery_content(self): node["MetricsExportPort"]) for node in nodes if node["alive"] is True ] - # TODO(ckw): store monitor_ip in RayCluster scenario redis_client = services.create_redis_client(self.redis_address, self.redis_password) autoscaler_addr = redis_client.get("AutoscalerMetricsAddress") diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 6aa123c61f051..7c8b20fb29fef 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -478,7 +478,7 @@ def reset(self, errors_fatal=False): try: validate_config(new_config) except Exception as e: - self.prom_metrics.config_validation_exceptions().inc() + self.prom_metrics.config_validation_exceptions.inc() logger.debug( "Cluster config validation failed. The version of " "the ray CLI you launched this cluster with may " @@ -735,7 +735,7 @@ def launch_new_node(self, count: int, node_type: Optional[str]) -> None: quantity=count, aggregate=operator.add) self.pending_launches.inc(node_type, count) - self.prom_metrics.set(self.pending_launches.value) + self.prom_metrics.pending_nodes.set(self.pending_launches.value) config = copy.deepcopy(self.config) # Split into individual launch requests of the max batch size. while count > 0: diff --git a/python/ray/autoscaler/_private/node_launcher.py b/python/ray/autoscaler/_private/node_launcher.py index 7e48960986a0a..f1236aa72f3b8 100644 --- a/python/ray/autoscaler/_private/node_launcher.py +++ b/python/ray/autoscaler/_private/node_launcher.py @@ -87,7 +87,7 @@ def run(self): logger.exception("Launch failed") finally: self.pending.dec(node_type, count) - self.pending_nodes.set(self.pending.value) + self.prom_metrics.pending_nodes.set(self.pending.value) def log(self, statement): prefix = "NodeLauncher{}:".format(self.index) diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py index 1747ceba2c4bc..999f113ba3be7 100644 --- a/python/ray/autoscaler/_private/prom_metrics.py +++ b/python/ray/autoscaler/_private/prom_metrics.py @@ -10,15 +10,16 @@ class AutoscalerPrometheusMetrics: def __init__(self, registry: CollectorRegistry = None): self.registry: CollectorRegistry = registry or \ CollectorRegistry(auto_describe=True) - # Buckets: 30 seconds, 1 minute, 2 minutes, 4 minutes - # 8 minutes, 15 minutes, 30 minutes, 1 hour + # Buckets: 30 seconds, 1 minute, 2 minutes, 4 minutes, + # 6 minutes, 8 minutes, 10 minutes, 12 minutes, + # 15 minutes, 20 minutes, 30 minutes, 1 hour self.worker_startup_time: Histogram = Histogram( "worker_startup_time_seconds", "Worker startup time.", unit="seconds", namespace="autoscaler", registry=self.registry, - buckets=[30, 60, 120, 240, 480, 900, 1800, 3600]) + buckets=[30, 60, 120, 240, 360, 480, 600, 720, 900, 1800, 3600]) self.pending_nodes: Gauge = Gauge( "pending_nodes", "Number of nodes pending to be started.", @@ -45,7 +46,7 @@ def __init__(self, registry: CollectorRegistry = None): registry=self.registry) self.update_loop_exceptions: Counter = Counter( "update_loop_exceptions", - "Number of exceptions raised in the update loop of th e" + "Number of exceptions raised in the update loop of the " "autoscaler.", unit="exceptions", namespace="autoscaler", From 091610a0130656723cf33d8dd10341ae76f984b4 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 16:03:14 -0700 Subject: [PATCH 18/32] Add metric_agent tests --- python/ray/tests/test_metrics_agent.py | 32 ++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 6026d711069c8..fcb845094df59 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -9,6 +9,7 @@ import pytest import ray +from ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT from ray.ray_constants import PROMETHEUS_SERVICE_DISCOVERY_FILE from ray._private.metrics_agent import PrometheusServiceDiscoveryWriter from ray.util.metrics import Counter, Histogram, Gauge @@ -48,6 +49,14 @@ "ray_outbound_heartbeat_size_kb_sum", ] +_AUTOSCALER_METRICS = [ + "autoscaler_config_validation_exceptions", + "autoscaler_node_launch_exceptions", "autoscaler_pending_nodes", + "autoscaler_reset_exceptions", "autoscaler_running_nodes", + "autoscaler_started_nodes", "autoscaler_stopped_nodes", + "autoscaler_update_loop_exceptions", "autoscaler_worker_startup_time" +] + @pytest.fixture def _setup_cluster_for_test(ray_start_cluster): @@ -94,8 +103,9 @@ async def ping(self): metrics_export_port = node_info["MetricsExportPort"] addr = node_info["NodeManagerAddress"] prom_addresses.append(f"{addr}:{metrics_export_port}") - - yield prom_addresses + autoscaler_export_addr = "{}:{}".format(cluster.head_node.node_ip_address, + AUTOSCALER_METRIC_PORT) + yield prom_addresses, autoscaler_export_addr ray.get(worker_should_exit.send.remote()) ray.get(obj_refs) @@ -107,7 +117,7 @@ async def ping(self): def test_metrics_export_end_to_end(_setup_cluster_for_test): TEST_TIMEOUT_S = 20 - prom_addresses = _setup_cluster_for_test + prom_addresses, autoscaler_export_addr = _setup_cluster_for_test def test_cases(): components_dict, metric_names, metric_samples = fetch_prometheus( @@ -165,6 +175,16 @@ def test_cases(): assert hist_count == 1 assert hist_sum == 1.5 + # Autoscaler metrics + _, autoscaler_metric_names, _ = fetch_prometheus( + [autoscaler_export_addr]) + for metric in _AUTOSCALER_METRICS: + # Metric name should appear with some suffix (_count, _total, + # etc...) in the list of all names + assert any(name.startswith(metric) for name in + autoscaler_metric_names), \ + f"{metric} not in {autoscaler_metric_names}" + def wrap_test_case_for_retry(): try: test_cases() @@ -197,10 +217,14 @@ def test_prometheus_file_based_service_discovery(ray_start_cluster): redis_address, ray.ray_constants.REDIS_DEFAULT_PASSWORD, "/tmp/ray") def get_metrics_export_address_from_node(nodes): - return [ + node_export_addrs = [ "{}:{}".format(node.node_ip_address, node.metrics_export_port) for node in nodes ] + # monitor should is run on head node for `ray_start_cluster` fixture + autoscaler_export_addr = "{}:{}".format( + cluster.head_node.node_ip_address, AUTOSCALER_METRIC_PORT) + return node_export_addrs + [autoscaler_export_addr] loaded_json_data = json.loads(writer.get_file_discovery_content())[0] assert (set(get_metrics_export_address_from_node(nodes)) == set( From ee005b1d9003236b50b505119339da7efac487af Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 16:05:26 -0700 Subject: [PATCH 19/32] explain _AUTOSCALER_METRICS --- python/ray/tests/test_metrics_agent.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index fcb845094df59..0e3481baf0d8c 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -49,6 +49,8 @@ "ray_outbound_heartbeat_size_kb_sum", ] +# This list of metrics should be kept in sync with +# ray/python/ray/autoscaler/_private/prom_metrics.py _AUTOSCALER_METRICS = [ "autoscaler_config_validation_exceptions", "autoscaler_node_launch_exceptions", "autoscaler_pending_nodes", From 1d821671d225deae23dc29531beb89b44c448d20 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 16:56:42 -0700 Subject: [PATCH 20/32] add basic exception count checks --- python/ray/tests/test_autoscaler.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 23d95abcb4edc..b463470b222ad 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -19,6 +19,7 @@ from ray.autoscaler.sdk import get_docker_host_mount_location from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.autoscaler import StandardAutoscaler +from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics from ray.autoscaler._private.providers import ( _NODE_PROVIDERS, _clear_provider_cache, _DEFAULT_CONFIGS) from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS, \ @@ -1368,6 +1369,7 @@ def testIgnoresCorruptedConfig(self): }, 1) lm = LoadMetrics() lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {}) + mock_metrics = Mock(spec=AutoscalerPrometheusMetrics()) autoscaler = StandardAutoscaler( config_path, lm, @@ -1375,14 +1377,17 @@ def testIgnoresCorruptedConfig(self): max_concurrent_launches=10, process_runner=runner, max_failures=0, - update_interval_s=0) + update_interval_s=0, + prom_metrics=mock_metrics) autoscaler.update() self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) # Write a corrupted config self.write_config("asdf", call_prepare_config=False) - for _ in range(10): + for i in range(10): autoscaler.update() + # config validation exceptions metrics should be incremented + assert mock_metrics.config_validation_exceptions.inc.call_count == 10 time.sleep(0.1) assert autoscaler.pending_launches.value == 0 assert len( @@ -1409,14 +1414,18 @@ def testMaxFailures(self): self.provider = MockProvider() self.provider.throw = True runner = MockProcessRunner() + mock_metrics = Mock(spec=AutoscalerPrometheusMetrics()) autoscaler = StandardAutoscaler( config_path, LoadMetrics(), max_failures=2, process_runner=runner, - update_interval_s=0) + update_interval_s=0, + prom_metrics=mock_metrics) autoscaler.update() + assert mock_metrics.update_loop_exceptions.inc.call_count == 1 autoscaler.update() + assert mock_metrics.update_loop_exceptions.inc.call_count == 2 with pytest.raises(Exception): autoscaler.update() From a43b38eb04375570ab2a915543750ca33de31dff Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 17:46:34 -0700 Subject: [PATCH 21/32] more autoscaler metric tests --- python/ray/tests/test_autoscaler.py | 17 +++++++++++++++-- python/ray/tests/test_metrics_agent.py | 2 +- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index b463470b222ad..0d7db477f1934 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -879,18 +879,27 @@ def testScaleUp(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() runner = MockProcessRunner() + mock_metrics = Mock(spec=AutoscalerPrometheusMetrics()) autoscaler = StandardAutoscaler( config_path, LoadMetrics(), max_failures=0, process_runner=runner, - update_interval_s=0) + update_interval_s=0, + prom_metrics=mock_metrics) assert len(self.provider.non_terminated_nodes({})) == 0 autoscaler.update() self.waitForNodes(2) + + # started_nodes metric should have been incremented by 2 + assert mock_metrics.started_nodes.inc.call_count == 1 + mock_metrics.started_nodes.inc.assert_called_with(2) autoscaler.update() self.waitForNodes(2) + # running_nodes metric should be set to 2 + mock_metrics.running_nodes.set.assert_called_with(2) + def testTerminateOutdatedNodesGracefully(self): config = SMALL_CLUSTER.copy() config["min_workers"] = 5 @@ -904,12 +913,14 @@ def testTerminateOutdatedNodesGracefully(self): }, 10) runner = MockProcessRunner() runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)]) + mock_metrics = Mock(spec=AutoscalerPrometheusMetrics()) autoscaler = StandardAutoscaler( config_path, LoadMetrics(), max_failures=0, process_runner=runner, - update_interval_s=0) + update_interval_s=0, + prom_metrics=mock_metrics) self.waitForNodes(10) # Gradually scales down to meet target size, never going too low @@ -926,6 +937,8 @@ def testTerminateOutdatedNodesGracefully(self): events = autoscaler.event_summarizer.summary() assert ("Removing 10 nodes of type " "ray-legacy-worker-node-type (outdated)." in events), events + assert mock_metrics.stopped_nodes.inc.call_count == 10 + mock_metrics.started_nodes.inc.assert_called_with(5) def testDynamicScaling(self): config_path = self.write_config(SMALL_CLUSTER) diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 0e3481baf0d8c..11a25c057ed2d 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -223,7 +223,7 @@ def get_metrics_export_address_from_node(nodes): "{}:{}".format(node.node_ip_address, node.metrics_export_port) for node in nodes ] - # monitor should is run on head node for `ray_start_cluster` fixture + # monitor should be run on head node for `ray_start_cluster` fixture autoscaler_export_addr = "{}:{}".format( cluster.head_node.node_ip_address, AUTOSCALER_METRIC_PORT) return node_export_addrs + [autoscaler_export_addr] From 25f55dcfddd5a88d80f4393fb54d515f59847f07 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 18:05:01 -0700 Subject: [PATCH 22/32] less dangerous way to handle no prom_metrics --- .../ray/autoscaler/_private/node_launcher.py | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/python/ray/autoscaler/_private/node_launcher.py b/python/ray/autoscaler/_private/node_launcher.py index f1236aa72f3b8..2b9335bf18717 100644 --- a/python/ray/autoscaler/_private/node_launcher.py +++ b/python/ray/autoscaler/_private/node_launcher.py @@ -8,6 +8,7 @@ TAG_RAY_NODE_KIND, TAG_RAY_NODE_NAME, TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED, NODE_KIND_WORKER) +from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics from ray.autoscaler._private.util import hash_launch_conf logger = logging.getLogger(__name__) @@ -27,7 +28,7 @@ def __init__(self, **kwargs): self.queue = queue self.pending = pending - self.prom_metrics = prom_metrics + self.prom_metrics = prom_metrics or AutoscalerPrometheusMetrics() self.provider = provider self.node_types = node_types self.index = str(index) if index is not None else "" @@ -63,14 +64,13 @@ def _launch_node(self, config: Dict[str, Any], count: int, launch_start_time = time.time() self.provider.create_node(node_config, node_tags, count) startup_time = time.time() - launch_start_time - if self.prom_metrics: - for _ in range(count): - # Note: when launching multiple nodes we observe the time it - # took all nodes to start up for each node. For example, if 4 - # nodes were launched in 25 seconds, we would observe the 25 - # second startup time 4 times. - self.prom_metrics.worker_startup_time.observe(startup_time) - self.prom_metrics.started_nodes.inc(count) + for _ in range(count): + # Note: when launching multiple nodes we observe the time it + # took all nodes to start up for each node. For example, if 4 + # nodes were launched in 25 seconds, we would observe the 25 + # second startup time 4 times. + self.prom_metrics.worker_startup_time.observe(startup_time) + self.prom_metrics.started_nodes.inc(count) after = self.provider.non_terminated_nodes(tag_filters=worker_filter) if set(after).issubset(before): self.log("No new nodes reported after node creation.") @@ -82,8 +82,7 @@ def run(self): try: self._launch_node(config, count, node_type) except Exception: - if self.prom_metrics: - self.prom_metrics.node_launch_exceptions.inc() + self.prom_metrics.node_launch_exceptions.inc() logger.exception("Launch failed") finally: self.pending.dec(node_type, count) From f7013c2b4f51c92694752ece441e7b84235c27c3 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 18:05:28 -0700 Subject: [PATCH 23/32] more mock checks --- python/ray/tests/test_autoscaler.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 0d7db477f1934..e37f18ef61323 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -952,6 +952,7 @@ def testDynamicScaling(self): TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD }, 1) lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {}) + mock_metrics = Mock(spec=AutoscalerPrometheusMetrics()) autoscaler = StandardAutoscaler( config_path, lm, @@ -959,7 +960,8 @@ def testDynamicScaling(self): max_concurrent_launches=5, max_failures=0, process_runner=runner, - update_interval_s=0) + update_interval_s=0, + prom_metrics=mock_metrics) self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) autoscaler.update() self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) @@ -991,6 +993,8 @@ def testDynamicScaling(self): events = autoscaler.event_summarizer.summary() assert ("Removing 1 nodes of type " "ray-legacy-worker-node-type (max workers)." in events), events + assert mock_metrics.stopped_nodes.inc.call_count == 1 + mock_metrics.running_nodes.set.assert_called_with(10) def testInitialWorkers(self): """initial_workers is deprecated, this tests that it is ignored.""" @@ -1295,6 +1299,7 @@ def testDelayedLaunchWithMinWorkers(self): self.provider = MockProvider() runner = MockProcessRunner() runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)]) + mock_metrics = Mock(spec=AutoscalerPrometheusMetrics()) autoscaler = StandardAutoscaler( config_path, LoadMetrics(), @@ -1302,7 +1307,8 @@ def testDelayedLaunchWithMinWorkers(self): max_concurrent_launches=8, max_failures=0, process_runner=runner, - update_interval_s=0) + update_interval_s=0, + prom_metrics=mock_metrics) assert len(self.provider.non_terminated_nodes({})) == 0 # update() should launch a wave of 5 nodes (max_launch_batch) @@ -1314,6 +1320,7 @@ def testDelayedLaunchWithMinWorkers(self): waiters = rtc1._cond._waiters self.waitFor(lambda: len(waiters) == 2) assert autoscaler.pending_launches.value == 10 + mock_metrics.pending_nodes.set.assert_called_with(10) assert len(self.provider.non_terminated_nodes({})) == 0 autoscaler.update() self.waitForNodes(0) # Nodes are not added on top of pending. @@ -1322,9 +1329,11 @@ def testDelayedLaunchWithMinWorkers(self): assert len(self.provider.non_terminated_nodes({})) == 10 self.waitForNodes(10) assert autoscaler.pending_launches.value == 0 + mock_metrics.pending_nodes.set.assert_called_with(0) autoscaler.update() self.waitForNodes(10) assert autoscaler.pending_launches.value == 0 + mock_metrics.pending_nodes.set.assert_called_with(0) def testUpdateThrottling(self): config_path = self.write_config(SMALL_CLUSTER) From b2bd1e539b25950acb5c4c5cd1d62ed4bda4bc8d Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 18:18:36 -0700 Subject: [PATCH 24/32] better docs --- python/ray/autoscaler/_private/prom_metrics.py | 8 +++++++- python/ray/tests/test_autoscaler.py | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py index 999f113ba3be7..210be7b17d1f8 100644 --- a/python/ray/autoscaler/_private/prom_metrics.py +++ b/python/ray/autoscaler/_private/prom_metrics.py @@ -6,6 +6,8 @@ ) +# The metrics in this class should be kept in sync with +# python/ray/tests/test_metrics_agent.py class AutoscalerPrometheusMetrics: def __init__(self, registry: CollectorRegistry = None): self.registry: CollectorRegistry = registry or \ @@ -15,7 +17,11 @@ def __init__(self, registry: CollectorRegistry = None): # 15 minutes, 20 minutes, 30 minutes, 1 hour self.worker_startup_time: Histogram = Histogram( "worker_startup_time_seconds", - "Worker startup time.", + "Worker startup time. Note that when nodes are launched in " + "batches, the startup time for that batch will be observed " + "once for *each* node in that batch. For example, if 8 nodes " + "are launched in 3 minutes, a startup time of 3 minutes will " + "be observed 8 times.", unit="seconds", namespace="autoscaler", registry=self.registry, diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index e37f18ef61323..560396b5d738e 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -1406,9 +1406,9 @@ def testIgnoresCorruptedConfig(self): # Write a corrupted config self.write_config("asdf", call_prepare_config=False) - for i in range(10): + for _ in range(10): autoscaler.update() - # config validation exceptions metrics should be incremented + # config validation exceptions metrics should be incremented 10 times assert mock_metrics.config_validation_exceptions.inc.call_count == 10 time.sleep(0.1) assert autoscaler.pending_launches.value == 0 From a0c10f0a47600467239299c8dafca5d8d6b14c37 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 18:24:04 -0700 Subject: [PATCH 25/32] nits --- python/ray/_private/services.py | 4 ++-- python/ray/autoscaler/_private/monitor.py | 2 +- python/ray/autoscaler/_private/prom_metrics.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 9f0c624b7793f..f2f7d6fa774e9 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -1851,8 +1851,8 @@ def start_monitor(redis_address, RotatingFileHandler's maxBytes. backup_count (int): Log rotation parameter. Corresponding to RotatingFileHandler's backupCount. - monitor_ip (str): ip address of the machine that the monitor will be - run on + monitor_ip (str): IP address of the machine that the monitor will be + run on. Can be excluded, but required for autoscaler metrics. Returns: ProcessInfo for the process that was started. """ diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index 3832c3e1d8ff7..39b68aff99edc 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -381,7 +381,7 @@ def run(self): required=False, type=str, default=None, - help="The IP address of the machine hosting the monitor process") + help="The IP address of the machine hosting the monitor process.") args = parser.parse_args() setup_component_logger( logging_level=args.logging_level, diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py index 210be7b17d1f8..f676e39ee440f 100644 --- a/python/ray/autoscaler/_private/prom_metrics.py +++ b/python/ray/autoscaler/_private/prom_metrics.py @@ -72,7 +72,7 @@ def __init__(self, registry: CollectorRegistry = None): self.config_validation_exceptions: Counter = Counter( "config_validation_exceptions", "Number of exceptions raised while validating the config " - "during a reset", + "during a reset.", unit="exceptions", namespace="autoscaler", registry=self.registry) From c069b8ccb9f60fdd252254bb4a9c561486f227b3 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 18:51:55 -0700 Subject: [PATCH 26/32] cases for started_nodes and worker_startup_time histogram --- python/ray/tests/test_autoscaler.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 560396b5d738e..58154b1d5d284 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -894,6 +894,9 @@ def testScaleUp(self): # started_nodes metric should have been incremented by 2 assert mock_metrics.started_nodes.inc.call_count == 1 mock_metrics.started_nodes.inc.assert_called_with(2) + # two startup times should have been observed + assert mock_metrics.worker_startup_time.observe.call_count == 2 + autoscaler.update() self.waitForNodes(2) @@ -939,6 +942,7 @@ def testTerminateOutdatedNodesGracefully(self): "ray-legacy-worker-node-type (outdated)." in events), events assert mock_metrics.stopped_nodes.inc.call_count == 10 mock_metrics.started_nodes.inc.assert_called_with(5) + mock_metrics.worker_startup_time.observe.call_count == 5 def testDynamicScaling(self): config_path = self.write_config(SMALL_CLUSTER) From 2c18da5a3a3cd402557105203872a3453a23f556 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Thu, 27 May 2021 19:10:38 -0700 Subject: [PATCH 27/32] add node_launch_exceptions case --- python/ray/tests/test_autoscaler.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 58154b1d5d284..509bfe2532036 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -161,6 +161,7 @@ def __init__(self, cache_stopped=False, unique_ips=False): self.mock_nodes = {} self.next_id = 0 self.throw = False + self.error_creates = False self.fail_creates = False self.ready_to_create = threading.Event() self.ready_to_create.set() @@ -213,6 +214,8 @@ def external_ip(self, node_id): return self.mock_nodes[node_id].external_ip def create_node(self, node_config, tags, count, _skip_wait=False): + if self.error_creates: + raise Exception if not _skip_wait: self.ready_to_create.wait() if self.fail_creates: @@ -2388,6 +2391,27 @@ def terminate_worker_zero(): assert set(autoscaler.workers()) == {2, 3},\ "Unexpected node_ids" + def testFlakyProvider(self): + config_path = self.write_config(SMALL_CLUSTER) + self.provider = MockProvider() + self.provider.error_creates = True + runner = MockProcessRunner() + mock_metrics = Mock(spec=AutoscalerPrometheusMetrics()) + autoscaler = StandardAutoscaler( + config_path, + LoadMetrics(), + max_failures=0, + process_runner=runner, + update_interval_s=0, + prom_metrics=mock_metrics) + autoscaler.update() + for _ in range(50): + if mock_metrics.node_launch_exceptions.inc.call_count == 1: + break + time.sleep(.1) + assert mock_metrics.node_launch_exceptions.inc.call_count == 1,\ + "Expected to observe a node launch exception" + if __name__ == "__main__": import sys From 216060c2ff3527e621fba84ac9cb8e71eb0cd86a Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Fri, 28 May 2021 09:14:54 -0700 Subject: [PATCH 28/32] use waitFor --- python/ray/tests/test_autoscaler.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 509bfe2532036..6870abaa6acb2 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -2391,7 +2391,7 @@ def terminate_worker_zero(): assert set(autoscaler.workers()) == {2, 3},\ "Unexpected node_ids" - def testFlakyProvider(self): + def testProviderException(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() self.provider.error_creates = True @@ -2405,12 +2405,13 @@ def testFlakyProvider(self): update_interval_s=0, prom_metrics=mock_metrics) autoscaler.update() - for _ in range(50): - if mock_metrics.node_launch_exceptions.inc.call_count == 1: - break - time.sleep(.1) - assert mock_metrics.node_launch_exceptions.inc.call_count == 1,\ - "Expected to observe a node launch exception" + + def exceptions_incremented(): + return mock_metrics.node_launch_exceptions.inc.call_count == 1 + + self.waitFor( + exceptions_incremented, + fail_msg="Expected to see a node launch exception") if __name__ == "__main__": From 377d2c82332885cba8ff4f4860c5968a0983ae7f Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Sun, 30 May 2021 22:30:29 -0700 Subject: [PATCH 29/32] don't start http server if monitor_ip isn't provided --- python/ray/autoscaler/_private/monitor.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index 39b68aff99edc..fcce961e43bb8 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -133,14 +133,18 @@ def __init__(self, self.autoscaler = None self.prom_metrics = AutoscalerPrometheusMetrics() - try: - logger.info("Starting autoscaler metrics server on port {}".format( - AUTOSCALER_METRIC_PORT)) - prometheus_client.start_http_server( - AUTOSCALER_METRIC_PORT, registry=self.prom_metrics.registry) - except Exception: - logger.exception( - "An exception occurred while starting the metrics server.") + if monitor_ip: + # If monitor_ip wasn't passed in, then don't attempt to start the + # metric server to keep behavior identical to before metrics were + # introduced + try: + logger.info("Starting autoscaler metrics server on port {}".format( + AUTOSCALER_METRIC_PORT)) + prometheus_client.start_http_server( + AUTOSCALER_METRIC_PORT, registry=self.prom_metrics.registry) + except Exception: + logger.exception( + "An exception occurred while starting the metrics server.") logger.info("Monitor: Started") From 0f1a6b210e59091e59642095919ece3858534b2a Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Sun, 30 May 2021 22:43:13 -0700 Subject: [PATCH 30/32] drop worker_startup_time --- python/ray/autoscaler/_private/node_launcher.py | 8 -------- python/ray/autoscaler/_private/prom_metrics.py | 14 -------------- python/ray/tests/test_autoscaler.py | 3 --- python/ray/tests/test_metrics_agent.py | 2 +- 4 files changed, 1 insertion(+), 26 deletions(-) diff --git a/python/ray/autoscaler/_private/node_launcher.py b/python/ray/autoscaler/_private/node_launcher.py index 2b9335bf18717..bf112d5515686 100644 --- a/python/ray/autoscaler/_private/node_launcher.py +++ b/python/ray/autoscaler/_private/node_launcher.py @@ -61,15 +61,7 @@ def _launch_node(self, config: Dict[str, Any], count: int, if node_type: node_tags[TAG_RAY_USER_NODE_TYPE] = node_type node_config.update(launch_config) - launch_start_time = time.time() self.provider.create_node(node_config, node_tags, count) - startup_time = time.time() - launch_start_time - for _ in range(count): - # Note: when launching multiple nodes we observe the time it - # took all nodes to start up for each node. For example, if 4 - # nodes were launched in 25 seconds, we would observe the 25 - # second startup time 4 times. - self.prom_metrics.worker_startup_time.observe(startup_time) self.prom_metrics.started_nodes.inc(count) after = self.provider.non_terminated_nodes(tag_filters=worker_filter) if set(after).issubset(before): diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py index f676e39ee440f..5a16b88ee4400 100644 --- a/python/ray/autoscaler/_private/prom_metrics.py +++ b/python/ray/autoscaler/_private/prom_metrics.py @@ -12,20 +12,6 @@ class AutoscalerPrometheusMetrics: def __init__(self, registry: CollectorRegistry = None): self.registry: CollectorRegistry = registry or \ CollectorRegistry(auto_describe=True) - # Buckets: 30 seconds, 1 minute, 2 minutes, 4 minutes, - # 6 minutes, 8 minutes, 10 minutes, 12 minutes, - # 15 minutes, 20 minutes, 30 minutes, 1 hour - self.worker_startup_time: Histogram = Histogram( - "worker_startup_time_seconds", - "Worker startup time. Note that when nodes are launched in " - "batches, the startup time for that batch will be observed " - "once for *each* node in that batch. For example, if 8 nodes " - "are launched in 3 minutes, a startup time of 3 minutes will " - "be observed 8 times.", - unit="seconds", - namespace="autoscaler", - registry=self.registry, - buckets=[30, 60, 120, 240, 360, 480, 600, 720, 900, 1800, 3600]) self.pending_nodes: Gauge = Gauge( "pending_nodes", "Number of nodes pending to be started.", diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 6870abaa6acb2..d5676ec262c64 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -897,8 +897,6 @@ def testScaleUp(self): # started_nodes metric should have been incremented by 2 assert mock_metrics.started_nodes.inc.call_count == 1 mock_metrics.started_nodes.inc.assert_called_with(2) - # two startup times should have been observed - assert mock_metrics.worker_startup_time.observe.call_count == 2 autoscaler.update() self.waitForNodes(2) @@ -945,7 +943,6 @@ def testTerminateOutdatedNodesGracefully(self): "ray-legacy-worker-node-type (outdated)." in events), events assert mock_metrics.stopped_nodes.inc.call_count == 10 mock_metrics.started_nodes.inc.assert_called_with(5) - mock_metrics.worker_startup_time.observe.call_count == 5 def testDynamicScaling(self): config_path = self.write_config(SMALL_CLUSTER) diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 11a25c057ed2d..63732f0158cb9 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -56,7 +56,7 @@ "autoscaler_node_launch_exceptions", "autoscaler_pending_nodes", "autoscaler_reset_exceptions", "autoscaler_running_nodes", "autoscaler_started_nodes", "autoscaler_stopped_nodes", - "autoscaler_update_loop_exceptions", "autoscaler_worker_startup_time" + "autoscaler_update_loop_exceptions" ] From a6cc2293bde71320d289a50664c65962382ba559 Mon Sep 17 00:00:00 2001 From: Chris Wong Date: Sun, 30 May 2021 23:46:12 -0700 Subject: [PATCH 31/32] lint --- python/ray/autoscaler/_private/monitor.py | 8 +++++--- python/ray/autoscaler/_private/node_launcher.py | 1 - python/ray/autoscaler/_private/prom_metrics.py | 1 - 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index fcce961e43bb8..cc0d74d1154e5 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -138,10 +138,12 @@ def __init__(self, # metric server to keep behavior identical to before metrics were # introduced try: - logger.info("Starting autoscaler metrics server on port {}".format( - AUTOSCALER_METRIC_PORT)) + logger.info( + "Starting autoscaler metrics server on port {}".format( + AUTOSCALER_METRIC_PORT)) prometheus_client.start_http_server( - AUTOSCALER_METRIC_PORT, registry=self.prom_metrics.registry) + AUTOSCALER_METRIC_PORT, + registry=self.prom_metrics.registry) except Exception: logger.exception( "An exception occurred while starting the metrics server.") diff --git a/python/ray/autoscaler/_private/node_launcher.py b/python/ray/autoscaler/_private/node_launcher.py index bf112d5515686..a913f219c4357 100644 --- a/python/ray/autoscaler/_private/node_launcher.py +++ b/python/ray/autoscaler/_private/node_launcher.py @@ -2,7 +2,6 @@ import copy import logging import threading -import time from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND, TAG_RAY_NODE_NAME, diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py index 5a16b88ee4400..26bb2140e65f3 100644 --- a/python/ray/autoscaler/_private/prom_metrics.py +++ b/python/ray/autoscaler/_private/prom_metrics.py @@ -2,7 +2,6 @@ CollectorRegistry, Counter, Gauge, - Histogram, ) From 2edcb1a244865dd13ea0ac0a975a623574ea9425 Mon Sep 17 00:00:00 2001 From: Ian Date: Mon, 31 May 2021 23:04:08 +0000 Subject: [PATCH 32/32] Hotfix [nodes -> workers] + [count failed nodes as stopped] --- python/ray/autoscaler/_private/autoscaler.py | 4 +++- python/ray/autoscaler/_private/prom_metrics.py | 6 +++--- python/ray/tests/test_autoscaler.py | 12 ++++++++---- python/ray/tests/test_metrics_agent.py | 2 +- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 7c8b20fb29fef..97f37e6d39f5b 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -303,6 +303,8 @@ def _update(self): " Failed to update node." " Node has already been terminated.") if nodes_to_terminate: + self.prom_metrics.stopped_nodes.inc( + len(nodes_to_terminate)) self.provider.terminate_nodes(nodes_to_terminate) nodes = self.workers() @@ -750,7 +752,7 @@ def workers(self): nodes = self.provider.non_terminated_nodes( tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) # Update running nodes gauge whenever we check workers - self.prom_metrics.running_nodes.set(len(nodes)) + self.prom_metrics.running_workers.set(len(nodes)) return nodes def unmanaged_workers(self): diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py index 26bb2140e65f3..07992bfb29dae 100644 --- a/python/ray/autoscaler/_private/prom_metrics.py +++ b/python/ray/autoscaler/_private/prom_metrics.py @@ -29,9 +29,9 @@ def __init__(self, registry: CollectorRegistry = None): unit="nodes", namespace="autoscaler", registry=self.registry) - self.running_nodes: Gauge = Gauge( - "running_nodes", - "Number of nodes running.", + self.running_workers: Gauge = Gauge( + "running_workers", + "Number of worker nodes running.", unit="nodes", namespace="autoscaler", registry=self.registry) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index d5676ec262c64..829b86c76fabb 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -901,8 +901,8 @@ def testScaleUp(self): autoscaler.update() self.waitForNodes(2) - # running_nodes metric should be set to 2 - mock_metrics.running_nodes.set.assert_called_with(2) + # running_workers metric should be set to 2 + mock_metrics.running_workers.set.assert_called_with(2) def testTerminateOutdatedNodesGracefully(self): config = SMALL_CLUSTER.copy() @@ -998,7 +998,7 @@ def testDynamicScaling(self): assert ("Removing 1 nodes of type " "ray-legacy-worker-node-type (max workers)." in events), events assert mock_metrics.stopped_nodes.inc.call_count == 1 - mock_metrics.running_nodes.set.assert_called_with(10) + mock_metrics.running_workers.set.assert_called_with(10) def testInitialWorkers(self): """initial_workers is deprecated, this tests that it is ignored.""" @@ -2305,12 +2305,14 @@ def testNodeTerminatedDuringUpdate(self): self.provider = MockProvider() runner = MockProcessRunner() lm = LoadMetrics() + mock_metrics = Mock(spec=AutoscalerPrometheusMetrics()) autoscaler = StandardAutoscaler( config_path, lm, max_failures=0, process_runner=runner, - update_interval_s=0) + update_interval_s=0, + prom_metrics=mock_metrics) # Scale up to two up-to-date workers autoscaler.update() @@ -2388,6 +2390,8 @@ def terminate_worker_zero(): assert set(autoscaler.workers()) == {2, 3},\ "Unexpected node_ids" + assert len(mock_metrics.stopped_nodes.mock_calls) == 1 + def testProviderException(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 63732f0158cb9..c2dc8ce36c177 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -54,7 +54,7 @@ _AUTOSCALER_METRICS = [ "autoscaler_config_validation_exceptions", "autoscaler_node_launch_exceptions", "autoscaler_pending_nodes", - "autoscaler_reset_exceptions", "autoscaler_running_nodes", + "autoscaler_reset_exceptions", "autoscaler_running_workers", "autoscaler_started_nodes", "autoscaler_stopped_nodes", "autoscaler_update_loop_exceptions" ]