Skip to content

Commit

Permalink
[Serve] add blocking to serve.run() (#43227)
Browse files Browse the repository at this point in the history
Rename the existing serve.run to serve._run and add a blocking option to the new serve.run.

---------

Signed-off-by: Gene Su <[email protected]>
  • Loading branch information
GeneDer committed Feb 20, 2024
1 parent 3e8333c commit 3b9adee
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 46 deletions.
2 changes: 2 additions & 0 deletions python/ray/serve/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ray.serve.api import (
Application,
Deployment,
_run,
delete,
deployment,
get_app_handle,
Expand Down Expand Up @@ -35,6 +36,7 @@
ray._private.worker.blocking_get_inside_async_warned = True

__all__ = [
"_run",
"batch",
"start",
"HTTPOptions",
Expand Down
76 changes: 54 additions & 22 deletions python/ray/serve/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import collections
import inspect
import logging
import time
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, Union

Expand Down Expand Up @@ -436,7 +437,7 @@ def list_deployments() -> Dict[str, Deployment]:


@PublicAPI(stability="stable")
def run(
def _run(
target: Application,
_blocking: bool = True,
name: str = SERVE_DEFAULT_APP_NAME,
Expand All @@ -445,28 +446,9 @@ def run(
) -> DeploymentHandle:
"""Run an application and return a handle to its ingress deployment.
The application is returned by `Deployment.bind()`. Example:
.. code-block:: python
handle = serve.run(MyDeployment.bind())
ray.get(handle.remote())
Args:
target:
A Serve application returned by `Deployment.bind()`.
name: Application name. If not provided, this will be the only
application running on the cluster (it will delete all others).
route_prefix: Route prefix for HTTP requests. If not provided, it will use
route_prefix of the ingress deployment. If specified neither as an argument
nor in the ingress deployment, the route prefix will default to '/'.
logging_config: Application logging config. If provided, the config will
be applied to all deployments which doesn't have logging config.
Returns:
DeploymentHandle: A handle that can be used to call the application.
This is only used internally with the _blocking not totally blocking the following
code indefinitely until Ctrl-C'd.
"""

if len(name) == 0:
raise RayServeException("Application name must a non-empty string.")

Expand Down Expand Up @@ -530,6 +512,56 @@ def run(
return handle


@PublicAPI(stability="stable")
def run(
    target: Application,
    blocking: bool = False,
    name: str = SERVE_DEFAULT_APP_NAME,
    route_prefix: str = DEFAULT.VALUE,
    logging_config: Optional[Union[Dict, LoggingConfig]] = None,
) -> DeploymentHandle:
    """Run an application and return a handle to its ingress deployment.

    The application is returned by `Deployment.bind()`. Example:

    .. code-block:: python

        handle = serve.run(MyDeployment.bind())
        ray.get(handle.remote())

    Args:
        target:
            A Serve application returned by `Deployment.bind()`.
        blocking: Whether this call should be blocking. If True, it
            will loop and log status until Ctrl-C'd.
        name: Application name. If not provided, this will be the only
            application running on the cluster (it will delete all others).
        route_prefix: Route prefix for HTTP requests. If not provided, it will use
            route_prefix of the ingress deployment. If specified neither as an argument
            nor in the ingress deployment, the route prefix will default to '/'.
        logging_config: Application logging config. If provided, the config will
            be applied to all deployments which don't have logging config.

    Returns:
        DeploymentHandle: A handle that can be used to call the application.
        Only reached when `blocking` is False.

    Raises:
        KeyboardInterrupt: When `blocking` is True and the wait loop is
            interrupted with Ctrl-C. The interrupt is logged and then
            propagated to the caller.
    """
    # Deployment itself is delegated to the internal helper; this wrapper only
    # adds the optional "block until Ctrl-C" behavior on top of it.
    handle = _run(
        target=target,
        name=name,
        route_prefix=route_prefix,
        logging_config=logging_config,
    )

    if blocking:
        try:
            while True:
                # Block, letting Ray print logs to the terminal.
                time.sleep(10)
        except KeyboardInterrupt:
            logger.info("Got KeyboardInterrupt, release blocking...")
            # Re-raise instead of swallowing: callers (for example a CLI that
            # wraps this call in `except KeyboardInterrupt` to shut Serve down)
            # must still observe the interrupt to run their own cleanup.
            raise
    return handle


@PublicAPI(stability="stable")
def delete(name: str, _blocking: bool = True):
"""Delete an application by its name.
Expand Down
17 changes: 10 additions & 7 deletions python/ray/serve/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,14 @@ def run(
if is_config:
client.deploy_apps(config, _blocking=False)
cli_logger.success("Submitted deploy config successfully.")
if blocking:
while True:
# Block, letting Ray print logs to the terminal.
time.sleep(10)
else:
serve.run(app, name=name, route_prefix=route_prefix)
# This should not block if reload is true so the watchfiles can be triggered
should_block = blocking and not reload
serve.run(app, blocking=should_block, name=name, route_prefix=route_prefix)
cli_logger.success("Deployed app successfully.")

if reload:
Expand All @@ -618,14 +624,11 @@ def run(
app = _private_api.call_app_builder_with_args_if_necessary(
import_attr(import_path, reload_module=True), args_dict
)
serve.run(app, name=name, route_prefix=route_prefix)
serve.run(
target=app, blocking=True, name=name, route_prefix=route_prefix
)
cli_logger.success("Redeployed app successfully.")

if blocking:
while True:
# Block, letting Ray print logs to the terminal.
time.sleep(10)

except KeyboardInterrupt:
cli_logger.info("Got KeyboardInterrupt, shutting down...")
serve.shutdown()
Expand Down
4 changes: 2 additions & 2 deletions python/ray/serve/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ class A:
def __init__(self):
1 / 0

serve.run(A.bind(), _blocking=False)
serve._run(A.bind(), _blocking=False)

def check_for_failed_deployment():
default_app = serve.status().applications[SERVE_DEFAULT_APP_NAME]
Expand Down Expand Up @@ -880,7 +880,7 @@ def __init__(self):
create_engine("mysql:https://some_wrong_url:3306").connect()

ray_actor_options = {"runtime_env": {"pip": ["PyMySQL", "sqlalchemy==1.3.19"]}}
serve.run(
serve._run(
MyDeployment.options(ray_actor_options=ray_actor_options).bind(),
_blocking=False,
)
Expand Down
6 changes: 3 additions & 3 deletions python/ray/serve/tests/test_autoscaling_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,7 @@ async def __init__(self):
app = AutoscalingDeployment.bind()

# Start the AutoscalingDeployment.
serve.run(app, name=app_name, _blocking=False)
serve._run(app, name=app_name, _blocking=False)

# Active replicas are replicas that are waiting or running.
expected_num_active_replicas: int = min_replicas
Expand Down Expand Up @@ -1165,7 +1165,7 @@ def check_expected_statuses(
max_replicas=max_replicas,
)
).bind()
serve.run(app, name=app_name, _blocking=False)
serve._run(app, name=app_name, _blocking=False)
expected_num_active_replicas = min_replicas

wait_for_condition(check_num_active_replicas, expected=expected_num_active_replicas)
Expand Down Expand Up @@ -1221,7 +1221,7 @@ def check_expected_statuses(
max_replicas=max_replicas,
)
).bind()
serve.run(app, name=app_name, _blocking=False)
serve._run(app, name=app_name, _blocking=False)
expected_num_active_replicas = min_replicas

wait_for_condition(check_num_active_replicas, expected=expected_num_active_replicas)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class E:
async def __init__(self):
await signal.wait.remote()

serve.run(E.bind(), _blocking=False)
serve._run(E.bind(), _blocking=False)

def get_replicas(replica_state):
controller = client._controller
Expand Down
4 changes: 2 additions & 2 deletions python/ray/serve/tests/test_controller_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def make_nonblocking_calls(expected, expect_blocking=False, num_returns=1):
# Redeploy new version. Since there is one replica blocking, only one new
# replica should be started up.
V2 = V1.options(func_or_class=V2, version="2")
serve.run(V2.bind(), _blocking=False, name="app")
serve._run(V2.bind(), _blocking=False, name="app")
with pytest.raises(TimeoutError):
client._wait_for_application_running("app", timeout_s=0.1)
responses3, blocking3 = make_nonblocking_calls({"1": 1}, expect_blocking=True)
Expand Down Expand Up @@ -231,7 +231,7 @@ async def __init__(self):
def __call__(self, request):
return f"1|{os.getpid()}"

serve.run(V1.bind(), _blocking=False, name="app")
serve._run(V1.bind(), _blocking=False, name="app")
ray.get(pending_init_indicator.remote())

def get_actor_info(name: str):
Expand Down
8 changes: 5 additions & 3 deletions python/ray/serve/tests/test_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ async def __call__(self, request):
# Redeploy new version. This should not go through until the old version
# replica completely stops.
V2 = V1.options(func_or_class=V2, version="2")
serve.run(V2.bind(), _blocking=False, name="app")
serve._run(V2.bind(), _blocking=False, name="app")
with pytest.raises(TimeoutError):
client._wait_for_application_running("app", timeout_s=0.1)

Expand Down Expand Up @@ -264,7 +264,7 @@ def make_nonblocking_calls(expected, expect_blocking=False):
# Redeploy new version. Since there is one replica blocking, only one new
# replica should be started up.
V2 = V1.options(func_or_class=V2, version="2")
serve.run(V2.bind(), _blocking=False, name="app")
serve._run(V2.bind(), _blocking=False, name="app")
with pytest.raises(TimeoutError):
client._wait_for_application_running("app", timeout_s=0.1)
responses3, blocking3 = make_nonblocking_calls({"1": 1}, expect_blocking=True)
Expand Down Expand Up @@ -351,7 +351,9 @@ def make_nonblocking_calls(expected, expect_blocking=False):

# Reconfigure should block one replica until the signal is sent. Check that
# some requests are now blocking.
serve.run(V1.options(user_config={"test": "2"}).bind(), name="app", _blocking=False)
serve._run(
V1.options(user_config={"test": "2"}).bind(), name="app", _blocking=False
)
responses2, blocking2 = make_nonblocking_calls({"1": 1}, expect_blocking=True)
assert list(responses2["1"])[0] in pids1

Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_deploy_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ class Model:
def __call__(self):
return "hello world"

serve.run(Model.bind(), _blocking=False)
serve._run(Model.bind(), _blocking=False)

def check_fail():
app_status = serve.status().applications["default"]
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def make_blocked_request():
for _ in range(2):
starting_actor = SignalActor.remote()
finish_starting_actor = SignalActor.remote()
serve.run(
serve._run(
SlowStarter.bind(starting_actor, finish_starting_actor), _blocking=False
)

Expand Down
4 changes: 2 additions & 2 deletions python/ray/serve/tests/test_standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ def test_updating_status_message(lower_slow_startup_threshold_and_reset):
def f(*args):
pass

serve.run(f.bind(), _blocking=False)
serve._run(f.bind(), _blocking=False)

def updating_message():
deployment_status = (
Expand Down Expand Up @@ -699,7 +699,7 @@ def __init__(self):
def __call__(self, request):
pass

serve.run(f.bind(), _blocking=False)
serve._run(f.bind(), _blocking=False)

wait_for_condition(
lambda: serve.status()
Expand Down
4 changes: 2 additions & 2 deletions release/long_running_tests/workloads/serve_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def handler(self, *args):

if blocking:
ray.get(self.random_killer.spare.remote(new_name))
serve.run(
serve._run(
handler.bind(),
name=new_name,
route_prefix=f"/{new_name}",
Expand All @@ -149,7 +149,7 @@ def handler(self, *args):
self.applications.append(new_name)
ray.get(self.random_killer.stop_spare.remote(new_name))
else:
serve.run(
serve._run(
handler.bind(),
name=new_name,
route_prefix=f"/{new_name}",
Expand Down

0 comments on commit 3b9adee

Please sign in to comment.