Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] Remove old grpc code #39863

Merged
merged 18 commits into from
Sep 28, 2023
80 changes: 34 additions & 46 deletions python/ray/serve/_private/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import inspect
import logging
import os
from types import FunctionType
from typing import Any, Dict, Tuple, Union

Expand All @@ -15,7 +14,6 @@
CONTROLLER_MAX_CONCURRENCY,
HTTP_PROXY_TIMEOUT,
SERVE_CONTROLLER_NAME,
SERVE_EXPERIMENTAL_DISABLE_PROXY,
SERVE_NAMESPACE,
)
from ray.serve._private.controller import ServeController
Expand All @@ -27,8 +25,6 @@

logger = logging.getLogger(__file__)

FLAG_DISABLE_PROXY = os.environ.get(SERVE_EXPERIMENTAL_DISABLE_PROXY, "0") == "1"


def get_deployment(name: str, app_name: str = ""):
"""Dynamically fetch a handle to a Deployment object.
Expand Down Expand Up @@ -151,49 +147,41 @@ def _start_controller(
"max_concurrency": CONTROLLER_MAX_CONCURRENCY,
}

if FLAG_DISABLE_PROXY:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no logic change, only remove the if case

controller = ServeController.options(**controller_actor_options).remote(
controller_name,
http_config=http_options,
detached=detached,
_disable_proxy=True,
)
else:
# Legacy http proxy actor check
http_deprecated_args = ["http_host", "http_port", "http_middlewares"]
for key in http_deprecated_args:
if key in kwargs:
raise ValueError(
f"{key} is deprecated, please use serve.start(http_options="
f'{{"{key}": {kwargs[key]}}}) instead.'
)

if isinstance(http_options, dict):
http_options = HTTPOptions.parse_obj(http_options)
if http_options is None:
http_options = HTTPOptions()

if isinstance(grpc_options, dict):
grpc_options = gRPCOptions(**grpc_options)

controller = ServeController.options(**controller_actor_options).remote(
controller_name,
http_config=http_options,
detached=detached,
grpc_options=grpc_options,
)
# Legacy http proxy actor check
http_deprecated_args = ["http_host", "http_port", "http_middlewares"]
for key in http_deprecated_args:
if key in kwargs:
raise ValueError(
f"{key} is deprecated, please use serve.start(http_options="
f'{{"{key}": {kwargs[key]}}}) instead.'
)

if isinstance(http_options, dict):
http_options = HTTPOptions.parse_obj(http_options)
if http_options is None:
http_options = HTTPOptions()

proxy_handles = ray.get(controller.get_proxies.remote())
if len(proxy_handles) > 0:
try:
ray.get(
[handle.ready.remote() for handle in proxy_handles.values()],
timeout=HTTP_PROXY_TIMEOUT,
)
except ray.exceptions.GetTimeoutError:
raise TimeoutError(
f"HTTP proxies not available after {HTTP_PROXY_TIMEOUT}s."
)
if isinstance(grpc_options, dict):
grpc_options = gRPCOptions(**grpc_options)

controller = ServeController.options(**controller_actor_options).remote(
controller_name,
http_config=http_options,
detached=detached,
grpc_options=grpc_options,
)

proxy_handles = ray.get(controller.get_proxies.remote())
if len(proxy_handles) > 0:
try:
ray.get(
[handle.ready.remote() for handle in proxy_handles.values()],
timeout=HTTP_PROXY_TIMEOUT,
)
except ray.exceptions.GetTimeoutError:
raise TimeoutError(
f"HTTP proxies not available after {HTTP_PROXY_TIMEOUT}s."
)
return controller, controller_name


Expand Down
5 changes: 0 additions & 5 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,11 +984,6 @@ def override_deployment_info(
if deployment_route_prefix is not DEFAULT.VALUE:
override_options["route_prefix"] = deployment_route_prefix

# Override is_driver_deployment if specified in deployment config
is_driver_deployment = options.pop("is_driver_deployment", None)
if is_driver_deployment is not None:
override_options["is_driver_deployment"] = is_driver_deployment

# Merge app-level and deployment-level runtime_envs.
replica_config = info.replica_config
app_runtime_env = override_config.runtime_env
Expand Down
1 change: 0 additions & 1 deletion python/ray/serve/_private/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ def deploy_application(
deployment_config=deployment["deployment_config"],
version=deployment["version"],
route_prefix=deployment["route_prefix"],
is_driver_deployment=deployment["is_driver_deployment"],
docs_path=deployment["docs_path"],
)
)
Expand Down
7 changes: 0 additions & 7 deletions python/ray/serve/_private/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ def __init__(
actor_name: Optional[str] = None,
version: Optional[str] = None,
end_time_ms: Optional[int] = None,
is_driver_deployment: Optional[bool] = False,
route_prefix: str = None,
docs_path: str = None,
ingress: bool = False,
Expand All @@ -231,8 +230,6 @@ def __init__(
# ephermal state
self._cached_actor_def = None

self.is_driver_deployment = is_driver_deployment

self.route_prefix = route_prefix
self.docs_path = docs_path
self.ingress = ingress
Expand Down Expand Up @@ -264,7 +261,6 @@ def update(
deployment_config: DeploymentConfig = None,
replica_config: ReplicaConfig = None,
version: str = None,
is_driver_deployment: bool = None,
route_prefix: str = None,
) -> "DeploymentInfo":
return DeploymentInfo(
Expand All @@ -275,9 +271,6 @@ def update(
actor_name=self.actor_name,
version=version or self.version,
end_time_ms=self.end_time_ms,
is_driver_deployment=is_driver_deployment
if is_driver_deployment is not None
else self.is_driver_deployment,
route_prefix=route_prefix or self.route_prefix,
docs_path=self.docs_path,
ingress=self.ingress,
Expand Down
4 changes: 0 additions & 4 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,6 @@
"See https://docs.ray.io/en/latest/serve/index.html for more information."
)


# [EXPERIMENTAL] Disable the proxy actor
SERVE_EXPERIMENTAL_DISABLE_PROXY = "SERVE_EXPERIMENTAL_DISABLE_PROXY"

# Message
MULTI_APP_MIGRATION_MESSAGE = (
"Please see the documentation for ServeDeploySchema for more details on multi-app "
Expand Down
22 changes: 8 additions & 14 deletions python/ray/serve/_private/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ async def __init__(
*,
http_config: HTTPOptions,
detached: bool = False,
_disable_proxy: bool = False,
grpc_options: Optional[gRPCOptions] = None,
):
self._controller_node_id = ray.get_runtime_context().get_node_id()
Expand Down Expand Up @@ -155,17 +154,14 @@ async def __init__(
self.long_poll_host = LongPollHost()
self.done_recovering_event = asyncio.Event()

if _disable_proxy:
self.proxy_state_manager = None
else:
self.proxy_state_manager = ProxyStateManager(
controller_name,
detached,
http_config,
self._controller_node_id,
self.cluster_node_info_cache,
grpc_options,
)
self.proxy_state_manager = ProxyStateManager(
controller_name,
detached,
http_config,
self._controller_node_id,
self.cluster_node_info_cache,
grpc_options,
)

self.endpoint_state = EndpointState(self.kv_store, self.long_poll_host)

Expand Down Expand Up @@ -592,7 +588,6 @@ def deploy(
route_prefix: Optional[str],
deployer_job_id: Union[str, bytes],
docs_path: Optional[str] = None,
is_driver_deployment: Optional[bool] = False,
# TODO(edoakes): this is a hack because the deployment_language doesn't seem
# to get set properly from Java.
is_deployed_from_python: bool = False,
Expand All @@ -610,7 +605,6 @@ def deploy(
deployer_job_id=deployer_job_id,
route_prefix=route_prefix,
docs_path=docs_path,
is_driver_deployment=is_driver_deployment,
app_name="",
)

Expand Down
4 changes: 0 additions & 4 deletions python/ray/serve/_private/deploy_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def get_deploy_args(
deployment_config: Optional[Union[DeploymentConfig, Dict[str, Any]]] = None,
version: Optional[str] = None,
route_prefix: Optional[str] = None,
is_driver_deployment: Optional[str] = None,
docs_path: Optional[str] = None,
) -> Dict:
"""
Expand Down Expand Up @@ -55,7 +54,6 @@ def get_deploy_args(
"replica_config_proto_bytes": replica_config.to_proto_bytes(),
"route_prefix": route_prefix,
"deployer_job_id": ray.get_runtime_context().get_job_id(),
"is_driver_deployment": is_driver_deployment,
"docs_path": docs_path,
"ingress": ingress,
}
Expand All @@ -70,7 +68,6 @@ def deploy_args_to_deployment_info(
deployer_job_id: Union[str, bytes],
route_prefix: Optional[str],
docs_path: Optional[str],
is_driver_deployment: Optional[bool] = False,
app_name: Optional[str] = None,
ingress: bool = False,
**kwargs,
Expand Down Expand Up @@ -100,7 +97,6 @@ def deploy_args_to_deployment_info(
replica_config=replica_config,
deployer_job_id=deployer_job_id,
start_time_ms=int(time.time() * 1000),
is_driver_deployment=is_driver_deployment,
route_prefix=route_prefix,
docs_path=docs_path,
ingress=ingress,
Expand Down
3 changes: 0 additions & 3 deletions python/ray/serve/_private/deployment_graph_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ def replace_with_handle(node):
init_args=replaced_deployment_init_args,
init_kwargs=replaced_deployment_init_kwargs,
route_prefix=route_prefix,
is_driver_deployment=deployment_shell._is_driver_deployment,
_internal=True,
)

Expand Down Expand Up @@ -410,7 +409,6 @@ def replace_with_handle(node):
return original_driver_deployment.options(
init_args=replaced_deployment_init_args,
init_kwargs=replaced_deployment_init_kwargs,
is_driver_deployment=original_driver_deployment._is_driver_deployment,
_internal=True,
)

Expand Down Expand Up @@ -459,7 +457,6 @@ def process_ingress_deployment_in_serve_dag(
# didn't provide anything in particular.
new_ingress_deployment = ingress_deployment.options(
route_prefix="/",
is_driver_deployment=ingress_deployment._is_driver_deployment,
_internal=True,
)
deployments[-1] = new_ingress_deployment
Expand Down
69 changes: 5 additions & 64 deletions python/ray/serve/_private/deployment_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Set, Tuple, Union
from typing import Callable, Dict, List, Optional, Set, Tuple

import ray
from ray.serve._private.cluster_node_info_cache import ClusterNodeInfoCache
from ray.serve._private.common import DeploymentID
from ray.serve._private.utils import get_head_node_id
from ray.util.scheduling_strategies import (
NodeAffinitySchedulingStrategy,
PlacementGroupSchedulingStrategy,
)
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy


class SpreadDeploymentSchedulingPolicy:
Expand All @@ -21,12 +18,6 @@ class SpreadDeploymentSchedulingPolicy:
pass


class DriverDeploymentSchedulingPolicy:
"""A scheduling policy that schedules exactly one replica on each node."""

pass


@dataclass
class ReplicaSchedulingRequest:
"""Request to schedule a single replica.
Expand Down Expand Up @@ -71,9 +62,7 @@ class DeploymentScheduler(ABC):
def on_deployment_created(
self,
deployment_id: DeploymentID,
scheduling_policy: Union[
SpreadDeploymentSchedulingPolicy, DriverDeploymentSchedulingPolicy
],
scheduling_policy: SpreadDeploymentSchedulingPolicy,
) -> None:
"""Called whenever a new deployment is created."""
raise NotImplementedError
Expand Down Expand Up @@ -149,9 +138,7 @@ def __init__(self, cluster_node_info_cache: ClusterNodeInfoCache):
def on_deployment_created(
self,
deployment_id: DeploymentID,
scheduling_policy: Union[
SpreadDeploymentSchedulingPolicy, DriverDeploymentSchedulingPolicy
],
scheduling_policy: SpreadDeploymentSchedulingPolicy,
) -> None:
"""Called whenever a new deployment is created."""
assert deployment_id not in self._pending_replicas
Expand Down Expand Up @@ -231,16 +218,7 @@ def schedule(
if not pending_replicas:
continue

deployment_scheduling_policy = self._deployments[deployment_id]
if isinstance(
deployment_scheduling_policy, SpreadDeploymentSchedulingPolicy
):
self._schedule_spread_deployment(deployment_id)
else:
assert isinstance(
deployment_scheduling_policy, DriverDeploymentSchedulingPolicy
)
self._schedule_driver_deployment(deployment_id)
self._schedule_spread_deployment(deployment_id)

deployment_to_replicas_to_stop = {}
for downscale in downscales.values():
Expand Down Expand Up @@ -300,43 +278,6 @@ def _schedule_spread_deployment(self, deployment_id: DeploymentID) -> None:
actor_handle, placement_group=placement_group
)

def _schedule_driver_deployment(self, deployment_id: DeploymentID) -> None:
if self._recovering_replicas[deployment_id]:
# Wait until recovering is done before scheduling new replicas
# so that we can make sure we don't schedule two replicas on the same node.
return

all_active_nodes = self._cluster_node_info_cache.get_active_node_ids()
scheduled_nodes = set()
for node_id in self._launching_replicas[deployment_id].values():
assert node_id is not None
scheduled_nodes.add(node_id)
for node_id in self._running_replicas[deployment_id].values():
assert node_id is not None
scheduled_nodes.add(node_id)
unscheduled_nodes = all_active_nodes - scheduled_nodes

for pending_replica_name in list(self._pending_replicas[deployment_id].keys()):
if not unscheduled_nodes:
return

replica_scheduling_request = self._pending_replicas[deployment_id][
pending_replica_name
]

target_node_id = unscheduled_nodes.pop()
actor_handle = replica_scheduling_request.actor_def.options(
scheduling_strategy=NodeAffinitySchedulingStrategy(
target_node_id, soft=False
),
**replica_scheduling_request.actor_options,
).remote(*replica_scheduling_request.actor_init_args)
del self._pending_replicas[deployment_id][pending_replica_name]
self._launching_replicas[deployment_id][
pending_replica_name
] = target_node_id
replica_scheduling_request.on_scheduled(actor_handle, placement_group=None)

def _get_replicas_to_stop(
self, deployment_id: DeploymentID, max_num_to_stop: int
) -> Set[str]:
Expand Down
Loading
Loading