Skip to content

Commit

Permalink
[serve] Clear replica_updated_event in streaming router to avoid bl…
Browse files Browse the repository at this point in the history
…ocking proxy event loop (ray-project#36459)

`_replicas_updated_event` was never being cleared. Even though we were awaiting the event, the wait returned immediately without yielding to the event loop, because the event was always set once it had been set the first time. This caused the `while replica is not None` loop to busy-spin, blocking the HTTP proxy event loop.

Closes ray-project#36460
  • Loading branch information
edoakes committed Jun 15, 2023
1 parent 2cf264c commit d8c9e89
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 4 deletions.
6 changes: 6 additions & 0 deletions python/ray/serve/_private/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ def __init__(self, controller_name: str):
self.self_actor_handle = ray.get_runtime_context().current_actor
self.asgi_receive_queues: Dict[str, ASGIMessageQueue] = dict()

if RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING:
logger.info(
"Experimental streaming feature flag enabled.",
extra={"log_to_stderr": False},
)

def get_handle(name):
return serve.context.get_global_client().get_handle(
name,
Expand Down
23 changes: 20 additions & 3 deletions python/ray/serve/_private/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ class RoundRobinStreamingReplicaScheduler(ReplicaScheduler):
This policy does *not* respect `max_concurrent_queries`.
"""

def __init__(self):
def __init__(self, deployment_name: str):
self._deployment_name = deployment_name
self._replica_id_set = set()
self._replica_iterator = itertools.cycle([])
self._replicas_updated_event = asyncio.Event()

Expand All @@ -102,9 +104,13 @@ async def assign_replica(
replica = next(self._replica_iterator)
except StopIteration:
logger.info(
"Tried to assign replica but none available",
"Tried to assign replica for deployment "
f"{self._deployment_name} but none available.",
extra={"log_to_stderr": False},
)
# Clear before waiting to avoid immediately waking up if there
# had ever been an update before.
self._replicas_updated_event.clear()
await self._replicas_updated_event.wait()

if replica.is_cross_language:
Expand All @@ -121,6 +127,15 @@ async def assign_replica(
)

def update_running_replicas(self, running_replicas: List[RunningReplicaInfo]):
new_replica_ids = {r.replica_tag for r in running_replicas}
if new_replica_ids != self._replica_id_set:
self._replica_id_set = new_replica_ids
logger.info(
"Got updated replicas for deployment "
f"{self._deployment_name}: {new_replica_ids}.",
extra={"log_to_stderr": False},
)

random.shuffle(running_replicas)
self._replica_iterator = itertools.cycle(running_replicas)
self._replicas_updated_event.set()
Expand Down Expand Up @@ -384,7 +399,9 @@ def __init__(
"""
self._event_loop = event_loop
if _stream:
self._replica_scheduler = RoundRobinStreamingReplicaScheduler()
self._replica_scheduler = RoundRobinStreamingReplicaScheduler(
deployment_name
)
else:
self._replica_scheduler = RoundRobinReplicaScheduler(event_loop)

Expand Down
52 changes: 51 additions & 1 deletion python/ray/serve/tests/test_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest
import ray
from ray import serve
from ray._private.test_utils import wait_for_condition
from ray._private.test_utils import wait_for_condition, SignalActor
from ray.serve._private.constants import (
SERVE_DEFAULT_APP_NAME,
DEPLOYMENT_NAME_PREFIX_SEPARATOR,
Expand Down Expand Up @@ -207,5 +207,55 @@ def __call__(self, *args):
time.sleep(0.1)


def test_no_available_replicas_does_not_block_proxy(serve_instance):
    """Test that a handle blocked waiting for replicas doesn't block the proxy.

    This is essential so that other requests and health checks can pass while a
    deployment is deploying/updating.

    The deployment's constructor is held open with two signal actors so that a
    request can be issued while no replica is available yet; the proxy's
    built-in endpoints must keep responding during that window.

    See https://github.com/ray-project/ray/issues/36460.
    """

    @serve.deployment
    class SlowStarter:
        def __init__(self, starting_actor, finish_starting_actor):
            # Announce that construction has begun, then block until the test
            # explicitly allows the replica to finish starting.
            ray.get(starting_actor.send.remote())
            ray.get(finish_starting_actor.wait.remote())

        def __call__(self):
            return "hi"

    @ray.remote
    def make_blocked_request():
        # Runs as a separate task so the test itself is not blocked while the
        # request is waiting on the deployment.
        r = requests.get("http://localhost:8000/")
        r.raise_for_status()
        return r.text

    # Loop twice: first iteration tests deploying from nothing, second iteration
    # tests updating the replicas of an existing deployment.
    for _ in range(2):
        starting_actor = SignalActor.remote()
        finish_starting_actor = SignalActor.remote()
        serve.run(
            SlowStarter.bind(starting_actor, finish_starting_actor), _blocking=False
        )

        # Ensure that the replica has been started (we use _blocking=False).
        ray.get(starting_actor.wait.remote())

        # The request shouldn't complete until the replica has finished starting.
        blocked_ref = make_blocked_request.remote()
        with pytest.raises(TimeoutError):
            ray.get(blocked_ref, timeout=1)

        # If the proxy's loop was blocked, these would hang.
        requests.get("http://localhost:8000/-/routes").raise_for_status()
        requests.get("http://localhost:8000/-/healthz").raise_for_status()

        # Signal the replica to finish starting; request should complete.
        ray.get(finish_starting_actor.send.remote())
        assert ray.get(blocked_ref) == "hi"


if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))

0 comments on commit d8c9e89

Please sign in to comment.