Revert "[Serve] ServeHandle detects ActorError and drop replicas from…
Browse files Browse the repository at this point in the history
… target group (ray-project#26685)" (ray-project#27283)

This reverts commit 545c516.

Signed-off-by: Stefan van der Kleij <[email protected]>
simon-mo authored and Stefan van der Kleij committed Aug 18, 2022
1 parent f0abe3c commit cc4a310
Showing 2 changed files with 7 additions and 81 deletions.
54 changes: 7 additions & 47 deletions python/ray/serve/_private/router.py
@@ -9,7 +9,6 @@

import ray
from ray.actor import ActorHandle
from ray.exceptions import RayActorError, RayTaskError
from ray.util import metrics

from ray.serve._private.common import RunningReplicaInfo
@@ -88,17 +87,6 @@ def __init__(
{"deployment": self.deployment_name}
)

def _reset_replica_iterator(self):
"""Reset the iterator used to load balance replicas.
This call is expected to be called after the replica membership has
been updated. It will shuffle the replicas randomly to avoid multiple
handle sending requests in the same order.
"""
replicas = list(self.in_flight_queries.keys())
random.shuffle(replicas)
self.replica_iterator = itertools.cycle(replicas)

def update_running_replicas(self, running_replicas: List[RunningReplicaInfo]):
added, removed, _ = compute_iterable_delta(
self.in_flight_queries.keys(), running_replicas
@@ -109,13 +97,14 @@ def update_running_replicas(self, running_replicas: List[RunningReplicaInfo]):

for removed_replica in removed:
# Delete it directly because shutdown is processed by controller.
# Replicas might have already been deleted due to early detection
# of an actor error.
self.in_flight_queries.pop(removed_replica, None)
del self.in_flight_queries[removed_replica]

if len(added) > 0 or len(removed) > 0:
# Shuffle the keys to avoid synchronization across clients.
replicas = list(self.in_flight_queries.keys())
random.shuffle(replicas)
self.replica_iterator = itertools.cycle(replicas)
logger.debug(f"ReplicaSet: +{len(added)}, -{len(removed)} replicas.")
self._reset_replica_iterator()
self.config_updated_event.set()

def _try_assign_replica(self, query: Query) -> Optional[ray.ObjectRef]:
@@ -171,38 +160,9 @@ def _all_query_refs(self):

def _drain_completed_object_refs(self) -> int:
refs = self._all_query_refs
# NOTE(simon): even though the timeout is 0, a large number of refs can still
# cause some blocking delay in the event loop. Consider moving this to async?
done, _ = ray.wait(refs, num_returns=len(refs), timeout=0)
replicas_to_remove = []
for replica_info, replica_in_flight_queries in self.in_flight_queries.items():
completed_queries = replica_in_flight_queries.intersection(done)
if len(completed_queries):
try:
# NOTE(simon): this ray.get call should be cheap because all these
# refs are ready as indicated by previous `ray.wait` call.
ray.get(list(completed_queries))
except RayActorError:
logger.debug(
f"Removing {replica_info.replica_tag} from replica set "
"because the actor exited."
)
replicas_to_remove.append(replica_info)
except RayTaskError:
# Ignore application error.
pass
except Exception:
logger.exception(
"Handle received unexpected error when processing request."
)

replica_in_flight_queries.difference_update(completed_queries)

if len(replicas_to_remove) > 0:
for replica_info in replicas_to_remove:
self.in_flight_queries.pop(replica_info, None)
self._reset_replica_iterator()

for replica_in_flight_queries in self.in_flight_queries.values():
replica_in_flight_queries.difference_update(done)
return len(done)

async def assign_replica(self, query: Query) -> ray.ObjectRef:
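
The few lines restored in this hunk inline the round-robin selection that the reverted change had factored out into `_reset_replica_iterator`: whenever replica membership changes, shuffle the replica list once and then cycle through it. A minimal standalone sketch of that pattern, using hypothetical placeholder names (`replicas`, `replica_iterator`) rather than Serve's actual router state:

import itertools
import random

# Hypothetical replica identifiers for illustration; Serve's router keys
# its in-flight queues by RunningReplicaInfo objects instead.
replicas = ["replica-a", "replica-b", "replica-c"]

# Shuffle once when membership changes so that independent handles do not
# all start from the same replica, then round-robin with itertools.cycle.
random.shuffle(replicas)
replica_iterator = itertools.cycle(replicas)

for _ in range(5):
    print("assigning query to", next(replica_iterator))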
34 changes: 0 additions & 34 deletions python/ray/serve/tests/test_standalone2.py
@@ -10,7 +10,6 @@
import requests

import ray
import ray.actor
import ray._private.state
from ray import serve
from ray._private.test_utils import wait_for_condition
@@ -651,39 +650,6 @@ def test_shutdown_remote(start_and_shutdown_ray_cli_function):
os.unlink(shutdown_file.name)


def test_handle_early_detect_failure(shutdown_ray):
"""Check that handle can be notified about replicas failure and take them out of the replicas set."""
ray.init()
serve.start(detached=True)

@serve.deployment(num_replicas=2, max_concurrent_queries=1)
def f(do_crash: bool = False):
if do_crash:
os._exit(1)
return os.getpid()

handle = serve.run(f.bind())
pids = ray.get([handle.remote() for _ in range(2)])
assert len(set(pids)) == 2
assert len(handle.router._replica_set.in_flight_queries.keys()) == 2

client = get_global_client()
# Kill the controller so that the replicas membership won't be updated
# through controller health check + long polling.
ray.kill(client._controller, no_restart=True)

with pytest.raises(RayActorError):
ray.get(handle.remote(do_crash=True))

pids = ray.get([handle.remote() for _ in range(10)])
assert len(set(pids)) == 1
assert len(handle.router._replica_set.in_flight_queries.keys()) == 1

# Restart the controller, and then clean up all the replicas
serve.start(detached=True)
serve.shutdown()


def test_autoscaler_shutdown_node_http_everynode(
shutdown_ray, call_ray_stop_only # noqa: F811
):
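
The deleted test above exercised the behavior this commit reverts: after a replica's actor dies, the handle was expected to notice the RayActorError on its own and shrink the routing set without waiting for the controller. A minimal standalone sketch of that detection pattern, assuming a toy `Replica` actor and `handle` method (illustrative names only, not Serve's real deployment machinery):

import os
import time

import ray
from ray.exceptions import RayActorError

ray.init()

# Toy actor standing in for a Serve replica.
@ray.remote(max_restarts=0)
class Replica:
    def handle(self, crash: bool = False):
        if crash:
            os._exit(1)  # simulate the replica process dying mid-request
        return "ok"

replica = Replica.remote()
refs = [replica.handle.remote(), replica.handle.remote(crash=True)]
time.sleep(2)  # give both tasks time to finish or fail before polling

# Non-blocking poll: with timeout=0, ray.wait returns immediately with
# whatever refs have already completed, successfully or with an error.
done, _ = ray.wait(refs, num_returns=len(refs), timeout=0)

for ref in done:
    try:
        # Cheap because the ref is already ready, but it re-raises
        # RayActorError if the actor backing the request has exited.
        ray.get(ref)
    except RayActorError:
        print("replica actor exited; drop it from the routing set")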
