[autoscaler] Autoscaler metrics #16066

Merged: 32 commits (Jun 1, 2021)

Commits
46ea81b
initial
ckw017 May 24, 2021
7c7cd1a
sanity check
ckw017 May 24, 2021
f170974
lint and more
ckw017 May 25, 2021
4150bf0
remove extra file?
ckw017 May 25, 2021
771b304
format
ckw017 May 26, 2021
3f11bc6
store ip of machine running monitor process
ckw017 May 27, 2021
eb9edb1
autoscaler_ip -> monitor_ip
ckw017 May 27, 2021
b4130a8
lint
ckw017 May 27, 2021
09a8998
add worker startup time buckets
ckw017 May 27, 2021
849c1ba
better descriptions
ckw017 May 27, 2021
6f58b81
more lint
ckw017 May 27, 2021
9050bab
propogate exception when starting prom http
ckw017 May 27, 2021
7be69c1
lint
ckw017 May 27, 2021
8f682b4
fix redis set/get
ckw017 May 27, 2021
51f7a5c
move start_http to monitor.py
ckw017 May 27, 2021
4259ecd
break up exception types and add pending_nodes metric
ckw017 May 27, 2021
bb9896e
Adjust buckets, fix test_autoscaler failures
ckw017 May 27, 2021
091610a
Add metric_agent tests
ckw017 May 27, 2021
ee005b1
explain _AUTOSCALER_METRICS
ckw017 May 27, 2021
1d82167
add basic exception count checks
ckw017 May 27, 2021
a43b38e
more autoscaler metric tests
ckw017 May 28, 2021
25f55dc
less dangerous way to handle no prom_metrics
ckw017 May 28, 2021
f7013c2
more mock checks
ckw017 May 28, 2021
b2bd1e5
better docs
ckw017 May 28, 2021
a0c10f0
nits
ckw017 May 28, 2021
c069b8c
cases for started_nodes and worker_startup_time histogram
ckw017 May 28, 2021
2c18da5
add node_launch_exceptions case
ckw017 May 28, 2021
216060c
use waitFor
ckw017 May 28, 2021
377d2c8
don't start http server if monitor_ip isn't provided
ckw017 May 31, 2021
0f1a6b2
drop worker_startup_time
ckw017 May 31, 2021
a6cc229
lint
ckw017 May 31, 2021
2edcb1a
Hotfix [nodes -> workers] + [count failed nodes as stopped]
ijrsvt May 31, 2021
10 changes: 9 additions & 1 deletion python/ray/_private/metrics_agent.py
@@ -21,6 +21,7 @@
from opencensus.tags import tag_value as tag_value_module

import ray
from ray._private import services

import ray._private.prometheus_exporter as prometheus_exporter
from ray.core.generated.metrics_pb2 import Metric
@@ -185,18 +186,25 @@ class PrometheusServiceDiscoveryWriter(threading.Thread):
def __init__(self, redis_address, redis_password, temp_dir):
ray.state.state._initialize_global_state(
redis_address=redis_address, redis_password=redis_password)
self.redis_address = redis_address
self.redis_password = redis_password
self.temp_dir = temp_dir
self.default_service_discovery_flush_period = 5
super().__init__()

def get_file_discovery_content(self):
"""Return the content for Prometheus serivce discovery."""
"""Return the content for Prometheus service discovery."""
nodes = ray.nodes()
metrics_export_addresses = [
"{}:{}".format(node["NodeManagerAddress"],
node["MetricsExportPort"]) for node in nodes
if node["alive"] is True
]
redis_client = services.create_redis_client(self.redis_address,
self.redis_password)
autoscaler_addr = redis_client.get("AutoscalerMetricsAddress")
if autoscaler_addr:
metrics_export_addresses.append(autoscaler_addr.decode("utf-8"))
return json.dumps([{
"labels": {
"job": "ray"
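The net effect on Prometheus file-based service discovery: the autoscaler endpoint is appended alongside the per-node exporters. A rough sketch of the resulting JSON (addresses and ports are invented for illustration; the real ones come from ray.nodes() and the AutoscalerMetricsAddress Redis key):

```python
# Illustrative sketch only: the shape of the service discovery content after
# this change. Addresses are placeholders, not values from this PR.
import json

targets = [
    "10.0.0.3:56651",   # NodeManagerAddress:MetricsExportPort for a live node
    "10.0.0.2:44217",   # monitor_ip:AUTOSCALER_METRIC_PORT, appended by this PR
]
print(json.dumps([{"labels": {"job": "ray"}, "targets": targets}]))
```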
8 changes: 6 additions & 2 deletions python/ray/_private/services.py
@@ -1834,7 +1834,8 @@ def start_monitor(redis_address,
redis_password=None,
fate_share=None,
max_bytes=0,
backup_count=0):
backup_count=0,
monitor_ip=None):
"""Run a process to monitor the other processes.

Args:
@@ -1850,7 +1851,8 @@
RotatingFileHandler's maxBytes.
backup_count (int): Log rotation parameter. Corresponding to
RotatingFileHandler's backupCount.

monitor_ip (str): IP address of the machine that the monitor will be
run on. Can be excluded, but required for autoscaler metrics.
Returns:
ProcessInfo for the process that was started.
"""
@@ -1865,6 +1867,8 @@
command.append("--autoscaling-config=" + str(autoscaling_config))
if redis_password:
command.append("--redis-password=" + redis_password)
if monitor_ip:
command.append("--monitor-ip=" + monitor_ip)
process_info = start_ray_process(
command,
ray_constants.PROCESS_TYPE_MONITOR,
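For context, the new parameter only changes the command that gets spawned. A hedged sketch of the resulting command construction (the base command and the monitor path are placeholders, not the exact values Ray builds):

```python
# Sketch of how services.start_monitor extends the monitor command once
# monitor_ip is provided (base command is illustrative).
import sys

monitor_path = "ray/autoscaler/_private/monitor.py"  # placeholder path
command = [sys.executable, "-u", monitor_path, "--redis-address=127.0.0.1:6379"]

autoscaling_config = None   # or a path to an autoscaling config file
redis_password = None
monitor_ip = "10.0.0.2"     # IP of the machine running the monitor

if autoscaling_config:
    command.append("--autoscaling-config=" + str(autoscaling_config))
if redis_password:
    command.append("--redis-password=" + redis_password)
if monitor_ip:
    # New in this PR: lets monitor.py advertise and serve autoscaler metrics.
    command.append("--monitor-ip=" + monitor_ip)
```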
33 changes: 25 additions & 8 deletions python/ray/autoscaler/_private/autoscaler.py
@@ -1,4 +1,5 @@
from collections import defaultdict, namedtuple, Counter
from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
from typing import Any, Optional, Dict, List
from urllib3.exceptions import MaxRetryError
import copy
@@ -30,10 +31,9 @@
from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \
with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
format_info_string
from ray.autoscaler._private.constants import \
AUTOSCALER_MAX_NUM_FAILURES, AUTOSCALER_MAX_LAUNCH_BATCH, \
AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \
AUTOSCALER_HEARTBEAT_TIMEOUT_S
from ray.autoscaler._private.constants import AUTOSCALER_MAX_NUM_FAILURES, \
AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \
AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S
from six.moves import queue

logger = logging.getLogger(__name__)
@@ -75,13 +75,19 @@ def __init__(self,
process_runner=subprocess,
update_interval_s=AUTOSCALER_UPDATE_INTERVAL_S,
prefix_cluster_info=False,
event_summarizer=None):
event_summarizer=None,
prom_metrics=None):
self.config_path = config_path
# Prefix each line of info string with cluster name if True
self.prefix_cluster_info = prefix_cluster_info
# Keep this before self.reset (self.provider needs to be created
# exactly once).
self.provider = None
# Keep this before self.reset (if an exception occurs in reset
# then prom_metrics must be instantiated to increment the
# exception counter)
self.prom_metrics = prom_metrics or \
AutoscalerPrometheusMetrics()
self.resource_demand_scheduler = None
self.reset(errors_fatal=True)
self.head_node_ip = load_metrics.local_ip
@@ -113,7 +119,7 @@ def __init__(self,
index=i,
pending=self.pending_launches,
node_types=self.available_node_types,
)
prom_metrics=self.prom_metrics)
node_launcher.daemon = True
node_launcher.start()

@@ -131,14 +137,14 @@ def __init__(self,

for local_path in self.config["file_mounts"].values():
assert os.path.exists(local_path)

logger.info("StandardAutoscaler: {}".format(self.config))

def update(self):
try:
self.reset(errors_fatal=False)
self._update()
except Exception as e:
self.prom_metrics.update_loop_exceptions.inc()
logger.exception("StandardAutoscaler: "
"Error during autoscaling.")
# Don't abort the autoscaler if the K8s API server is down.
@@ -217,6 +223,7 @@ def _update(self):
self.provider.terminate_nodes(nodes_to_terminate)
for node in nodes_to_terminate:
self.node_tracker.untrack(node)
self.prom_metrics.stopped_nodes.inc()
nodes = self.workers()

# Terminate nodes if there are too many
@@ -237,6 +244,7 @@
self.provider.terminate_nodes(nodes_to_terminate)
for node in nodes_to_terminate:
self.node_tracker.untrack(node)
self.prom_metrics.stopped_nodes.inc()
nodes = self.workers()

to_launch = self.resource_demand_scheduler.get_nodes_to_launch(
Reviewer comment (Contributor): We should also increment self.prom_metrics.stopped_nodes.inc() here:
[Screenshot: Screen Shot 2021-05-31 at 3 06 04 PM]

@@ -295,6 +303,8 @@ def _update(self):
" Failed to update node."
" Node has already been terminated.")
if nodes_to_terminate:
self.prom_metrics.stopped_nodes.inc(
len(nodes_to_terminate))
self.provider.terminate_nodes(nodes_to_terminate)
nodes = self.workers()

@@ -470,6 +480,7 @@ def reset(self, errors_fatal=False):
try:
validate_config(new_config)
except Exception as e:
self.prom_metrics.config_validation_exceptions.inc()
logger.debug(
"Cluster config validation failed. The version of "
"the ray CLI you launched this cluster with may "
@@ -533,6 +544,7 @@ def reset(self, errors_fatal=False):
upscaling_speed)

except Exception as e:
self.prom_metrics.reset_exceptions.inc()
if errors_fatal:
raise e
else:
@@ -725,6 +737,7 @@ def launch_new_node(self, count: int, node_type: Optional[str]) -> None:
quantity=count,
aggregate=operator.add)
self.pending_launches.inc(node_type, count)
self.prom_metrics.pending_nodes.set(self.pending_launches.value)
config = copy.deepcopy(self.config)
# Split into individual launch requests of the max batch size.
while count > 0:
@@ -736,8 +749,11 @@ def all_workers(self):
return self.workers() + self.unmanaged_workers()

def workers(self):
return self.provider.non_terminated_nodes(
nodes = self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Update running nodes gauge whenever we check workers
self.prom_metrics.running_workers.set(len(nodes))
return nodes

def unmanaged_workers(self):
return self.provider.non_terminated_nodes(
@@ -750,6 +766,7 @@ def kill_workers(self):
self.provider.terminate_nodes(nodes)
for node in nodes:
self.node_tracker.untrack(node)
self.prom_metrics.stopped_nodes.inc()
logger.error("StandardAutoscaler: terminated {} node(s)".format(
len(nodes)))

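The new prom_metrics module itself is not among the files shown here. Below is a minimal sketch of what AutoscalerPrometheusMetrics could look like, inferred from the metric names and the .registry attribute referenced in this diff; the namespace, metric types, and help strings are assumptions, not the PR's actual implementation:

```python
# Sketch only: a plausible AutoscalerPrometheusMetrics, reconstructed from the
# attributes used in autoscaler.py, monitor.py, and node_launcher.py.
from prometheus_client import CollectorRegistry, Counter, Gauge


class AutoscalerPrometheusMetrics:
    def __init__(self, registry: CollectorRegistry = None):
        # Each instance gets its own registry so tests can read values back
        # without touching the process-wide default registry.
        self.registry = registry or CollectorRegistry()
        self.pending_nodes = Gauge(
            "pending_nodes", "Number of nodes pending to be started.",
            namespace="autoscaler", registry=self.registry)
        self.started_nodes = Counter(
            "started_nodes", "Number of nodes started by the autoscaler.",
            namespace="autoscaler", registry=self.registry)
        self.stopped_nodes = Counter(
            "stopped_nodes", "Number of nodes stopped by the autoscaler.",
            namespace="autoscaler", registry=self.registry)
        self.running_workers = Gauge(
            "running_workers", "Number of non-terminated worker nodes.",
            namespace="autoscaler", registry=self.registry)
        self.update_loop_exceptions = Counter(
            "update_loop_exceptions",
            "Exceptions raised in StandardAutoscaler.update().",
            namespace="autoscaler", registry=self.registry)
        self.node_launch_exceptions = Counter(
            "node_launch_exceptions",
            "Exceptions raised while launching new nodes.",
            namespace="autoscaler", registry=self.registry)
        self.reset_exceptions = Counter(
            "reset_exceptions",
            "Exceptions raised in StandardAutoscaler.reset().",
            namespace="autoscaler", registry=self.registry)
        self.config_validation_exceptions = Counter(
            "config_validation_exceptions",
            "Exceptions raised while validating the autoscaling config.",
            namespace="autoscaler", registry=self.registry)
```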
3 changes: 3 additions & 0 deletions python/ray/autoscaler/_private/constants.py
@@ -61,6 +61,9 @@ def env_integer(key, default):
# to run.
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE = 1000

# Port that autoscaler prometheus metrics will be exported to
AUTOSCALER_METRIC_PORT = env_integer("AUTOSCALER_METRIC_PORT", 44217)

# Max number of retries to AWS (default is 5, time increases exponentially)
BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12)
# Max number of retries to create an EC2 node (retry different subnet)
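Because the port is read through env_integer, it can be overridden via the environment before this constants module is imported; a small sketch (the port value is arbitrary):

```python
# Override the autoscaler metrics port (sketch; must be set in the environment
# before ray.autoscaler._private.constants is imported, e.g. before `ray start`).
import os

os.environ["AUTOSCALER_METRIC_PORT"] = "9104"  # 9104 is an arbitrary example
```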
38 changes: 34 additions & 4 deletions python/ray/autoscaler/_private/monitor.py
@@ -13,13 +13,16 @@
from typing import Optional

import grpc
import prometheus_client

import ray
from ray.autoscaler._private.autoscaler import StandardAutoscaler
from ray.autoscaler._private.commands import teardown_cluster
from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S
from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S, \
AUTOSCALER_METRIC_PORT
from ray.autoscaler._private.event_summarizer import EventSummarizer
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
from ray.autoscaler._private.constants import \
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS, \
@@ -92,13 +95,16 @@ def __init__(self,
autoscaling_config,
redis_password=None,
prefix_cluster_info=False,
monitor_ip=None,
stop_event: Optional[Event] = None):
# Initialize the Redis clients.
ray.state.state._initialize_global_state(
redis_address, redis_password=redis_password)
self.redis = ray._private.services.create_redis_client(
redis_address, password=redis_password)

if monitor_ip:
self.redis.set("AutoscalerMetricsAddress",
f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}")
(ip, port) = redis_address.split(":")
self.gcs_client = connect_to_gcs(ip, int(port), redis_password)
# Initialize the gcs stub for getting all node resource usage.
@@ -126,6 +132,22 @@ def __init__(self,
self.autoscaling_config = autoscaling_config
self.autoscaler = None

self.prom_metrics = AutoscalerPrometheusMetrics()
if monitor_ip:
# If monitor_ip wasn't passed in, then don't attempt to start the
# metric server to keep behavior identical to before metrics were
# introduced
try:
logger.info(
"Starting autoscaler metrics server on port {}".format(
AUTOSCALER_METRIC_PORT))
prometheus_client.start_http_server(
AUTOSCALER_METRIC_PORT,
registry=self.prom_metrics.registry)
except Exception:
logger.exception(
"An exception occurred while starting the metrics server.")

logger.info("Monitor: Started")

def __del__(self):
@@ -137,7 +159,8 @@ def _initialize_autoscaler(self):
self.autoscaling_config,
self.load_metrics,
prefix_cluster_info=self.prefix_cluster_info,
event_summarizer=self.event_summarizer)
event_summarizer=self.event_summarizer,
prom_metrics=self.prom_metrics)

def update_load_metrics(self):
"""Fetches resource usage data from GCS and updates load metrics."""
@@ -359,6 +382,12 @@ def run(self):
default=ray_constants.LOGGING_ROTATE_BACKUP_COUNT,
help="Specify the backup count of rotated log file, default is "
f"{ray_constants.LOGGING_ROTATE_BACKUP_COUNT}.")
parser.add_argument(
"--monitor-ip",
required=False,
type=str,
default=None,
help="The IP address of the machine hosting the monitor process.")
args = parser.parse_args()
setup_component_logger(
logging_level=args.logging_level,
@@ -381,6 +410,7 @@ def run(self):
monitor = Monitor(
args.redis_address,
autoscaling_config,
redis_password=args.redis_password)
redis_password=args.redis_password,
monitor_ip=args.monitor_ip)

monitor.run()
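A quick, hedged way to sanity-check the new endpoint once the monitor is running. The address mirrors what __init__ writes to the AutoscalerMetricsAddress Redis key, but is hardcoded here for illustration:

```python
# Sketch: verify the monitor is exporting autoscaler metrics.
import urllib.request

autoscaler_metrics_addr = "127.0.0.1:44217"  # monitor_ip:AUTOSCALER_METRIC_PORT
with urllib.request.urlopen(f"http://{autoscaler_metrics_addr}/metrics") as resp:
    print(resp.read().decode("utf-8"))
```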
6 changes: 6 additions & 0 deletions python/ray/autoscaler/_private/node_launcher.py
@@ -7,6 +7,7 @@
TAG_RAY_NODE_KIND, TAG_RAY_NODE_NAME,
TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED,
NODE_KIND_WORKER)
from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
from ray.autoscaler._private.util import hash_launch_conf

logger = logging.getLogger(__name__)
@@ -19,12 +20,14 @@ def __init__(self,
provider,
queue,
pending,
prom_metrics=None,
node_types=None,
index=None,
*args,
**kwargs):
self.queue = queue
self.pending = pending
self.prom_metrics = prom_metrics or AutoscalerPrometheusMetrics()
self.provider = provider
self.node_types = node_types
self.index = str(index) if index is not None else ""
@@ -58,6 +61,7 @@ def _launch_node(self, config: Dict[str, Any], count: int,
node_tags[TAG_RAY_USER_NODE_TYPE] = node_type
node_config.update(launch_config)
self.provider.create_node(node_config, node_tags, count)
self.prom_metrics.started_nodes.inc(count)
after = self.provider.non_terminated_nodes(tag_filters=worker_filter)
if set(after).issubset(before):
self.log("No new nodes reported after node creation.")
@@ -69,9 +73,11 @@ def run(self):
try:
self._launch_node(config, count, node_type)
except Exception:
self.prom_metrics.node_launch_exceptions.inc()
logger.exception("Launch failed")
finally:
self.pending.dec(node_type, count)
self.prom_metrics.pending_nodes.set(self.pending.value)
ckw017 marked this conversation as resolved.

def log(self, statement):
prefix = "NodeLauncher{}:".format(self.index)
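Assuming metric naming along the lines of the AutoscalerPrometheusMetrics sketch earlier, tests can read counter values straight off the shared registry instead of scraping HTTP; a minimal sketch:

```python
# Sketch of asserting on a counter in tests. The sample name assumes the
# "autoscaler" namespace used in the sketch above; prometheus_client appends
# "_total" to Counter sample names.
from prometheus_client import CollectorRegistry


def launch_exception_count(registry: CollectorRegistry) -> float:
    # Returns 0.0 if the sample has not been recorded yet.
    return registry.get_sample_value(
        "autoscaler_node_launch_exceptions_total") or 0.0
```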