[autoscaler] Autoscaler metrics #16066

Merged: 32 commits, Jun 1, 2021
Changes from 1 commit

Commits (32)
46ea81b
initial
ckw017 May 24, 2021
7c7cd1a
sanity check
ckw017 May 24, 2021
f170974
lint and more
ckw017 May 25, 2021
4150bf0
remove extra file?
ckw017 May 25, 2021
771b304
format
ckw017 May 26, 2021
3f11bc6
store ip of machine running monitor process
ckw017 May 27, 2021
eb9edb1
autoscaler_ip -> monitor_ip
ckw017 May 27, 2021
b4130a8
lint
ckw017 May 27, 2021
09a8998
add worker startup time buckets
ckw017 May 27, 2021
849c1ba
better descriptions
ckw017 May 27, 2021
6f58b81
more lint
ckw017 May 27, 2021
9050bab
propagate exception when starting prom http
ckw017 May 27, 2021
7be69c1
lint
ckw017 May 27, 2021
8f682b4
fix redis set/get
ckw017 May 27, 2021
51f7a5c
move start_http to monitor.py
ckw017 May 27, 2021
4259ecd
break up exception types and add pending_nodes metric
ckw017 May 27, 2021
bb9896e
Adjust buckets, fix test_autoscaler failures
ckw017 May 27, 2021
091610a
Add metric_agent tests
ckw017 May 27, 2021
ee005b1
explain _AUTOSCALER_METRICS
ckw017 May 27, 2021
1d82167
add basic exception count checks
ckw017 May 27, 2021
a43b38e
more autoscaler metric tests
ckw017 May 28, 2021
25f55dc
less dangerous way to handle no prom_metrics
ckw017 May 28, 2021
f7013c2
more mock checks
ckw017 May 28, 2021
b2bd1e5
better docs
ckw017 May 28, 2021
a0c10f0
nits
ckw017 May 28, 2021
c069b8c
cases for started_nodes and worker_startup_time histogram
ckw017 May 28, 2021
2c18da5
add node_launch_exceptions case
ckw017 May 28, 2021
216060c
use waitFor
ckw017 May 28, 2021
377d2c8
don't start http server if monitor_ip isn't provided
ckw017 May 31, 2021
0f1a6b2
drop worker_startup_time
ckw017 May 31, 2021
a6cc229
lint
ckw017 May 31, 2021
2edcb1a
Hotfix [nodes -> workers] + [count failed nodes as stopped]
ijrsvt May 31, 2021
lint and more
ckw017 committed May 27, 2021
commit f170974bdada3b7c18c9a35998ebb734c43dfd06
2 changes: 2 additions & 0 deletions python/ray/_private/metrics_agent.py
@@ -197,6 +197,8 @@ def get_file_discovery_content(self):
node["MetricsExportPort"]) for node in nodes
if node["alive"] is True
]
# TODO(ckw): how to get autoscaler ip?
# autoscaler_export_addr = "{}:{}".format("????", AUTOSCALER_METRIC_PORT)
return json.dumps([{
"labels": {
"job": "ray"
69 changes: 23 additions & 46 deletions python/ray/autoscaler/_private/autoscaler.py
@@ -31,10 +31,13 @@
from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \
with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
format_info_string
from ray.autoscaler._private.constants import \
AUTOSCALER_MAX_NUM_FAILURES, AUTOSCALER_MAX_LAUNCH_BATCH, \
AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_METRIC_PORT, \
AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S
from ray.autoscaler._private.constants import AUTOSCALER_MAX_NUM_FAILURES, \
AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \
AUTOSCALER_METRIC_PORT, AUTOSCALER_UPDATE_INTERVAL_S, \
AUTOSCALER_HEARTBEAT_TIMEOUT_S
from ray.autoscaler._private.prom_metrics import \
AUTOSCALER_EXCEPTIONS_COUNTER, AUTOSCALER_STOPPED_NODES_COUNTER, \
AUTOSCALER_METRIC_REGISTRY, AUTOSCALER_RUNNING_NODES_GAUGE
from six.moves import queue

logger = logging.getLogger(__name__)
@@ -49,40 +52,9 @@
"AutoscalerSummary",
["active_nodes", "pending_nodes", "pending_launches", "failed_nodes"])

# Metrics
AUTOSCALER_METRIC_REGISTRY = prometheus_client.CollectorRegistry()
prometheus_client.start_http_server(
AUTOSCALER_METRIC_PORT, registry=AUTOSCALER_METRIC_REGISTRY)
WORKER_STARTUP_TIME_HISTOGRAM = prometheus_client.Histogram(
"worker_startup_time_seconds",
"Worker startup time",
unit="seconds",
namespace="autoscaler",
registry=AUTOSCALER_METRIC_REGISTRY)
NODES_STARTED_COUNTER = prometheus_client.Counter(
"started_nodes",
"Number of nodes started",
unit="nodes",
namespace="autoscaler",
registry=AUTOSCALER_METRIC_REGISTRY)
NODES_STOPPED_COUNTER = prometheus_client.Counter(
"stopped_nodes",
"Number of nodes stopped",
unit="nodes",
namespace="autoscaler",
registry=AUTOSCALER_METRIC_REGISTRY)
NUM_EXCEPTIONS_COUNTER = prometheus_client.Counter(
"exceptions",
"Number of exceptions",
unit="exceptions",
namespace="autoscaler",
registry=AUTOSCALER_METRIC_REGISTRY)
NODES_RUNNING_GAUGE = prometheus_client.Gauge(
"running_nodes",
"Number of nodes running",
unit="nodes",
namespace="autoscaler",
registry=AUTOSCALER_METRIC_REGISTRY)
# Prevent multiple servers from starting if multiple autoscalers
# are instantiated for some reason
_metric_server_started = False


class StandardAutoscaler:
@@ -148,7 +120,6 @@ def __init__(self,
queue=self.launch_queue,
index=i,
pending=self.pending_launches,
startup_histogram=WORKER_STARTUP_TIME_HISTOGRAM,
node_types=self.available_node_types,
)
node_launcher.daemon = True
@@ -169,13 +140,20 @@ def __init__(self,
for local_path in self.config["file_mounts"].values():
assert os.path.exists(local_path)

global _metric_server_started
if not _metric_server_started:
_metric_server_started = True
prometheus_client.start_http_server(
AUTOSCALER_METRIC_PORT, registry=AUTOSCALER_METRIC_REGISTRY)

logger.info("StandardAutoscaler: {}".format(self.config))

def update(self):
try:
self.reset(errors_fatal=False)
self._update()
except Exception as e:
AUTOSCALER_EXCEPTIONS_COUNTER.inc()
logger.exception("StandardAutoscaler: "
"Error during autoscaling.")
# Don't abort the autoscaler if the K8s API server is down.
@@ -199,7 +177,6 @@ def _update(self):

self.last_update_time = now
nodes = self.workers()
NODES_RUNNING_GAUGE.set(len(nodes))

self.load_metrics.prune_active_ips([
self.provider.internal_ip(node_id)
@@ -255,9 +232,8 @@ def _update(self):
self.provider.terminate_nodes(nodes_to_terminate)
for node in nodes_to_terminate:
self.node_tracker.untrack(node)
NODES_STOPPED_COUNTER.inc()
AUTOSCALER_STOPPED_NODES_COUNTER.inc()
nodes = self.workers()
NODES_RUNNING_GAUGE.set(len(nodes))

# Terminate nodes if there are too many
nodes_to_terminate = []
@@ -277,9 +253,8 @@ def _update(self):
self.provider.terminate_nodes(nodes_to_terminate)
for node in nodes_to_terminate:
self.node_tracker.untrack(node)
NODES_STOPPED_COUNTER.inc()
AUTOSCALER_STOPPED_NODES_COUNTER.inc()
nodes = self.workers()
NODES_RUNNING_GAUGE.set(len(nodes))

to_launch = self.resource_demand_scheduler.get_nodes_to_launch(
Contributor

We should also increment self.prom_metrics.stopped_nodes.inc() here:
[Screenshot from May 31, 2021 showing the code location referenced above]
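
For reference, a minimal sketch of the consistency the comment is asking for: every code path that terminates workers should bump the same counter. This is illustrative only; the helper name and the duck-typed node_tracker argument are assumptions, not Ray APIs.

from prometheus_client import CollectorRegistry, Counter

registry = CollectorRegistry()
stopped_nodes = Counter(
    "stopped_nodes",
    "Number of nodes stopped",
    unit="nodes",
    namespace="autoscaler",
    registry=registry)

def untrack_and_count_stopped(node_tracker, nodes_to_terminate):
    # Shared helper so the "stopped nodes" metric stays consistent across
    # every termination path, including the one shown in the screenshot.
    for node in nodes_to_terminate:
        node_tracker.untrack(node)  # same bookkeeping as the existing paths
        stopped_nodes.inc()         # one increment per terminated node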

self.provider.non_terminated_nodes(tag_filters={}),
@@ -291,11 +266,9 @@ def _update(self):
ensure_min_cluster_size=self.load_metrics.get_resource_requests())
for node_type, count in to_launch.items():
self.launch_new_node(count, node_type=node_type)
NODES_STARTED_COUNTER.inc()

if to_launch:
nodes = self.workers()
NODES_RUNNING_GAUGE.set(len(nodes))

# Process any completed updates
completed_nodes = []
@@ -514,6 +487,7 @@ def reset(self, errors_fatal=False):
try:
validate_config(new_config)
except Exception as e:
AUTOSCALER_EXCEPTIONS_COUNTER.inc()
logger.debug(
"Cluster config validation failed. The version of "
"the ray CLI you launched this cluster with may "
@@ -577,6 +551,7 @@ def reset(self, errors_fatal=False):
upscaling_speed)

except Exception as e:
AUTOSCALER_EXCEPTIONS_COUNTER.inc()
if errors_fatal:
raise e
else:
@@ -782,6 +757,8 @@ def all_workers(self):
def workers(self):
nodes = self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Update running nodes gauge whenever we check workers
AUTOSCALER_RUNNING_NODES_GAUGE.set(len(nodes))
return nodes

def unmanaged_workers(self):
1 change: 1 addition & 0 deletions python/ray/autoscaler/_private/constants.py
@@ -61,6 +61,7 @@ def env_integer(key, default):
# to run.
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE = 1000

# Port that autoscaler prometheus metrics will be exported to
AUTOSCALER_METRIC_PORT = env_integer("AUTOSCALER_METRIC_PORT", 44217)

# Max number of retries to AWS (default is 5, time increases exponentially)
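With the exporter listening on this port on the machine that runs the monitor, the endpoint can be spot-checked with a plain HTTP GET. A hedged example, assuming a local head node and that the metrics server was actually started:

import urllib.request

AUTOSCALER_METRIC_PORT = 44217  # default defined in constants.py above

def fetch_autoscaler_metrics(host="localhost", port=AUTOSCALER_METRIC_PORT):
    # Return the raw Prometheus text exposition from the autoscaler endpoint.
    url = "http://{}:{}/metrics".format(host, port)
    with urllib.request.urlopen(url, timeout=5) as response:
        return response.read().decode("utf-8")

if __name__ == "__main__":
    for line in fetch_autoscaler_metrics().splitlines():
        # Metrics added by this PR are namespaced under "autoscaler_".
        if line.startswith("autoscaler_"):
            print(line)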
12 changes: 7 additions & 5 deletions python/ray/autoscaler/_private/node_launcher.py
@@ -9,6 +9,9 @@
TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED,
NODE_KIND_WORKER)
from ray.autoscaler._private.util import hash_launch_conf
from ray.autoscaler._private.prom_metrics import (
AUTOSCALER_EXCEPTIONS_COUNTER, AUTOSCALER_WORKER_STARTUP_TIME_HISTOGRAM,
AUTOSCALER_STARTED_NODES_COUNTER)

logger = logging.getLogger(__name__)

@@ -20,15 +23,13 @@ def __init__(self,
provider,
queue,
pending,
startup_histogram=None,
node_types=None,
index=None,
*args,
**kwargs):
self.queue = queue
self.pending = pending
self.provider = provider
self.startup_histogram = startup_histogram
self.node_types = node_types
self.index = str(index) if index is not None else ""
super(NodeLauncher, self).__init__(*args, **kwargs)
@@ -63,9 +64,9 @@ def _launch_node(self, config: Dict[str, Any], count: int,
launch_start_time = time.time()
self.provider.create_node(node_config, node_tags, count)
startup_time = time.time() - launch_start_time
Contributor

I don't think this reflects startup time. For most (all?) providers, create_node sends a non-blocking API call to provision compute. Startup time is the time from create_node to completion of the ray start commands on the node, which in theory one could slightly overestimate as the time between the autoscaler sticking the node into the launch queue and the node's first NodeUpdater thread completing.

Not sure how easy that is to measure.

if self.startup_histogram:
for _ in range(count):
self.startup_histogram.observe(startup_time)
for _ in range(count):
AUTOSCALER_WORKER_STARTUP_TIME_HISTOGRAM.observe(startup_time)
AUTOSCALER_STARTED_NODES_COUNTER.inc(count)
after = self.provider.non_terminated_nodes(tag_filters=worker_filter)
if set(after).issubset(before):
self.log("No new nodes reported after node creation.")
@@ -77,6 +78,7 @@ def run(self):
try:
self._launch_node(config, count, node_type)
except Exception:
AUTOSCALER_EXCEPTIONS_COUNTER.inc()
logger.exception("Launch failed")
finally:
self.pending.dec(node_type, count)
33 changes: 33 additions & 0 deletions python/ray/autoscaler/_private/prom_metrics.py
@@ -0,0 +1,33 @@
import prometheus_client

AUTOSCALER_METRIC_REGISTRY = prometheus_client.CollectorRegistry()
AUTOSCALER_WORKER_STARTUP_TIME_HISTOGRAM = prometheus_client.Histogram(
"worker_startup_time_seconds",
"Worker startup time",
unit="seconds",
namespace="autoscaler",
registry=AUTOSCALER_METRIC_REGISTRY)
AUTOSCALER_STARTED_NODES_COUNTER = prometheus_client.Counter(
"started_nodes",
"Number of nodes started",
unit="nodes",
namespace="autoscaler",
registry=AUTOSCALER_METRIC_REGISTRY)
AUTOSCALER_STOPPED_NODES_COUNTER = prometheus_client.Counter(
"stopped_nodes",
"Number of nodes stopped",
unit="nodes",
namespace="autoscaler",
registry=AUTOSCALER_METRIC_REGISTRY)
AUTOSCALER_RUNNING_NODES_GAUGE = prometheus_client.Gauge(
"running_nodes",
"Number of nodes running",
unit="nodes",
namespace="autoscaler",
registry=AUTOSCALER_METRIC_REGISTRY)
AUTOSCALER_EXCEPTIONS_COUNTER = prometheus_client.Counter(
"exceptions",
"Number of exceptions",
unit="exceptions",
namespace="autoscaler",
registry=AUTOSCALER_METRIC_REGISTRY)
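
At this commit the registry above is exposed directly from autoscaler.py, guarded so the HTTP server starts at most once; later commits in this PR move the start_http_server call into monitor.py and skip it when no monitor_ip is provided. A condensed sketch of the exposure as it stands here (the helper function is only a restatement of the inline code in the diff above):

import prometheus_client

from ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT
from ray.autoscaler._private.prom_metrics import AUTOSCALER_METRIC_REGISTRY

_metric_server_started = False

def maybe_start_metric_server():
    # Start the Prometheus exporter at most once, even if several
    # StandardAutoscaler instances are created in the same process.
    global _metric_server_started
    if not _metric_server_started:
        _metric_server_started = True
        prometheus_client.start_http_server(
            AUTOSCALER_METRIC_PORT, registry=AUTOSCALER_METRIC_REGISTRY)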