Commit
[serve] Drop stale handle metrics (ray-project#44045)
We don't currently drop handle metrics after a handle has been removed (i.e., after a replica or proxy holding a handle has been shut down). These "phantom" metrics can prevent deployments from downscaling.

This PR adds a timeout after which stale handle metrics are dropped: 2 * the metrics reporting period, with a constant floor of 10s in case the reporting period is configured to be very low.

Also cleans up a few log statements.
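
For illustration, a minimal standalone sketch of the drop logic; the helper name `drop_stale_handle_metrics` and the module-level constant here are illustrative stand-ins mirroring the constant and logic added in the deployment_state.py diff below, not the controller code itself:

import time

# Floor on the staleness timeout, mirroring RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S.
MIN_HANDLE_METRICS_TIMEOUT_S = 10.0

def drop_stale_handle_metrics(handle_requests: dict, metrics_interval_s: float) -> dict:
    """Keep only handle metrics that reported within the staleness timeout.

    `handle_requests` maps handle_id -> a metric object with a `timestamp`
    attribute recording when that handle last reported.
    """
    timeout_s = max(2 * metrics_interval_s, MIN_HANDLE_METRICS_TIMEOUT_S)
    now = time.time()
    return {
        handle_id: metric
        for handle_id, metric in handle_requests.items()
        if now - metric.timestamp < timeout_s
    }

For example, with a 10s metrics interval the timeout is 20s; with a 1s interval the 10s floor applies.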

---------

Signed-off-by: Cindy Zhang <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Co-authored-by: Cindy Zhang <[email protected]>
Co-authored-by: shrekris-anyscale <[email protected]>
3 people committed Mar 15, 2024
1 parent d1b5edc commit 228c7ee
Showing 10 changed files with 367 additions and 107 deletions.
4 changes: 4 additions & 0 deletions python/ray/serve/_private/autoscaling_policy.py
@@ -110,3 +110,7 @@ def apply_bounds(
target_capacity_direction,
)
return max(lower_bound, min(upper_bound, curr_target_num_replicas))

def get_metrics_interval_s(self) -> Optional[float]:
if self.config:
return self.config.metrics_interval_s
6 changes: 5 additions & 1 deletion python/ray/serve/_private/constants.py
@@ -291,11 +291,15 @@
DEFAULT_AUTOSCALING_POLICY = "ray.serve.autoscaling_policy:default_autoscaling_policy"

# Feature flag to enable collecting all queued and ongoing request
# metrics at handles instead of replicas. OFF by default.
# metrics at handles instead of replicas. ON by default.
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE = (
os.environ.get("RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE", "1") == "1"
)

RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S = float(
os.environ.get("RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S", 10.0)
)

# Feature flag to always run a proxy on the head node even if it has no replicas.
RAY_SERVE_ALWAYS_RUN_PROXY_ON_HEAD_NODE = (
os.environ.get("RAY_SERVE_ALWAYS_RUN_PROXY_ON_HEAD_NODE", "1") == "1"
41 changes: 35 additions & 6 deletions python/ray/serve/_private/deployment_state.py
@@ -38,6 +38,7 @@
RAY_SERVE_EAGERLY_START_REPLACEMENT_REPLICAS,
RAY_SERVE_ENABLE_TASK_EVENTS,
RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS,
RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S,
RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY,
REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD,
SERVE_LOGGER_NAME,
@@ -1608,6 +1609,26 @@ def get_total_num_requests(self) -> float:
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE
or len(running_replicas) == 0
):
# Drop metrics for handles that haven't sent an update in a while.
# This is expected behavior for handles that were on replicas or
# proxies that have been shut down.
timeout_s = max(
2 * self.autoscaling_policy_manager.get_metrics_interval_s(),
RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S,
)
for handle_id, handle_metric in list(self.handle_requests.items()):
if time.time() - handle_metric.timestamp >= timeout_s:
del self.handle_requests[handle_id]
total_ongoing_requests = handle_metric.queued_requests + sum(
handle_metric.running_requests.values()
)
if total_ongoing_requests > 0:
logger.info(
f"Dropping stale metrics for handle '{handle_id}' "
f"because no update was received for {timeout_s:.1f}s. "
f"Ongoing requests was: {total_ongoing_requests}."
)

for handle_metric in self.handle_requests.values():
total_requests += handle_metric.queued_requests
for replica in running_replicas:
@@ -1645,12 +1666,6 @@ def autoscale(self) -> int:
):
return

logger.info(
f"Autoscaling {self._id} to {decision_num_replicas} replicas. "
f"Current num requests: {total_num_requests}, "
f"current num running replicas: {num_running_replicas}."
)

new_info = copy(self._target_state.info)
new_info.version = self._target_state.version.code_version

@@ -1662,13 +1677,25 @@
if not self._is_within_autoscaling_bounds():
return

curr_stats_str = (
f"Current ongoing requests: {total_num_requests:.2f}, "
f"current running replicas: {num_running_replicas}."
)
new_num = self._target_state.target_num_replicas
if new_num > old_num:
logger.info(
f"Upscaling {self._id} from {old_num} to {new_num} replicas. "
f"{curr_stats_str}"
)
self._curr_status_info = self._curr_status_info.handle_transition(
trigger=DeploymentStatusInternalTrigger.AUTOSCALE_UP,
message=f"Upscaling from {old_num} to {new_num} replicas.",
)
elif new_num < old_num:
logger.info(
f"Downscaling {self._id} from {old_num} to {new_num} replicas. "
f"{curr_stats_str}"
)
self._curr_status_info = self._curr_status_info.handle_transition(
trigger=DeploymentStatusInternalTrigger.AUTOSCALE_DOWN,
message=f"Downscaling from {old_num} to {new_num} replicas.",
@@ -2433,6 +2460,8 @@ def get_autoscaling_metrics(self):

return {
deployment: deployment_state.get_total_num_requests()
if deployment_state.should_autoscale()
else None
for deployment, deployment_state in self._deployment_states.items()
}

8 changes: 2 additions & 6 deletions python/ray/serve/_private/proxy_state.py
@@ -486,18 +486,14 @@ def _reconcile_internal(self, draining: bool):
return
elif self._status == ProxyStatus.HEALTHY:
if draining:
logger.info(
f"Start draining the proxy actor on node {self._node_id}"
)
logger.info(f"Draining proxy on node '{self._node_id}'.")
assert self._last_drain_check_time is None

self._actor_proxy_wrapper.update_draining(draining=True)
self.try_update_status(ProxyStatus.DRAINING)
elif self._status == ProxyStatus.DRAINING:
if not draining:
logger.info(
f"Stop draining the proxy actor on node {self._node_id}"
)
logger.info(f"No longer draining proxy on node '{self._node_id}'.")
self._last_drain_check_time = None

self._actor_proxy_wrapper.update_draining(draining=False)
4 changes: 4 additions & 0 deletions python/ray/serve/_private/router.py
@@ -283,6 +283,10 @@ def __init__(
self._event_loop = event_loop
self.deployment_id = deployment_id

logger.info(
f"Created DeploymentHandle '{handle_id}' for {deployment_id}.",
extra={"log_to_stderr": False},
)
if inside_ray_client_context():
# Streaming ObjectRefGenerators are not supported in Ray Client, so we need
# to override the behavior.
19 changes: 19 additions & 0 deletions python/ray/serve/_private/test_utils.py
@@ -135,9 +135,28 @@ def set_available_resources_per_node(self, node_id: str, resources: Dict):
self.available_resources_per_node[node_id] = deepcopy(resources)


class FakeRemoteFunction:
def remote(self):
pass


class MockActorHandle:
def __init__(self, **kwargs):
self._options = kwargs
self._actor_id = "fake_id"
self.initialize_and_get_metadata_called = False
self.is_allocated_called = False

@property
def initialize_and_get_metadata(self):
self.initialize_and_get_metadata_called = True
# return a mock object so that we can call `remote()` on it.
return FakeRemoteFunction()

@property
def is_allocated(self):
self.is_allocated_called = True
return FakeRemoteFunction()


class MockActorClass:
3 changes: 3 additions & 0 deletions python/ray/serve/handle.py
@@ -1,5 +1,6 @@
import asyncio
import concurrent.futures
import logging
import threading
import time
import warnings
@@ -10,6 +11,7 @@
from ray import serve
from ray._raylet import GcsClient, ObjectRefGenerator
from ray.serve._private.common import DeploymentID, RequestMetadata, RequestProtocol
from ray.serve._private.constants import SERVE_LOGGER_NAME
from ray.serve._private.default_impl import create_cluster_node_info_cache
from ray.serve._private.router import Router
from ray.serve._private.usage import ServeUsageTag
@@ -27,6 +29,7 @@

_global_async_loop = None
_global_async_loop_creation_lock = threading.Lock()
logger = logging.getLogger(SERVE_LOGGER_NAME)


def _create_or_get_global_asyncio_event_loop_in_thread():
16 changes: 13 additions & 3 deletions python/ray/serve/tests/BUILD
@@ -59,7 +59,6 @@ py_test_module_list(
"test_regression.py",
"test_request_timeout.py",
"test_cluster.py",
"test_autoscaling_policy.py",
"test_cancellation.py",
"test_streaming_response.py",
"test_controller_recovery.py",
@@ -80,7 +79,7 @@
# Test autoscaling with RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE set to 0
py_test(
name = "test_autoscaling_policy_with_metr_disab",
size = "medium",
size = "large",
main = "test_autoscaling_policy.py",
srcs = ["test_autoscaling_policy.py"],
tags = ["exclusive", "team:serve", "autoscaling"],
@@ -110,6 +109,7 @@ py_test_module_list(
"test_standalone.py",
"test_standalone_3.py",
"test_deploy_app.py",
"test_autoscaling_policy.py",
],
size = "large",
tags = ["exclusive", "team:serve"],
@@ -185,7 +185,6 @@
name_suffix="_with_queue_len_cache_disabled",
env={"RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE": "0"},
files = [
"test_autoscaling_policy.py",
"test_cancellation.py",
"test_handle.py",
"test_handle_api.py",
@@ -199,6 +198,17 @@
data = glob(["test_config_files/**/*"]),
)

py_test_module_list(
name_suffix="_with_queue_len_cache_disabled",
env={"RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE": "0"},
files = [
"test_autoscaling_policy.py",
],
size = "large",
tags = ["exclusive", "no_windows", "team:serve"],
deps = ["//python/ray/serve:serve_lib", ":conftest", ":common"],
)

# Test old stop-fully-then-start behavior.
# TODO(zcin): remove this after the old behavior is completely removed
py_test_module_list(
Expand Down
(Diffs for the remaining changed files are not shown.)
