From d8c9e89374daeb1816f8e945422a7d728bf74152 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Thu, 15 Jun 2023 14:07:12 -0500 Subject: [PATCH] [serve] Clear `replica_updated_event` in streaming router to avoid blocking proxy event loop (#36459) `replica_updated_event` was not being cleared. It seems that even though we were awaiting the event, it was not yielding the loop because the event was always set. This caused the `while replica is not None` loop to busy-spin, blocking the HTTP proxy event loop. Closes #36460 --- python/ray/serve/_private/http_proxy.py | 6 +++ python/ray/serve/_private/router.py | 23 +++++++++-- python/ray/serve/tests/test_failure.py | 52 ++++++++++++++++++++++++- 3 files changed, 77 insertions(+), 4 deletions(-) diff --git a/python/ray/serve/_private/http_proxy.py b/python/ray/serve/_private/http_proxy.py index 0ca63e266ec83..7cb91dd604985 100644 --- a/python/ray/serve/_private/http_proxy.py +++ b/python/ray/serve/_private/http_proxy.py @@ -178,6 +178,12 @@ def __init__(self, controller_name: str): self.self_actor_handle = ray.get_runtime_context().current_actor self.asgi_receive_queues: Dict[str, ASGIMessageQueue] = dict() + if RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING: + logger.info( + "Experimental streaming feature flag enabled.", + extra={"log_to_stderr": False}, + ) + def get_handle(name): return serve.context.get_global_client().get_handle( name, diff --git a/python/ray/serve/_private/router.py b/python/ray/serve/_private/router.py index ddcd596ee7e0f..94cb426dd94f4 100644 --- a/python/ray/serve/_private/router.py +++ b/python/ray/serve/_private/router.py @@ -89,7 +89,9 @@ class RoundRobinStreamingReplicaScheduler(ReplicaScheduler): This policy does *not* respect `max_concurrent_queries`. 
""" - def __init__(self): + def __init__(self, deployment_name: str): + self._deployment_name = deployment_name + self._replica_id_set = set() self._replica_iterator = itertools.cycle([]) self._replicas_updated_event = asyncio.Event() @@ -102,9 +104,13 @@ async def assign_replica( replica = next(self._replica_iterator) except StopIteration: logger.info( - "Tried to assign replica but none available", + "Tried to assign replica for deployment " + f"{self._deployment_name} but none available.", extra={"log_to_stderr": False}, ) + # Clear before waiting to avoid immediately waking up if there + # had ever been an update before. + self._replicas_updated_event.clear() await self._replicas_updated_event.wait() if replica.is_cross_language: @@ -121,6 +127,15 @@ async def assign_replica( ) def update_running_replicas(self, running_replicas: List[RunningReplicaInfo]): + new_replica_ids = {r.replica_tag for r in running_replicas} + if new_replica_ids != self._replica_id_set: + self._replica_id_set = new_replica_ids + logger.info( + "Got updated replicas for deployment " + f"{self._deployment_name}: {new_replica_ids}.", + extra={"log_to_stderr": False}, + ) + random.shuffle(running_replicas) self._replica_iterator = itertools.cycle(running_replicas) self._replicas_updated_event.set() @@ -384,7 +399,9 @@ def __init__( """ self._event_loop = event_loop if _stream: - self._replica_scheduler = RoundRobinStreamingReplicaScheduler() + self._replica_scheduler = RoundRobinStreamingReplicaScheduler( + deployment_name + ) else: self._replica_scheduler = RoundRobinReplicaScheduler(event_loop) diff --git a/python/ray/serve/tests/test_failure.py b/python/ray/serve/tests/test_failure.py index 98b6282553d28..eac18a099a889 100644 --- a/python/ray/serve/tests/test_failure.py +++ b/python/ray/serve/tests/test_failure.py @@ -6,7 +6,7 @@ import pytest import ray from ray import serve -from ray._private.test_utils import wait_for_condition +from ray._private.test_utils import wait_for_condition, 
SignalActor from ray.serve._private.constants import ( SERVE_DEFAULT_APP_NAME, DEPLOYMENT_NAME_PREFIX_SEPARATOR, @@ -207,5 +207,55 @@ def __call__(self, *args): time.sleep(0.1) +def test_no_available_replicas_does_not_block_proxy(serve_instance): + """Test that a handle blocked waiting for replicas doesn't block the proxy. + + This is essential so that other requests and health checks can pass while a + deployment is deploying/updating. + + See https://github.com/ray-project/ray/issues/36460. + """ + + @serve.deployment + class SlowStarter: + def __init__(self, starting_actor, finish_starting_actor): + ray.get(starting_actor.send.remote()) + ray.get(finish_starting_actor.wait.remote()) + + def __call__(self): + return "hi" + + @ray.remote + def make_blocked_request(): + r = requests.get("http://localhost:8000/") + r.raise_for_status() + return r.text + + # Loop twice: first iteration tests deploying from nothing, second iteration + # tests updating the replicas of an existing deployment. + for _ in range(2): + starting_actor = SignalActor.remote() + finish_starting_actor = SignalActor.remote() + serve.run( + SlowStarter.bind(starting_actor, finish_starting_actor), _blocking=False + ) + + # Ensure that the replica has been started (we use _blocking=False). + ray.get(starting_actor.wait.remote()) + + # The request shouldn't complete until the replica has finished starting. + blocked_ref = make_blocked_request.remote() + with pytest.raises(TimeoutError): + ray.get(blocked_ref, timeout=1) + + # If the proxy's loop was blocked, these would hang. + requests.get("http://localhost:8000/-/routes").raise_for_status() + requests.get("http://localhost:8000/-/healthz").raise_for_status() + + # Signal the replica to finish starting; request should complete. + ray.get(finish_starting_actor.send.remote()) + assert ray.get(blocked_ref) == "hi" + + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__]))