[autoscaler] Autoscaler metrics (#16066)
Co-authored-by: Ian <[email protected]>
ckw017 and ijrsvt committed Jun 1, 2021
1 parent da6f28d commit 31364ed
Showing 10 changed files with 246 additions and 27 deletions.
10 changes: 9 additions & 1 deletion python/ray/_private/metrics_agent.py
@@ -21,6 +21,7 @@
from opencensus.tags import tag_value as tag_value_module

import ray
from ray._private import services

import ray._private.prometheus_exporter as prometheus_exporter
from ray.core.generated.metrics_pb2 import Metric
@@ -185,18 +186,25 @@ class PrometheusServiceDiscoveryWriter(threading.Thread):
def __init__(self, redis_address, redis_password, temp_dir):
ray.state.state._initialize_global_state(
redis_address=redis_address, redis_password=redis_password)
self.redis_address = redis_address
self.redis_password = redis_password
self.temp_dir = temp_dir
self.default_service_discovery_flush_period = 5
super().__init__()

def get_file_discovery_content(self):
"""Return the content for Prometheus serivce discovery."""
"""Return the content for Prometheus service discovery."""
nodes = ray.nodes()
metrics_export_addresses = [
"{}:{}".format(node["NodeManagerAddress"],
node["MetricsExportPort"]) for node in nodes
if node["alive"] is True
]
redis_client = services.create_redis_client(self.redis_address,
self.redis_password)
autoscaler_addr = redis_client.get("AutoscalerMetricsAddress")
if autoscaler_addr:
metrics_export_addresses.append(autoscaler_addr.decode("utf-8"))
return json.dumps([{
"labels": {
"job": "ray"
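The tail of get_file_discovery_content is truncated above. As a rough sketch, the payload it writes follows the standard Prometheus file_sd format, with the autoscaler exporter appended to the per-node targets (the addresses below are illustrative, and the "targets" key is an assumption since it is not visible in this hunk):

import json

metrics_export_addresses = ["10.0.0.1:55661", "10.0.0.2:55661"]  # per-node metrics exporters
autoscaler_addr = "10.0.0.1:44217"  # head node IP + AUTOSCALER_METRIC_PORT, read from Redis
metrics_export_addresses.append(autoscaler_addr)

print(json.dumps([{
    "labels": {
        "job": "ray"
    },
    "targets": metrics_export_addresses,
}]))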
8 changes: 6 additions & 2 deletions python/ray/_private/services.py
@@ -1824,7 +1824,8 @@ def start_monitor(redis_address,
redis_password=None,
fate_share=None,
max_bytes=0,
backup_count=0):
backup_count=0,
monitor_ip=None):
"""Run a process to monitor the other processes.
Args:
@@ -1840,7 +1841,8 @@ def start_monitor(redis_address,
RotatingFileHandler's maxBytes.
backup_count (int): Log rotation parameter. Corresponding to
RotatingFileHandler's backupCount.
monitor_ip (str): IP address of the machine that the monitor will be
run on. Can be omitted, but is required for autoscaler metrics.
Returns:
ProcessInfo for the process that was started.
"""
@@ -1855,6 +1857,8 @@
command.append("--autoscaling-config=" + str(autoscaling_config))
if redis_password:
command.append("--redis-password=" + redis_password)
if monitor_ip:
command.append("--monitor-ip=" + monitor_ip)
process_info = start_ray_process(
command,
ray_constants.PROCESS_TYPE_MONITOR,
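With the new flag, the monitor command assembled by start_monitor gains one extra argument; a minimal sketch of the appended portion (the base command and path are simplified, since they are not shown in this hunk):

command = ["python", "-u", "/path/to/ray/autoscaler/_private/monitor.py"]  # simplified base command
monitor_ip = "10.0.0.1"  # hypothetical head-node IP passed by the caller
if monitor_ip:
    command.append("--monitor-ip=" + monitor_ip)
# command -> [..., "--monitor-ip=10.0.0.1"]; monitor.py uses this to publish
# its metrics address ("AutoscalerMetricsAddress") to Redis.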
33 changes: 25 additions & 8 deletions python/ray/autoscaler/_private/autoscaler.py
@@ -1,4 +1,5 @@
from collections import defaultdict, namedtuple, Counter
from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
from typing import Any, Optional, Dict, List
from urllib3.exceptions import MaxRetryError
import copy
@@ -30,10 +31,9 @@
from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \
with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
format_info_string
from ray.autoscaler._private.constants import \
AUTOSCALER_MAX_NUM_FAILURES, AUTOSCALER_MAX_LAUNCH_BATCH, \
AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \
AUTOSCALER_HEARTBEAT_TIMEOUT_S
from ray.autoscaler._private.constants import AUTOSCALER_MAX_NUM_FAILURES, \
AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \
AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S
from six.moves import queue

logger = logging.getLogger(__name__)
@@ -75,13 +75,19 @@ def __init__(self,
process_runner=subprocess,
update_interval_s=AUTOSCALER_UPDATE_INTERVAL_S,
prefix_cluster_info=False,
event_summarizer=None):
event_summarizer=None,
prom_metrics=None):
self.config_path = config_path
# Prefix each line of info string with cluster name if True
self.prefix_cluster_info = prefix_cluster_info
# Keep this before self.reset (self.provider needs to be created
# exactly once).
self.provider = None
# Keep this before self.reset (if an exception occurs in reset
# then prom_metrics must be instantiated to increment the
# exception counter)
self.prom_metrics = prom_metrics or \
AutoscalerPrometheusMetrics()
self.resource_demand_scheduler = None
self.reset(errors_fatal=True)
self.head_node_ip = load_metrics.local_ip
@@ -113,7 +119,7 @@ def __init__(self,
index=i,
pending=self.pending_launches,
node_types=self.available_node_types,
)
prom_metrics=self.prom_metrics)
node_launcher.daemon = True
node_launcher.start()

@@ -131,14 +137,14 @@ def __init__(self,

for local_path in self.config["file_mounts"].values():
assert os.path.exists(local_path)

logger.info("StandardAutoscaler: {}".format(self.config))

def update(self):
try:
self.reset(errors_fatal=False)
self._update()
except Exception as e:
self.prom_metrics.update_loop_exceptions.inc()
logger.exception("StandardAutoscaler: "
"Error during autoscaling.")
# Don't abort the autoscaler if the K8s API server is down.
@@ -217,6 +223,7 @@ def _update(self):
self.provider.terminate_nodes(nodes_to_terminate)
for node in nodes_to_terminate:
self.node_tracker.untrack(node)
self.prom_metrics.stopped_nodes.inc()
nodes = self.workers()

# Terminate nodes if there are too many
@@ -237,6 +244,7 @@ def _update(self):
self.provider.terminate_nodes(nodes_to_terminate)
for node in nodes_to_terminate:
self.node_tracker.untrack(node)
self.prom_metrics.stopped_nodes.inc()
nodes = self.workers()

to_launch = self.resource_demand_scheduler.get_nodes_to_launch(
@@ -295,6 +303,8 @@ def _update(self):
" Failed to update node."
" Node has already been terminated.")
if nodes_to_terminate:
self.prom_metrics.stopped_nodes.inc(
len(nodes_to_terminate))
self.provider.terminate_nodes(nodes_to_terminate)
nodes = self.workers()

@@ -470,6 +480,7 @@ def reset(self, errors_fatal=False):
try:
validate_config(new_config)
except Exception as e:
self.prom_metrics.config_validation_exceptions.inc()
logger.debug(
"Cluster config validation failed. The version of "
"the ray CLI you launched this cluster with may "
@@ -533,6 +544,7 @@ def reset(self, errors_fatal=False):
upscaling_speed)

except Exception as e:
self.prom_metrics.reset_exceptions.inc()
if errors_fatal:
raise e
else:
@@ -725,6 +737,7 @@ def launch_new_node(self, count: int, node_type: Optional[str]) -> None:
quantity=count,
aggregate=operator.add)
self.pending_launches.inc(node_type, count)
self.prom_metrics.pending_nodes.set(self.pending_launches.value)
config = copy.deepcopy(self.config)
# Split into individual launch requests of the max batch size.
while count > 0:
@@ -736,8 +749,11 @@ def all_workers(self):
return self.workers() + self.unmanaged_workers()

def workers(self):
return self.provider.non_terminated_nodes(
nodes = self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Update running nodes gauge whenever we check workers
self.prom_metrics.running_workers.set(len(nodes))
return nodes

def unmanaged_workers(self):
return self.provider.non_terminated_nodes(
@@ -750,6 +766,7 @@ def kill_workers(self):
self.provider.terminate_nodes(nodes)
for node in nodes:
self.node_tracker.untrack(node)
self.prom_metrics.stopped_nodes.inc()
logger.error("StandardAutoscaler: terminated {} node(s)".format(
len(nodes)))

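The new prom_metrics module referenced throughout this file is not expanded in this view. A minimal sketch consistent with the attributes used here and in monitor.py and node_launcher.py below (metric names, help strings, and the "autoscaler" namespace are assumptions, not the committed code):

from prometheus_client import CollectorRegistry, Counter, Gauge


class AutoscalerPrometheusMetrics:
    def __init__(self, registry=None):
        self.registry = registry or CollectorRegistry()
        # Gauges updated as launches are requested and workers are observed.
        self.pending_nodes = Gauge(
            "pending_nodes", "Number of nodes pending to be started.",
            namespace="autoscaler", registry=self.registry)
        self.running_workers = Gauge(
            "running_workers", "Number of worker nodes currently running.",
            namespace="autoscaler", registry=self.registry)
        # Counters incremented from StandardAutoscaler and NodeLauncher.
        self.started_nodes = Counter(
            "started_nodes", "Number of nodes started.",
            namespace="autoscaler", registry=self.registry)
        self.stopped_nodes = Counter(
            "stopped_nodes", "Number of nodes stopped.",
            namespace="autoscaler", registry=self.registry)
        self.update_loop_exceptions = Counter(
            "update_loop_exceptions", "Exceptions raised in update().",
            namespace="autoscaler", registry=self.registry)
        self.node_launch_exceptions = Counter(
            "node_launch_exceptions", "Exceptions raised while launching nodes.",
            namespace="autoscaler", registry=self.registry)
        self.reset_exceptions = Counter(
            "reset_exceptions", "Exceptions raised in reset().",
            namespace="autoscaler", registry=self.registry)
        self.config_validation_exceptions = Counter(
            "config_validation_exceptions",
            "Exceptions raised while validating the cluster config.",
            namespace="autoscaler", registry=self.registry)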
3 changes: 3 additions & 0 deletions python/ray/autoscaler/_private/constants.py
@@ -61,6 +61,9 @@ def env_integer(key, default):
# to run.
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE = 1000

# Port that autoscaler prometheus metrics will be exported to
AUTOSCALER_METRIC_PORT = env_integer("AUTOSCALER_METRIC_PORT", 44217)

# Max number of retries to AWS (default is 5, time increases exponentially)
BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12)
# Max number of retries to create an EC2 node (retry different subnet)
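Since the constant is read through env_integer, the export port can be overridden; a small sketch (the port value is arbitrary, and the variable must be set before the constants module is imported):

import os

os.environ["AUTOSCALER_METRIC_PORT"] = "9100"  # set before importing the constants module
from ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT
print(AUTOSCALER_METRIC_PORT)  # -> 9100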
38 changes: 34 additions & 4 deletions python/ray/autoscaler/_private/monitor.py
@@ -13,13 +13,16 @@
from typing import Optional

import grpc
import prometheus_client

import ray
from ray.autoscaler._private.autoscaler import StandardAutoscaler
from ray.autoscaler._private.commands import teardown_cluster
from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S
from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S, \
AUTOSCALER_METRIC_PORT
from ray.autoscaler._private.event_summarizer import EventSummarizer
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
from ray.autoscaler._private.constants import \
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS, \
@@ -92,13 +95,16 @@ def __init__(self,
autoscaling_config,
redis_password=None,
prefix_cluster_info=False,
monitor_ip=None,
stop_event: Optional[Event] = None):
# Initialize the Redis clients.
ray.state.state._initialize_global_state(
redis_address, redis_password=redis_password)
self.redis = ray._private.services.create_redis_client(
redis_address, password=redis_password)

if monitor_ip:
self.redis.set("AutoscalerMetricsAddress",
f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}")
(ip, port) = redis_address.split(":")
self.gcs_client = connect_to_gcs(ip, int(port), redis_password)
# Initialize the gcs stub for getting all node resource usage.
@@ -126,6 +132,22 @@ def __init__(self,
self.autoscaling_config = autoscaling_config
self.autoscaler = None

self.prom_metrics = AutoscalerPrometheusMetrics()
if monitor_ip:
# If monitor_ip wasn't passed in, then don't attempt to start the
# metric server to keep behavior identical to before metrics were
# introduced
try:
logger.info(
"Starting autoscaler metrics server on port {}".format(
AUTOSCALER_METRIC_PORT))
prometheus_client.start_http_server(
AUTOSCALER_METRIC_PORT,
registry=self.prom_metrics.registry)
except Exception:
logger.exception(
"An exception occurred while starting the metrics server.")

logger.info("Monitor: Started")

def __del__(self):
@@ -137,7 +159,8 @@ def _initialize_autoscaler(self):
self.autoscaling_config,
self.load_metrics,
prefix_cluster_info=self.prefix_cluster_info,
event_summarizer=self.event_summarizer)
event_summarizer=self.event_summarizer,
prom_metrics=self.prom_metrics)

def update_load_metrics(self):
"""Fetches resource usage data from GCS and updates load metrics."""
@@ -359,6 +382,12 @@ def run(self):
default=ray_constants.LOGGING_ROTATE_BACKUP_COUNT,
help="Specify the backup count of rotated log file, default is "
f"{ray_constants.LOGGING_ROTATE_BACKUP_COUNT}.")
parser.add_argument(
"--monitor-ip",
required=False,
type=str,
default=None,
help="The IP address of the machine hosting the monitor process.")
args = parser.parse_args()
setup_component_logger(
logging_level=args.logging_level,
@@ -381,6 +410,7 @@ def run(self):
monitor = Monitor(
args.redis_address,
autoscaling_config,
redis_password=args.redis_password)
redis_password=args.redis_password,
monitor_ip=args.monitor_ip)

monitor.run()
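Once the monitor starts with --monitor-ip, the exporter can be spot-checked over HTTP; a minimal sketch (the address is illustrative and assumes the default AUTOSCALER_METRIC_PORT):

import urllib.request

monitor_ip = "10.0.0.1"  # hypothetical head-node address
url = "http://{}:44217/metrics".format(monitor_ip)
# prometheus_client.start_http_server serves the text exposition format at /metrics.
print(urllib.request.urlopen(url).read().decode("utf-8")[:500])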
6 changes: 6 additions & 0 deletions python/ray/autoscaler/_private/node_launcher.py
@@ -7,6 +7,7 @@
TAG_RAY_NODE_KIND, TAG_RAY_NODE_NAME,
TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED,
NODE_KIND_WORKER)
from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
from ray.autoscaler._private.util import hash_launch_conf

logger = logging.getLogger(__name__)
@@ -19,12 +20,14 @@ def __init__(self,
provider,
queue,
pending,
prom_metrics=None,
node_types=None,
index=None,
*args,
**kwargs):
self.queue = queue
self.pending = pending
self.prom_metrics = prom_metrics or AutoscalerPrometheusMetrics()
self.provider = provider
self.node_types = node_types
self.index = str(index) if index is not None else ""
@@ -58,6 +61,7 @@ def _launch_node(self, config: Dict[str, Any], count: int,
node_tags[TAG_RAY_USER_NODE_TYPE] = node_type
node_config.update(launch_config)
self.provider.create_node(node_config, node_tags, count)
self.prom_metrics.started_nodes.inc(count)
after = self.provider.non_terminated_nodes(tag_filters=worker_filter)
if set(after).issubset(before):
self.log("No new nodes reported after node creation.")
@@ -69,9 +73,11 @@ def run(self):
try:
self._launch_node(config, count, node_type)
except Exception:
self.prom_metrics.node_launch_exceptions.inc()
logger.exception("Launch failed")
finally:
self.pending.dec(node_type, count)
self.prom_metrics.pending_nodes.set(self.pending.value)

def log(self, statement):
prefix = "NodeLauncher{}:".format(self.index)
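Because NodeLauncher falls back to a fresh AutoscalerPrometheusMetrics when none is injected, the counters can also be read directly off the registry in tests; a sketch that assumes the metric names from the class sketched earlier:

from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics

prom_metrics = AutoscalerPrometheusMetrics()
prom_metrics.started_nodes.inc(3)
# prometheus_client exposes counter samples with a "_total" suffix.
value = prom_metrics.registry.get_sample_value("autoscaler_started_nodes_total")
assert value == 3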
(The remaining changed files are not expanded in this view.)
