diff --git a/python/ray/serve/_private/api.py b/python/ray/serve/_private/api.py index 76d02186e2816..b72ce39d867c9 100644 --- a/python/ray/serve/_private/api.py +++ b/python/ray/serve/_private/api.py @@ -1,6 +1,5 @@ import inspect import logging -import os from types import FunctionType from typing import Any, Dict, Tuple, Union @@ -15,7 +14,6 @@ CONTROLLER_MAX_CONCURRENCY, HTTP_PROXY_TIMEOUT, SERVE_CONTROLLER_NAME, - SERVE_EXPERIMENTAL_DISABLE_PROXY, SERVE_NAMESPACE, ) from ray.serve._private.controller import ServeController @@ -27,8 +25,6 @@ logger = logging.getLogger(__file__) -FLAG_DISABLE_PROXY = os.environ.get(SERVE_EXPERIMENTAL_DISABLE_PROXY, "0") == "1" - def get_deployment(name: str, app_name: str = ""): """Dynamically fetch a handle to a Deployment object. @@ -151,49 +147,41 @@ def _start_controller( "max_concurrency": CONTROLLER_MAX_CONCURRENCY, } - if FLAG_DISABLE_PROXY: - controller = ServeController.options(**controller_actor_options).remote( - controller_name, - http_config=http_options, - detached=detached, - _disable_proxy=True, - ) - else: - # Legacy http proxy actor check - http_deprecated_args = ["http_host", "http_port", "http_middlewares"] - for key in http_deprecated_args: - if key in kwargs: - raise ValueError( - f"{key} is deprecated, please use serve.start(http_options=" - f'{{"{key}": {kwargs[key]}}}) instead.' - ) - - if isinstance(http_options, dict): - http_options = HTTPOptions.parse_obj(http_options) - if http_options is None: - http_options = HTTPOptions() - - if isinstance(grpc_options, dict): - grpc_options = gRPCOptions(**grpc_options) - - controller = ServeController.options(**controller_actor_options).remote( - controller_name, - http_config=http_options, - detached=detached, - grpc_options=grpc_options, - ) + # Legacy http proxy actor check + http_deprecated_args = ["http_host", "http_port", "http_middlewares"] + for key in http_deprecated_args: + if key in kwargs: + raise ValueError( + f"{key} is deprecated, please use serve.start(http_options=" + f'{{"{key}": {kwargs[key]}}}) instead.' + ) + + if isinstance(http_options, dict): + http_options = HTTPOptions.parse_obj(http_options) + if http_options is None: + http_options = HTTPOptions() - proxy_handles = ray.get(controller.get_proxies.remote()) - if len(proxy_handles) > 0: - try: - ray.get( - [handle.ready.remote() for handle in proxy_handles.values()], - timeout=HTTP_PROXY_TIMEOUT, - ) - except ray.exceptions.GetTimeoutError: - raise TimeoutError( - f"HTTP proxies not available after {HTTP_PROXY_TIMEOUT}s." - ) + if isinstance(grpc_options, dict): + grpc_options = gRPCOptions(**grpc_options) + + controller = ServeController.options(**controller_actor_options).remote( + controller_name, + http_config=http_options, + detached=detached, + grpc_options=grpc_options, + ) + + proxy_handles = ray.get(controller.get_proxies.remote()) + if len(proxy_handles) > 0: + try: + ray.get( + [handle.ready.remote() for handle in proxy_handles.values()], + timeout=HTTP_PROXY_TIMEOUT, + ) + except ray.exceptions.GetTimeoutError: + raise TimeoutError( + f"HTTP proxies not available after {HTTP_PROXY_TIMEOUT}s." 
+ ) return controller, controller_name diff --git a/python/ray/serve/_private/application_state.py b/python/ray/serve/_private/application_state.py index a671eaa9cfa00..9450d679f2f58 100644 --- a/python/ray/serve/_private/application_state.py +++ b/python/ray/serve/_private/application_state.py @@ -984,11 +984,6 @@ def override_deployment_info( if deployment_route_prefix is not DEFAULT.VALUE: override_options["route_prefix"] = deployment_route_prefix - # Override is_driver_deployment if specified in deployment config - is_driver_deployment = options.pop("is_driver_deployment", None) - if is_driver_deployment is not None: - override_options["is_driver_deployment"] = is_driver_deployment - # Merge app-level and deployment-level runtime_envs. replica_config = info.replica_config app_runtime_env = override_config.runtime_env diff --git a/python/ray/serve/_private/client.py b/python/ray/serve/_private/client.py index f1d6abddc41dd..a30f2956e9804 100644 --- a/python/ray/serve/_private/client.py +++ b/python/ray/serve/_private/client.py @@ -317,7 +317,6 @@ def deploy_application( deployment_config=deployment["deployment_config"], version=deployment["version"], route_prefix=deployment["route_prefix"], - is_driver_deployment=deployment["is_driver_deployment"], docs_path=deployment["docs_path"], ) ) diff --git a/python/ray/serve/_private/common.py b/python/ray/serve/_private/common.py index f824c8705d511..5d11d57865023 100644 --- a/python/ray/serve/_private/common.py +++ b/python/ray/serve/_private/common.py @@ -213,7 +213,6 @@ def __init__( actor_name: Optional[str] = None, version: Optional[str] = None, end_time_ms: Optional[int] = None, - is_driver_deployment: Optional[bool] = False, route_prefix: str = None, docs_path: str = None, ingress: bool = False, @@ -231,8 +230,6 @@ def __init__( # ephermal state self._cached_actor_def = None - self.is_driver_deployment = is_driver_deployment - self.route_prefix = route_prefix self.docs_path = docs_path self.ingress = ingress @@ -264,7 +261,6 @@ def update( deployment_config: DeploymentConfig = None, replica_config: ReplicaConfig = None, version: str = None, - is_driver_deployment: bool = None, route_prefix: str = None, ) -> "DeploymentInfo": return DeploymentInfo( @@ -275,9 +271,6 @@ def update( actor_name=self.actor_name, version=version or self.version, end_time_ms=self.end_time_ms, - is_driver_deployment=is_driver_deployment - if is_driver_deployment is not None - else self.is_driver_deployment, route_prefix=route_prefix or self.route_prefix, docs_path=self.docs_path, ingress=self.ingress, diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py index 30324d69f41d1..7e5999d84d6a0 100644 --- a/python/ray/serve/_private/constants.py +++ b/python/ray/serve/_private/constants.py @@ -159,10 +159,6 @@ "See https://docs.ray.io/en/latest/serve/index.html for more information." 
) - -# [EXPERIMENTAL] Disable the proxy actor -SERVE_EXPERIMENTAL_DISABLE_PROXY = "SERVE_EXPERIMENTAL_DISABLE_PROXY" - # Message MULTI_APP_MIGRATION_MESSAGE = ( "Please see the documentation for ServeDeploySchema for more details on multi-app " diff --git a/python/ray/serve/_private/controller.py b/python/ray/serve/_private/controller.py index e87f31f5d9e49..8fd3ba4857b7c 100644 --- a/python/ray/serve/_private/controller.py +++ b/python/ray/serve/_private/controller.py @@ -116,7 +116,6 @@ async def __init__( *, http_config: HTTPOptions, detached: bool = False, - _disable_proxy: bool = False, grpc_options: Optional[gRPCOptions] = None, ): self._controller_node_id = ray.get_runtime_context().get_node_id() @@ -155,17 +154,14 @@ async def __init__( self.long_poll_host = LongPollHost() self.done_recovering_event = asyncio.Event() - if _disable_proxy: - self.proxy_state_manager = None - else: - self.proxy_state_manager = ProxyStateManager( - controller_name, - detached, - http_config, - self._controller_node_id, - self.cluster_node_info_cache, - grpc_options, - ) + self.proxy_state_manager = ProxyStateManager( + controller_name, + detached, + http_config, + self._controller_node_id, + self.cluster_node_info_cache, + grpc_options, + ) self.endpoint_state = EndpointState(self.kv_store, self.long_poll_host) @@ -592,7 +588,6 @@ def deploy( route_prefix: Optional[str], deployer_job_id: Union[str, bytes], docs_path: Optional[str] = None, - is_driver_deployment: Optional[bool] = False, # TODO(edoakes): this is a hack because the deployment_language doesn't seem # to get set properly from Java. is_deployed_from_python: bool = False, @@ -610,7 +605,6 @@ def deploy( deployer_job_id=deployer_job_id, route_prefix=route_prefix, docs_path=docs_path, - is_driver_deployment=is_driver_deployment, app_name="", ) diff --git a/python/ray/serve/_private/deploy_utils.py b/python/ray/serve/_private/deploy_utils.py index 17089dc2372cf..61c2d63f4a196 100644 --- a/python/ray/serve/_private/deploy_utils.py +++ b/python/ray/serve/_private/deploy_utils.py @@ -21,7 +21,6 @@ def get_deploy_args( deployment_config: Optional[Union[DeploymentConfig, Dict[str, Any]]] = None, version: Optional[str] = None, route_prefix: Optional[str] = None, - is_driver_deployment: Optional[str] = None, docs_path: Optional[str] = None, ) -> Dict: """ @@ -55,7 +54,6 @@ def get_deploy_args( "replica_config_proto_bytes": replica_config.to_proto_bytes(), "route_prefix": route_prefix, "deployer_job_id": ray.get_runtime_context().get_job_id(), - "is_driver_deployment": is_driver_deployment, "docs_path": docs_path, "ingress": ingress, } @@ -70,7 +68,6 @@ def deploy_args_to_deployment_info( deployer_job_id: Union[str, bytes], route_prefix: Optional[str], docs_path: Optional[str], - is_driver_deployment: Optional[bool] = False, app_name: Optional[str] = None, ingress: bool = False, **kwargs, @@ -100,7 +97,6 @@ def deploy_args_to_deployment_info( replica_config=replica_config, deployer_job_id=deployer_job_id, start_time_ms=int(time.time() * 1000), - is_driver_deployment=is_driver_deployment, route_prefix=route_prefix, docs_path=docs_path, ingress=ingress, diff --git a/python/ray/serve/_private/deployment_graph_build.py b/python/ray/serve/_private/deployment_graph_build.py index 16baa38ec39e4..9d544be05d781 100644 --- a/python/ray/serve/_private/deployment_graph_build.py +++ b/python/ray/serve/_private/deployment_graph_build.py @@ -232,7 +232,6 @@ def replace_with_handle(node): init_args=replaced_deployment_init_args, 
init_kwargs=replaced_deployment_init_kwargs, route_prefix=route_prefix, - is_driver_deployment=deployment_shell._is_driver_deployment, _internal=True, ) @@ -410,7 +409,6 @@ def replace_with_handle(node): return original_driver_deployment.options( init_args=replaced_deployment_init_args, init_kwargs=replaced_deployment_init_kwargs, - is_driver_deployment=original_driver_deployment._is_driver_deployment, _internal=True, ) @@ -459,7 +457,6 @@ def process_ingress_deployment_in_serve_dag( # didn't provide anything in particular. new_ingress_deployment = ingress_deployment.options( route_prefix="/", - is_driver_deployment=ingress_deployment._is_driver_deployment, _internal=True, ) deployments[-1] = new_ingress_deployment diff --git a/python/ray/serve/_private/deployment_scheduler.py b/python/ray/serve/_private/deployment_scheduler.py index 6718c0575141a..bf3935f91661d 100644 --- a/python/ray/serve/_private/deployment_scheduler.py +++ b/python/ray/serve/_private/deployment_scheduler.py @@ -3,16 +3,13 @@ from abc import ABC, abstractmethod from collections import defaultdict from dataclasses import dataclass -from typing import Callable, Dict, List, Optional, Set, Tuple, Union +from typing import Callable, Dict, List, Optional, Set, Tuple import ray from ray.serve._private.cluster_node_info_cache import ClusterNodeInfoCache from ray.serve._private.common import DeploymentID from ray.serve._private.utils import get_head_node_id -from ray.util.scheduling_strategies import ( - NodeAffinitySchedulingStrategy, - PlacementGroupSchedulingStrategy, -) +from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy class SpreadDeploymentSchedulingPolicy: @@ -21,12 +18,6 @@ class SpreadDeploymentSchedulingPolicy: pass -class DriverDeploymentSchedulingPolicy: - """A scheduling policy that schedules exactly one replica on each node.""" - - pass - - @dataclass class ReplicaSchedulingRequest: """Request to schedule a single replica. 
@@ -71,9 +62,7 @@ class DeploymentScheduler(ABC): def on_deployment_created( self, deployment_id: DeploymentID, - scheduling_policy: Union[ - SpreadDeploymentSchedulingPolicy, DriverDeploymentSchedulingPolicy - ], + scheduling_policy: SpreadDeploymentSchedulingPolicy, ) -> None: """Called whenever a new deployment is created.""" raise NotImplementedError @@ -149,9 +138,7 @@ def __init__(self, cluster_node_info_cache: ClusterNodeInfoCache): def on_deployment_created( self, deployment_id: DeploymentID, - scheduling_policy: Union[ - SpreadDeploymentSchedulingPolicy, DriverDeploymentSchedulingPolicy - ], + scheduling_policy: SpreadDeploymentSchedulingPolicy, ) -> None: """Called whenever a new deployment is created.""" assert deployment_id not in self._pending_replicas @@ -231,16 +218,7 @@ def schedule( if not pending_replicas: continue - deployment_scheduling_policy = self._deployments[deployment_id] - if isinstance( - deployment_scheduling_policy, SpreadDeploymentSchedulingPolicy - ): - self._schedule_spread_deployment(deployment_id) - else: - assert isinstance( - deployment_scheduling_policy, DriverDeploymentSchedulingPolicy - ) - self._schedule_driver_deployment(deployment_id) + self._schedule_spread_deployment(deployment_id) deployment_to_replicas_to_stop = {} for downscale in downscales.values(): @@ -300,43 +278,6 @@ def _schedule_spread_deployment(self, deployment_id: DeploymentID) -> None: actor_handle, placement_group=placement_group ) - def _schedule_driver_deployment(self, deployment_id: DeploymentID) -> None: - if self._recovering_replicas[deployment_id]: - # Wait until recovering is done before scheduling new replicas - # so that we can make sure we don't schedule two replicas on the same node. - return - - all_active_nodes = self._cluster_node_info_cache.get_active_node_ids() - scheduled_nodes = set() - for node_id in self._launching_replicas[deployment_id].values(): - assert node_id is not None - scheduled_nodes.add(node_id) - for node_id in self._running_replicas[deployment_id].values(): - assert node_id is not None - scheduled_nodes.add(node_id) - unscheduled_nodes = all_active_nodes - scheduled_nodes - - for pending_replica_name in list(self._pending_replicas[deployment_id].keys()): - if not unscheduled_nodes: - return - - replica_scheduling_request = self._pending_replicas[deployment_id][ - pending_replica_name - ] - - target_node_id = unscheduled_nodes.pop() - actor_handle = replica_scheduling_request.actor_def.options( - scheduling_strategy=NodeAffinitySchedulingStrategy( - target_node_id, soft=False - ), - **replica_scheduling_request.actor_options, - ).remote(*replica_scheduling_request.actor_init_args) - del self._pending_replicas[deployment_id][pending_replica_name] - self._launching_replicas[deployment_id][ - pending_replica_name - ] = target_node_id - replica_scheduling_request.on_scheduled(actor_handle, placement_group=None) - def _get_replicas_to_stop( self, deployment_id: DeploymentID, max_num_to_stop: int ) -> Set[str]: diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 7dc20a3d57eb9..193fc974944ba 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -42,7 +42,6 @@ from ray.serve._private.deployment_scheduler import ( DeploymentDownscaleRequest, DeploymentScheduler, - DriverDeploymentSchedulingPolicy, ReplicaSchedulingRequest, SpreadDeploymentSchedulingPolicy, ) @@ -2121,140 +2120,6 @@ def _stop_one_running_replica_for_testing(self): 
self._replicas.add(ReplicaState.RUNNING, replica) -class DriverDeploymentState(DeploymentState): - """Manages the target state and replicas for a single driver deployment.""" - - def __init__( - self, - id: DeploymentID, - controller_name: str, - detached: bool, - long_poll_host: LongPollHost, - deployment_scheduler: DeploymentScheduler, - cluster_node_info_cache: ClusterNodeInfoCache, - _save_checkpoint_func: Callable, - ): - super().__init__( - id, - controller_name, - detached, - long_poll_host, - deployment_scheduler, - cluster_node_info_cache, - _save_checkpoint_func, - ) - - def _deploy_driver(self) -> List[ReplicaSchedulingRequest]: - """Deploy the driver deployment to each node.""" - num_running_replicas = self._replicas.count(states=[ReplicaState.RUNNING]) - if num_running_replicas >= self._target_state.num_replicas: - # Cancel starting replicas when driver deployment state creates - # more replicas than alive nodes. - # For example, get_active_node_ids returns 4 nodes when - # the driver deployment state decides the target number of replicas - # but later on when the deployment scheduler schedules these 4 replicas, - # there are only 3 alive nodes (1 node dies in between). - # In this case, 1 replica will be in the PENDING_ALLOCATION and we - # cancel it here. - for replica in self._replicas.pop(states=[ReplicaState.STARTING]): - self._stop_replica(replica) - - return [] - - upscale = [] - num_existing_replicas = self._replicas.count() - for _ in range(self._target_state.num_replicas - num_existing_replicas): - replica_name = ReplicaName( - self.app_name, self.deployment_name, get_random_letters() - ) - new_deployment_replica = DeploymentReplica( - self._controller_name, - self._detached, - replica_name.replica_tag, - self._id, - self._target_state.version, - ) - upscale.append(new_deployment_replica.start(self._target_state.info)) - - self._replicas.add(ReplicaState.STARTING, new_deployment_replica) - - return upscale - - def _stop_all_replicas(self) -> bool: - replica_changed = False - for replica in self._replicas.pop( - states=[ - ReplicaState.STARTING, - ReplicaState.RUNNING, - ReplicaState.RECOVERING, - ReplicaState.UPDATING, - ] - ): - self._stop_replica(replica) - replica_changed = True - return replica_changed - - def _calculate_max_replicas_to_stop(self) -> int: - num_nodes = len(self._cluster_node_info_cache.get_active_node_ids()) - rollout_size = max(int(0.2 * num_nodes), 1) - old_running_replicas = self._replicas.count( - exclude_version=self._target_state.version, - states=[ReplicaState.STARTING, ReplicaState.UPDATING, ReplicaState.RUNNING], - ) - new_running_replicas = self._replicas.count( - version=self._target_state.version, states=[ReplicaState.RUNNING] - ) - pending_replicas = num_nodes - new_running_replicas - old_running_replicas - return max(rollout_size - pending_replicas, 0) - - def update(self) -> DeploymentStateUpdateResult: - try: - self._check_and_update_replicas() - - upscale = [] - if self._target_state.deleting: - self._stop_all_replicas() - else: - num_nodes = len(self._cluster_node_info_cache.get_active_node_ids()) - # For driver deployment, when there are new node, - # it is supposed to update the target state. 
- if self._target_state.num_replicas != num_nodes: - self._target_state.num_replicas = num_nodes - curr_info = self._target_state.info - new_config = copy(curr_info) - new_config.deployment_config.num_replicas = num_nodes - if new_config.version is None: - new_config.version = self._target_state.version.code_version - self._set_target_state(new_config) - - self._stop_replicas_on_draining_nodes() - - max_to_stop = self._calculate_max_replicas_to_stop() - self._stop_or_update_outdated_version_replicas(max_to_stop) - - upscale = self._deploy_driver() - - deleted, any_replicas_recovering = self._check_curr_status() - return DeploymentStateUpdateResult( - deleted=deleted, - any_replicas_recovering=any_replicas_recovering, - upscale=upscale, - downscale=None, - ) - except Exception: - self._curr_status_info = DeploymentStatusInfo( - name=self.deployment_name, - status=DeploymentStatus.UNHEALTHY, - message="Failed to update deployment:" f"\n{traceback.format_exc()}", - ) - return DeploymentStateUpdateResult( - deleted=False, any_replicas_recovering=False, upscale=[], downscale=None - ) - - def should_autoscale(self) -> bool: - return False - - class DeploymentStateManager: """Manages all state for deployments in the system. @@ -2293,21 +2158,6 @@ def __init__( # TODO(simon): move autoscaling related stuff into a manager. self.handle_metrics_store = InMemoryMetricsStore() - def _create_driver_deployment_state(self, deployment_id): - self._deployment_scheduler.on_deployment_created( - deployment_id, DriverDeploymentSchedulingPolicy() - ) - - return DriverDeploymentState( - deployment_id, - self._controller_name, - self._detached, - self._long_poll_host, - self._deployment_scheduler, - self._cluster_node_info_cache, - self._save_checkpoint_func, - ) - def _create_deployment_state(self, deployment_id): self._deployment_scheduler.on_deployment_created( deployment_id, SpreadDeploymentSchedulingPolicy() @@ -2446,12 +2296,7 @@ def _recover_from_checkpoint( ) = cloudpickle.loads(checkpoint) for deployment_id, checkpoint_data in deployment_state_info.items(): - if checkpoint_data.info.is_driver_deployment: - deployment_state = self._create_driver_deployment_state( - deployment_id - ) - else: - deployment_state = self._create_deployment_state(deployment_id) + deployment_state = self._create_deployment_state(deployment_id) deployment_state.recover_target_state_from_checkpoint(checkpoint_data) if len(deployment_to_current_replicas[deployment_id]) > 0: deployment_state.recover_current_state_from_replica_actor_names( # noqa: E501 @@ -2594,14 +2439,9 @@ def deploy( del self._deleted_deployment_metadata[deployment_id] if deployment_id not in self._deployment_states: - if deployment_info.is_driver_deployment: - self._deployment_states[ - deployment_id - ] = self._create_driver_deployment_state(deployment_id) - else: - self._deployment_states[deployment_id] = self._create_deployment_state( - deployment_id - ) + self._deployment_states[deployment_id] = self._create_deployment_state( + deployment_id + ) self._record_deployment_usage() return self._deployment_states[deployment_id].deploy(deployment_info) diff --git a/python/ray/serve/_private/proxy.py b/python/ray/serve/_private/proxy.py index 1fa53abdb38ba..920bb55536a34 100644 --- a/python/ray/serve/_private/proxy.py +++ b/python/ray/serve/_private/proxy.py @@ -1598,6 +1598,7 @@ async def run_grpc_server(self): grpc_server = create_serve_grpc_server( service_handler_factory=self.grpc_proxy.service_handler_factory, ) + 
grpc_server.add_insecure_port(f"[::]:{self.grpc_port}") # Dummy servicer is used to be callable for the gRPC server. Serve have a diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 764a5ca03e6bc..61719362d75a8 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -281,7 +281,6 @@ def deployment( graceful_shutdown_timeout_s: Default[float] = DEFAULT.VALUE, health_check_period_s: Default[float] = DEFAULT.VALUE, health_check_timeout_s: Default[float] = DEFAULT.VALUE, - is_driver_deployment: Optional[bool] = DEFAULT.VALUE, ) -> Callable[[Callable], Deployment]: """Decorator that converts a Python class to a `Deployment`. @@ -336,8 +335,6 @@ class MyDeployment: no more work to be done before shutting down. Defaults to 2s. graceful_shutdown_timeout_s: Duration to wait for a replica to gracefully shut down before being forcefully killed. Defaults to 20s. - is_driver_deployment: [EXPERIMENTAL] when set, exactly one replica of this - deployment runs on every node (like a daemon set). max_replicas_per_node: [EXPERIMENTAL] The max number of deployment replicas can run on a single node. Valid values are None (no limitation) or an integer in the range of [1, 100]. @@ -384,9 +381,6 @@ class MyDeployment: "`serve.run` instead." ) - if is_driver_deployment is DEFAULT.VALUE: - is_driver_deployment = False - deployment_config = DeploymentConfig.from_default( num_replicas=num_replicas if num_replicas is not None else 1, user_config=user_config, @@ -430,7 +424,6 @@ def decorator(_func_or_class): replica_config, version=(version if version is not DEFAULT.VALUE else None), route_prefix=route_prefix, - is_driver_deployment=is_driver_deployment, _internal=True, ) @@ -565,7 +558,6 @@ def run( "version": deployment._version or get_random_letters(), "route_prefix": deployment.route_prefix, "url": deployment.url, - "is_driver_deployment": deployment._is_driver_deployment, "docs_path": deployment._docs_path, "ingress": deployment._name == ingress._name, } diff --git a/python/ray/serve/built_application.py b/python/ray/serve/built_application.py index 46383e9eef809..ef7596a21b385 100644 --- a/python/ray/serve/built_application.py +++ b/python/ray/serve/built_application.py @@ -81,7 +81,6 @@ def _get_deploy_args_from_built_app(app: BuiltApplication): deployment_config=deployment._deployment_config, version=deployment.version, route_prefix=deployment.route_prefix, - is_driver_deployment=deployment._is_driver_deployment, docs_path=deployment._docs_path, ) ) diff --git a/python/ray/serve/deployment.py b/python/ray/serve/deployment.py index bc3278cbfa7ab..ec3c523ed35b0 100644 --- a/python/ray/serve/deployment.py +++ b/python/ray/serve/deployment.py @@ -116,7 +116,6 @@ def __init__( replica_config: ReplicaConfig, version: Optional[str] = None, route_prefix: Union[str, None, DEFAULT] = DEFAULT.VALUE, - is_driver_deployment: Optional[bool] = False, _internal=False, ) -> None: if not _internal: @@ -140,12 +139,6 @@ def __init__( if "{" in route_prefix or "}" in route_prefix: raise ValueError("route_prefix may not contain wildcards.") - if is_driver_deployment is True: - if deployment_config.num_replicas != 1: - raise ValueError("num_replicas should not be set for driver deployment") - if deployment_config.autoscaling_config: - raise ValueError("autoscaling should not be set for driver deployment") - docs_path = None if ( inspect.isclass(replica_config.deployment_def) @@ -160,7 +153,6 @@ def __init__( self._deployment_config = deployment_config self._replica_config = replica_config 
self._route_prefix = route_prefix - self._is_driver_deployment = is_driver_deployment self._docs_path = docs_path @property @@ -214,7 +206,7 @@ def init_kwargs(self) -> Tuple[Any]: @property def url(self) -> Optional[str]: - if self._route_prefix is None or self._is_driver_deployment: + if self._route_prefix is None: # this deployment is not exposed over HTTP return None @@ -386,7 +378,6 @@ def options( graceful_shutdown_timeout_s: Default[float] = DEFAULT.VALUE, health_check_period_s: Default[float] = DEFAULT.VALUE, health_check_timeout_s: Default[float] = DEFAULT.VALUE, - is_driver_deployment: bool = DEFAULT.VALUE, _internal: bool = False, ) -> "Deployment": """Return a copy of this deployment with updated options. @@ -497,9 +488,6 @@ def options( if health_check_timeout_s is not DEFAULT.VALUE: new_deployment_config.health_check_timeout_s = health_check_timeout_s - if is_driver_deployment is DEFAULT.VALUE: - is_driver_deployment = self._is_driver_deployment - new_replica_config = ReplicaConfig.create( func_or_class, init_args=init_args, @@ -517,7 +505,6 @@ def options( version=version, route_prefix=route_prefix, _internal=True, - is_driver_deployment=is_driver_deployment, ) @Deprecated( @@ -545,7 +532,6 @@ def set_options( graceful_shutdown_timeout_s: Default[float] = DEFAULT.VALUE, health_check_period_s: Default[float] = DEFAULT.VALUE, health_check_timeout_s: Default[float] = DEFAULT.VALUE, - is_driver_deployment: bool = DEFAULT.VALUE, _internal: bool = False, ) -> None: """Overwrite this deployment's options in-place. @@ -579,7 +565,6 @@ def set_options( health_check_period_s=health_check_period_s, health_check_timeout_s=health_check_timeout_s, _internal=_internal, - is_driver_deployment=is_driver_deployment, ) self._name = validated._name @@ -648,7 +633,6 @@ def deployment_to_schema( "placement_group_strategy": d._replica_config.placement_group_strategy, "placement_group_bundles": d._replica_config.placement_group_bundles, "max_replicas_per_node": d._replica_config.max_replicas_per_node, - "is_driver_deployment": d._is_driver_deployment, } if include_route_prefix: @@ -701,11 +685,6 @@ def schema_to_deployment(s: DeploymentSchema) -> Deployment: else: max_replicas_per_node = s.max_replicas_per_node - if s.is_driver_deployment is DEFAULT.VALUE: - is_driver_deployment = False - else: - is_driver_deployment = s.is_driver_deployment - deployment_config = DeploymentConfig.from_default( num_replicas=s.num_replicas, user_config=s.user_config, @@ -736,5 +715,4 @@ def schema_to_deployment(s: DeploymentSchema) -> Deployment: replica_config=replica_config, route_prefix=s.route_prefix, _internal=True, - is_driver_deployment=is_driver_deployment, ) diff --git a/python/ray/serve/drivers.py b/python/ray/serve/drivers.py index 7511744176852..6776034dfa0d3 100644 --- a/python/ray/serve/drivers.py +++ b/python/ray/serve/drivers.py @@ -1,24 +1,17 @@ -import asyncio import functools import logging -import sys from typing import Any, Callable, Dict, Optional, Union -import grpc from fastapi import Depends, FastAPI -import ray from ray import cloudpickle, serve -from ray._private.tls_utils import add_port_to_grpc_server -from ray._private.utils import get_or_create_event_loop -from ray.serve._private.constants import DEFAULT_GRPC_PORT, SERVE_LOGGER_NAME +from ray.serve._private.constants import SERVE_LOGGER_NAME from ray.serve._private.http_util import ASGIAppReplicaWrapper from ray.serve._private.usage import ServeUsageTag from ray.serve._private.utils import install_serve_encoders_to_fastapi from 
ray.serve.deployment_graph import RayServeDAGHandle from ray.serve.drivers_utils import load_http_adapter from ray.serve.exceptions import RayServeException -from ray.serve.generated import serve_pb2, serve_pb2_grpc from ray.serve.handle import RayServeHandle from ray.util.annotations import PublicAPI @@ -131,104 +124,3 @@ async def get_intermediate_object_refs(self) -> Dict[str, Any]: async def get_pickled_dag_node(self) -> bytes: """Returns the serialized root dag node.""" return self.dags[self.MATCH_ALL_ROUTE_PREFIX].pickled_dag_node - - -@PublicAPI(stability="alpha") -class gRPCIngress: - """ - gRPC Ingress that starts gRPC server based on the port - """ - - def __init__(self, port: int = DEFAULT_GRPC_PORT): - """Create a gRPC Ingress. - - Args: - port: Set the port that the gRPC server will listen to. - """ - - self.server = grpc.aio.server() - self.port = port - - self._attach_grpc_server_with_schema() - - self.setup_complete = asyncio.Event() - self.running_task = get_or_create_event_loop().create_task(self.run()) - ServeUsageTag.GRPC_INGRESS_USED.record("1") - - async def run(self): - """Start gRPC Server""" - logger.info( - "Starting gRPC server with on node:{} " - "listening on port {}".format(ray.util.get_node_ip_address(), self.port) - ) - address = "[::]:{}".format(self.port) - try: - # Depending on whether RAY_USE_TLS is on, `add_port_to_grpc_server` - # can create a secure or insecure channel - self.grpc_port = add_port_to_grpc_server(self.server, address) - except Exception: - # TODO(SongGuyang): Catch the exception here because there is - # port conflict issue which brought from static port. We should - # remove this after we find better port resolution. - logger.exception( - "Failed to add port to grpc server. GRPC service will be disabled" - ) - self.server = None - self.grpc_port = None - - self.setup_complete.set() - await self.server.start() - await self.server.wait_for_termination() - - def _attach_grpc_server_with_schema(self): - """Attach the gRPC server with schema implementation - - Protobuf Schema gRPC should generate bind function - (e.g. add_PredictAPIsServiceServicer_to_server) to bind gRPC server - and schema interface - """ - # protobuf Schema gRPC should generate bind function - # (e.g. add_PredictAPIsServiceServicer_to_server) to bind gRPC server - # and schema interface - bind_function_name = "add_{}_to_server" - for index in range(len(self.__class__.__bases__)): - module_name = self.__class__.__bases__[index].__module__ - servicer_name = self.__class__.__bases__[index].__name__ - try: - getattr( - sys.modules[module_name], bind_function_name.format(servicer_name) - )(self, self.server) - return - except AttributeError: - pass - raise RayServeException( - "Fail to attach the gRPC server with schema implementation" - ) - - -@serve.deployment(is_driver_deployment=True, ray_actor_options={"num_cpus": 0}) -class DefaultgRPCDriver(serve_pb2_grpc.PredictAPIsServiceServicer, gRPCIngress): - """ - gRPC Driver that responsible for redirecting the gRPC requests - and hold dag handle - """ - - def __init__(self, dag: RayServeDAGHandle, port=DEFAULT_GRPC_PORT): - """Create a grpc driver based on the PredictAPIsService schema. - - Args: - dags: a handle to a Ray Serve DAG. 
- port: Port to use to listen to receive the request - """ - self.dag = dag - # TODO(Sihan) we will add a gRPCOption class - # once we have more options to use - super().__init__(port) - - async def Predict(self, request, context): - """ - gRPC Predict function implementation - """ - res = await (await self.dag.remote(dict(request.input))) - - return serve_pb2.PredictResponse(prediction=res) diff --git a/python/ray/serve/generated/serve_pb2.py b/python/ray/serve/generated/serve_pb2.py index 04141696cc84b..426bb4852534c 100644 --- a/python/ray/serve/generated/serve_pb2.py +++ b/python/ray/serve/generated/serve_pb2.py @@ -15,7 +15,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1csrc/ray/protobuf/serve.proto\x12\tray.serve\"\xb6\x03\n\x11\x41utoscalingConfig\x12\x14\n\x0cmin_replicas\x18\x01 \x01(\r\x12\x14\n\x0cmax_replicas\x18\x02 \x01(\r\x12/\n\'target_num_ongoing_requests_per_replica\x18\x03 \x01(\x01\x12\x1a\n\x12metrics_interval_s\x18\x04 \x01(\x01\x12\x1a\n\x12look_back_period_s\x18\x05 \x01(\x01\x12\x18\n\x10smoothing_factor\x18\x06 \x01(\x01\x12\x19\n\x11\x64ownscale_delay_s\x18\x07 \x01(\x01\x12\x17\n\x0fupscale_delay_s\x18\x08 \x01(\x01\x12\x1d\n\x10initial_replicas\x18\t \x01(\rH\x00\x88\x01\x01\x12%\n\x18upscale_smoothing_factor\x18\n \x01(\x01H\x01\x88\x01\x01\x12\'\n\x1a\x64ownscale_smoothing_factor\x18\x0b \x01(\x01H\x02\x88\x01\x01\x42\x13\n\x11_initial_replicasB\x1b\n\x19_upscale_smoothing_factorB\x1d\n\x1b_downscale_smoothing_factor\"\xb0\x03\n\x10\x44\x65ploymentConfig\x12\x14\n\x0cnum_replicas\x18\x01 \x01(\x05\x12\x1e\n\x16max_concurrent_queries\x18\x02 \x01(\x05\x12\x13\n\x0buser_config\x18\x03 \x01(\x0c\x12%\n\x1dgraceful_shutdown_wait_loop_s\x18\x04 \x01(\x01\x12#\n\x1bgraceful_shutdown_timeout_s\x18\x05 \x01(\x01\x12\x1d\n\x15health_check_period_s\x18\x06 \x01(\x01\x12\x1e\n\x16health_check_timeout_s\x18\x07 \x01(\x01\x12\x19\n\x11is_cross_language\x18\x08 \x01(\x08\x12:\n\x13\x64\x65ployment_language\x18\t \x01(\x0e\x32\x1d.ray.serve.DeploymentLanguage\x12\x38\n\x12\x61utoscaling_config\x18\n \x01(\x0b\x32\x1c.ray.serve.AutoscalingConfig\x12\x0f\n\x07version\x18\x0b \x01(\t\x12$\n\x1cuser_configured_option_names\x18\x0c \x03(\t\"\xb6\x01\n\x0fRequestMetadata\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12\x13\n\x0b\x63\x61ll_method\x18\x03 \x01(\t\x12\x38\n\x07\x63ontext\x18\x04 \x03(\x0b\x32\'.ray.serve.RequestMetadata.ContextEntry\x1a.\n\x0c\x43ontextEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x1e\n\x0eRequestWrapper\x12\x0c\n\x04\x62ody\x18\x01 \x01(\x0c\"=\n\rUpdatedObject\x12\x17\n\x0fobject_snapshot\x18\x01 \x01(\x0c\x12\x13\n\x0bsnapshot_id\x18\x02 \x01(\x05\"\x9c\x01\n\x0fLongPollRequest\x12O\n\x14keys_to_snapshot_ids\x18\x01 \x03(\x0b\x32\x31.ray.serve.LongPollRequest.KeysToSnapshotIdsEntry\x1a\x38\n\x16KeysToSnapshotIdsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x05:\x02\x38\x01\"\xa9\x01\n\x0eLongPollResult\x12\x46\n\x0fupdated_objects\x18\x01 \x03(\x0b\x32-.ray.serve.LongPollResult.UpdatedObjectsEntry\x1aO\n\x13UpdatedObjectsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\'\n\x05value\x18\x02 \x01(\x0b\x32\x18.ray.serve.UpdatedObject:\x02\x38\x01\"\x98\x01\n\x0c\x45ndpointInfo\x12\x15\n\rendpoint_name\x18\x01 \x01(\t\x12\r\n\x05route\x18\x02 \x01(\t\x12\x33\n\x06\x63onfig\x18\x03 \x03(\x0b\x32#.ray.serve.EndpointInfo.ConfigEntry\x1a-\n\x0b\x43onfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 
\x01(\t:\x02\x38\x01\"\x92\x01\n\x0b\x45ndpointSet\x12\x38\n\tendpoints\x18\x01 \x03(\x0b\x32%.ray.serve.EndpointSet.EndpointsEntry\x1aI\n\x0e\x45ndpointsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12&\n\x05value\x18\x02 \x01(\x0b\x32\x17.ray.serve.EndpointInfo:\x02\x38\x01\"\x1e\n\rActorNameList\x12\r\n\x05names\x18\x01 \x03(\t\"\xde\x01\n\x11\x44\x65ploymentVersion\x12\x14\n\x0c\x63ode_version\x18\x01 \x01(\t\x12\x36\n\x11\x64\x65ployment_config\x18\x02 \x01(\x0b\x32\x1b.ray.serve.DeploymentConfig\x12\x19\n\x11ray_actor_options\x18\x03 \x01(\t\x12\x1f\n\x17placement_group_bundles\x18\x04 \x01(\t\x12 \n\x18placement_group_strategy\x18\x05 \x01(\t\x12\x1d\n\x15max_replicas_per_node\x18\x06 \x01(\x05\"\xe9\x01\n\rReplicaConfig\x12\x1b\n\x13\x64\x65ployment_def_name\x18\x01 \x01(\t\x12\x16\n\x0e\x64\x65ployment_def\x18\x02 \x01(\x0c\x12\x11\n\tinit_args\x18\x03 \x01(\x0c\x12\x13\n\x0binit_kwargs\x18\x04 \x01(\x0c\x12\x19\n\x11ray_actor_options\x18\x05 \x01(\t\x12\x1f\n\x17placement_group_bundles\x18\x06 \x01(\t\x12 \n\x18placement_group_strategy\x18\x07 \x01(\t\x12\x1d\n\x15max_replicas_per_node\x18\x08 \x01(\x05\"\xd9\x01\n\x0e\x44\x65ploymentInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x36\n\x11\x64\x65ployment_config\x18\x02 \x01(\x0b\x32\x1b.ray.serve.DeploymentConfig\x12\x30\n\x0ereplica_config\x18\x03 \x01(\x0b\x32\x18.ray.serve.ReplicaConfig\x12\x15\n\rstart_time_ms\x18\x04 \x01(\x03\x12\x12\n\nactor_name\x18\x05 \x01(\t\x12\x0f\n\x07version\x18\x06 \x01(\t\x12\x13\n\x0b\x65nd_time_ms\x18\x07 \x01(\x03\"T\n\x0f\x44\x65ploymentRoute\x12\x32\n\x0f\x64\x65ployment_info\x18\x01 \x01(\x0b\x32\x19.ray.serve.DeploymentInfo\x12\r\n\x05route\x18\x02 \x01(\t\"L\n\x13\x44\x65ploymentRouteList\x12\x35\n\x11\x64\x65ployment_routes\x18\x01 \x03(\x0b\x32\x1a.ray.serve.DeploymentRoute\"b\n\x14\x44\x65ploymentStatusInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x06status\x18\x02 \x01(\x0e\x32\x1b.ray.serve.DeploymentStatus\x12\x0f\n\x07message\x18\x03 \x01(\t\"\\\n\x18\x44\x65ploymentStatusInfoList\x12@\n\x17\x64\x65ployment_status_infos\x18\x01 \x03(\x0b\x32\x1f.ray.serve.DeploymentStatusInfo\"t\n\x15\x41pplicationStatusInfo\x12,\n\x06status\x18\x01 \x01(\x0e\x32\x1c.ray.serve.ApplicationStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x1c\n\x14\x64\x65ployment_timestamp\x18\x03 \x01(\x01\"\x96\x01\n\x0eStatusOverview\x12\x34\n\napp_status\x18\x01 \x01(\x0b\x32 .ray.serve.ApplicationStatusInfo\x12@\n\x13\x64\x65ployment_statuses\x18\x02 \x01(\x0b\x32#.ray.serve.DeploymentStatusInfoList\x12\x0c\n\x04name\x18\x03 \x01(\t\"s\n\x0ePredictRequest\x12\x33\n\x05input\x18\x02 \x03(\x0b\x32$.ray.serve.PredictRequest.InputEntry\x1a,\n\nInputEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"%\n\x0fPredictResponse\x12\x12\n\nprediction\x18\x01 \x01(\x0c\"\x19\n\x17ListApplicationsRequest\"5\n\x18ListApplicationsResponse\x12\x19\n\x11\x61pplication_names\x18\x01 \x03(\t\"\x10\n\x0eHealthzRequest\"\"\n\x0fHealthzResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\"<\n\x12UserDefinedMessage\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0b\n\x03\x66oo\x18\x02 \x01(\t\x12\x0b\n\x03num\x18\x03 \x01(\x03\"7\n\x13UserDefinedResponse\x12\x10\n\x08greeting\x18\x01 \x01(\t\x12\x0e\n\x06num_x2\x18\x02 \x01(\x03\"\x15\n\x13UserDefinedMessage2\"(\n\x14UserDefinedResponse2\x12\x10\n\x08greeting\x18\x01 \x01(\t\"=\n\x0c\x46ruitAmounts\x12\x0e\n\x06orange\x18\x01 \x01(\x03\x12\r\n\x05\x61pple\x18\x02 \x01(\x03\x12\x0e\n\x06\x62\x61nana\x18\x03 \x01(\x03\"\x1b\n\nFruitCosts\x12\r\n\x05\x63osts\x18\x01 
\x01(\x02\"\x17\n\x07RawData\x12\x0c\n\x04nums\x18\x01 \x03(\x02\"\x1d\n\x0bModelOutput\x12\x0e\n\x06output\x18\x01 \x01(\x02**\n\x12\x44\x65ploymentLanguage\x12\n\n\x06PYTHON\x10\x00\x12\x08\n\x04JAVA\x10\x01*r\n\x10\x44\x65ploymentStatus\x12\x1e\n\x1a\x44\x45PLOYMENT_STATUS_UPDATING\x10\x00\x12\x1d\n\x19\x44\x45PLOYMENT_STATUS_HEALTHY\x10\x01\x12\x1f\n\x1b\x44\x45PLOYMENT_STATUS_UNHEALTHY\x10\x02*\xe2\x01\n\x11\x41pplicationStatus\x12 \n\x1c\x41PPLICATION_STATUS_DEPLOYING\x10\x00\x12\x1e\n\x1a\x41PPLICATION_STATUS_RUNNING\x10\x01\x12$\n APPLICATION_STATUS_DEPLOY_FAILED\x10\x02\x12\x1f\n\x1b\x41PPLICATION_STATUS_DELETING\x10\x03\x12\"\n\x1e\x41PPLICATION_STATUS_NOT_STARTED\x10\x05\x12 \n\x1c\x41PPLICATION_STATUS_UNHEALTHY\x10\x06\x32V\n\x12PredictAPIsService\x12@\n\x07Predict\x12\x19.ray.serve.PredictRequest\x1a\x1a.ray.serve.PredictResponse2\xb3\x01\n\x12RayServeAPIService\x12[\n\x10ListApplications\x12\".ray.serve.ListApplicationsRequest\x1a#.ray.serve.ListApplicationsResponse\x12@\n\x07Healthz\x12\x19.ray.serve.HealthzRequest\x1a\x1a.ray.serve.HealthzResponse2\xc3\x02\n\x12UserDefinedService\x12I\n\x08__call__\x12\x1d.ray.serve.UserDefinedMessage\x1a\x1e.ray.serve.UserDefinedResponse\x12H\n\x07Method1\x12\x1d.ray.serve.UserDefinedMessage\x1a\x1e.ray.serve.UserDefinedResponse\x12J\n\x07Method2\x12\x1e.ray.serve.UserDefinedMessage2\x1a\x1f.ray.serve.UserDefinedResponse2\x12L\n\tStreaming\x12\x1d.ray.serve.UserDefinedMessage\x1a\x1e.ray.serve.UserDefinedResponse0\x01\x32L\n\x0c\x46ruitService\x12<\n\nFruitStand\x12\x17.ray.serve.FruitAmounts\x1a\x15.ray.serve.FruitCosts2S\n\x18RayServeBenchmarkService\x12\x37\n\tgrpc_call\x12\x12.ray.serve.RawData\x1a\x16.ray.serve.ModelOutputB*\n\x16io.ray.serve.generatedB\x0bServeProtosP\x01\xf8\x01\x01\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1csrc/ray/protobuf/serve.proto\x12\tray.serve\"\xb6\x03\n\x11\x41utoscalingConfig\x12\x14\n\x0cmin_replicas\x18\x01 \x01(\r\x12\x14\n\x0cmax_replicas\x18\x02 \x01(\r\x12/\n\'target_num_ongoing_requests_per_replica\x18\x03 \x01(\x01\x12\x1a\n\x12metrics_interval_s\x18\x04 \x01(\x01\x12\x1a\n\x12look_back_period_s\x18\x05 \x01(\x01\x12\x18\n\x10smoothing_factor\x18\x06 \x01(\x01\x12\x19\n\x11\x64ownscale_delay_s\x18\x07 \x01(\x01\x12\x17\n\x0fupscale_delay_s\x18\x08 \x01(\x01\x12\x1d\n\x10initial_replicas\x18\t \x01(\rH\x00\x88\x01\x01\x12%\n\x18upscale_smoothing_factor\x18\n \x01(\x01H\x01\x88\x01\x01\x12\'\n\x1a\x64ownscale_smoothing_factor\x18\x0b \x01(\x01H\x02\x88\x01\x01\x42\x13\n\x11_initial_replicasB\x1b\n\x19_upscale_smoothing_factorB\x1d\n\x1b_downscale_smoothing_factor\"\xb0\x03\n\x10\x44\x65ploymentConfig\x12\x14\n\x0cnum_replicas\x18\x01 \x01(\x05\x12\x1e\n\x16max_concurrent_queries\x18\x02 \x01(\x05\x12\x13\n\x0buser_config\x18\x03 \x01(\x0c\x12%\n\x1dgraceful_shutdown_wait_loop_s\x18\x04 \x01(\x01\x12#\n\x1bgraceful_shutdown_timeout_s\x18\x05 \x01(\x01\x12\x1d\n\x15health_check_period_s\x18\x06 \x01(\x01\x12\x1e\n\x16health_check_timeout_s\x18\x07 \x01(\x01\x12\x19\n\x11is_cross_language\x18\x08 \x01(\x08\x12:\n\x13\x64\x65ployment_language\x18\t \x01(\x0e\x32\x1d.ray.serve.DeploymentLanguage\x12\x38\n\x12\x61utoscaling_config\x18\n \x01(\x0b\x32\x1c.ray.serve.AutoscalingConfig\x12\x0f\n\x07version\x18\x0b \x01(\t\x12$\n\x1cuser_configured_option_names\x18\x0c \x03(\t\"\xb6\x01\n\x0fRequestMetadata\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12\x13\n\x0b\x63\x61ll_method\x18\x03 \x01(\t\x12\x38\n\x07\x63ontext\x18\x04 
\x03(\x0b\x32\'.ray.serve.RequestMetadata.ContextEntry\x1a.\n\x0c\x43ontextEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x1e\n\x0eRequestWrapper\x12\x0c\n\x04\x62ody\x18\x01 \x01(\x0c\"=\n\rUpdatedObject\x12\x17\n\x0fobject_snapshot\x18\x01 \x01(\x0c\x12\x13\n\x0bsnapshot_id\x18\x02 \x01(\x05\"\x9c\x01\n\x0fLongPollRequest\x12O\n\x14keys_to_snapshot_ids\x18\x01 \x03(\x0b\x32\x31.ray.serve.LongPollRequest.KeysToSnapshotIdsEntry\x1a\x38\n\x16KeysToSnapshotIdsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x05:\x02\x38\x01\"\xa9\x01\n\x0eLongPollResult\x12\x46\n\x0fupdated_objects\x18\x01 \x03(\x0b\x32-.ray.serve.LongPollResult.UpdatedObjectsEntry\x1aO\n\x13UpdatedObjectsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\'\n\x05value\x18\x02 \x01(\x0b\x32\x18.ray.serve.UpdatedObject:\x02\x38\x01\"\x98\x01\n\x0c\x45ndpointInfo\x12\x15\n\rendpoint_name\x18\x01 \x01(\t\x12\r\n\x05route\x18\x02 \x01(\t\x12\x33\n\x06\x63onfig\x18\x03 \x03(\x0b\x32#.ray.serve.EndpointInfo.ConfigEntry\x1a-\n\x0b\x43onfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x92\x01\n\x0b\x45ndpointSet\x12\x38\n\tendpoints\x18\x01 \x03(\x0b\x32%.ray.serve.EndpointSet.EndpointsEntry\x1aI\n\x0e\x45ndpointsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12&\n\x05value\x18\x02 \x01(\x0b\x32\x17.ray.serve.EndpointInfo:\x02\x38\x01\"\x1e\n\rActorNameList\x12\r\n\x05names\x18\x01 \x03(\t\"\xde\x01\n\x11\x44\x65ploymentVersion\x12\x14\n\x0c\x63ode_version\x18\x01 \x01(\t\x12\x36\n\x11\x64\x65ployment_config\x18\x02 \x01(\x0b\x32\x1b.ray.serve.DeploymentConfig\x12\x19\n\x11ray_actor_options\x18\x03 \x01(\t\x12\x1f\n\x17placement_group_bundles\x18\x04 \x01(\t\x12 \n\x18placement_group_strategy\x18\x05 \x01(\t\x12\x1d\n\x15max_replicas_per_node\x18\x06 \x01(\x05\"\xe9\x01\n\rReplicaConfig\x12\x1b\n\x13\x64\x65ployment_def_name\x18\x01 \x01(\t\x12\x16\n\x0e\x64\x65ployment_def\x18\x02 \x01(\x0c\x12\x11\n\tinit_args\x18\x03 \x01(\x0c\x12\x13\n\x0binit_kwargs\x18\x04 \x01(\x0c\x12\x19\n\x11ray_actor_options\x18\x05 \x01(\t\x12\x1f\n\x17placement_group_bundles\x18\x06 \x01(\t\x12 \n\x18placement_group_strategy\x18\x07 \x01(\t\x12\x1d\n\x15max_replicas_per_node\x18\x08 \x01(\x05\"\xd9\x01\n\x0e\x44\x65ploymentInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x36\n\x11\x64\x65ployment_config\x18\x02 \x01(\x0b\x32\x1b.ray.serve.DeploymentConfig\x12\x30\n\x0ereplica_config\x18\x03 \x01(\x0b\x32\x18.ray.serve.ReplicaConfig\x12\x15\n\rstart_time_ms\x18\x04 \x01(\x03\x12\x12\n\nactor_name\x18\x05 \x01(\t\x12\x0f\n\x07version\x18\x06 \x01(\t\x12\x13\n\x0b\x65nd_time_ms\x18\x07 \x01(\x03\"T\n\x0f\x44\x65ploymentRoute\x12\x32\n\x0f\x64\x65ployment_info\x18\x01 \x01(\x0b\x32\x19.ray.serve.DeploymentInfo\x12\r\n\x05route\x18\x02 \x01(\t\"L\n\x13\x44\x65ploymentRouteList\x12\x35\n\x11\x64\x65ployment_routes\x18\x01 \x03(\x0b\x32\x1a.ray.serve.DeploymentRoute\"b\n\x14\x44\x65ploymentStatusInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x06status\x18\x02 \x01(\x0e\x32\x1b.ray.serve.DeploymentStatus\x12\x0f\n\x07message\x18\x03 \x01(\t\"\\\n\x18\x44\x65ploymentStatusInfoList\x12@\n\x17\x64\x65ployment_status_infos\x18\x01 \x03(\x0b\x32\x1f.ray.serve.DeploymentStatusInfo\"t\n\x15\x41pplicationStatusInfo\x12,\n\x06status\x18\x01 \x01(\x0e\x32\x1c.ray.serve.ApplicationStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x1c\n\x14\x64\x65ployment_timestamp\x18\x03 \x01(\x01\"\x96\x01\n\x0eStatusOverview\x12\x34\n\napp_status\x18\x01 \x01(\x0b\x32 
.ray.serve.ApplicationStatusInfo\x12@\n\x13\x64\x65ployment_statuses\x18\x02 \x01(\x0b\x32#.ray.serve.DeploymentStatusInfoList\x12\x0c\n\x04name\x18\x03 \x01(\t\"\x19\n\x17ListApplicationsRequest\"5\n\x18ListApplicationsResponse\x12\x19\n\x11\x61pplication_names\x18\x01 \x03(\t\"\x10\n\x0eHealthzRequest\"\"\n\x0fHealthzResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\"<\n\x12UserDefinedMessage\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0b\n\x03\x66oo\x18\x02 \x01(\t\x12\x0b\n\x03num\x18\x03 \x01(\x03\"7\n\x13UserDefinedResponse\x12\x10\n\x08greeting\x18\x01 \x01(\t\x12\x0e\n\x06num_x2\x18\x02 \x01(\x03\"\x15\n\x13UserDefinedMessage2\"(\n\x14UserDefinedResponse2\x12\x10\n\x08greeting\x18\x01 \x01(\t\"=\n\x0c\x46ruitAmounts\x12\x0e\n\x06orange\x18\x01 \x01(\x03\x12\r\n\x05\x61pple\x18\x02 \x01(\x03\x12\x0e\n\x06\x62\x61nana\x18\x03 \x01(\x03\"\x1b\n\nFruitCosts\x12\r\n\x05\x63osts\x18\x01 \x01(\x02\"\x17\n\x07RawData\x12\x0c\n\x04nums\x18\x01 \x03(\x02\"\x1d\n\x0bModelOutput\x12\x0e\n\x06output\x18\x01 \x01(\x02**\n\x12\x44\x65ploymentLanguage\x12\n\n\x06PYTHON\x10\x00\x12\x08\n\x04JAVA\x10\x01*r\n\x10\x44\x65ploymentStatus\x12\x1e\n\x1a\x44\x45PLOYMENT_STATUS_UPDATING\x10\x00\x12\x1d\n\x19\x44\x45PLOYMENT_STATUS_HEALTHY\x10\x01\x12\x1f\n\x1b\x44\x45PLOYMENT_STATUS_UNHEALTHY\x10\x02*\xe2\x01\n\x11\x41pplicationStatus\x12 \n\x1c\x41PPLICATION_STATUS_DEPLOYING\x10\x00\x12\x1e\n\x1a\x41PPLICATION_STATUS_RUNNING\x10\x01\x12$\n APPLICATION_STATUS_DEPLOY_FAILED\x10\x02\x12\x1f\n\x1b\x41PPLICATION_STATUS_DELETING\x10\x03\x12\"\n\x1e\x41PPLICATION_STATUS_NOT_STARTED\x10\x05\x12 \n\x1c\x41PPLICATION_STATUS_UNHEALTHY\x10\x06\x32\xb3\x01\n\x12RayServeAPIService\x12[\n\x10ListApplications\x12\".ray.serve.ListApplicationsRequest\x1a#.ray.serve.ListApplicationsResponse\x12@\n\x07Healthz\x12\x19.ray.serve.HealthzRequest\x1a\x1a.ray.serve.HealthzResponse2\xc3\x02\n\x12UserDefinedService\x12I\n\x08__call__\x12\x1d.ray.serve.UserDefinedMessage\x1a\x1e.ray.serve.UserDefinedResponse\x12H\n\x07Method1\x12\x1d.ray.serve.UserDefinedMessage\x1a\x1e.ray.serve.UserDefinedResponse\x12J\n\x07Method2\x12\x1e.ray.serve.UserDefinedMessage2\x1a\x1f.ray.serve.UserDefinedResponse2\x12L\n\tStreaming\x12\x1d.ray.serve.UserDefinedMessage\x1a\x1e.ray.serve.UserDefinedResponse0\x01\x32L\n\x0c\x46ruitService\x12<\n\nFruitStand\x12\x17.ray.serve.FruitAmounts\x1a\x15.ray.serve.FruitCosts2S\n\x18RayServeBenchmarkService\x12\x37\n\tgrpc_call\x12\x12.ray.serve.RawData\x1a\x16.ray.serve.ModelOutputB*\n\x16io.ray.serve.generatedB\x0bServeProtosP\x01\xf8\x01\x01\x62\x06proto3') _DEPLOYMENTLANGUAGE = DESCRIPTOR.enum_types_by_name['DeploymentLanguage'] DeploymentLanguage = enum_type_wrapper.EnumTypeWrapper(_DEPLOYMENTLANGUAGE) @@ -60,9 +60,6 @@ _DEPLOYMENTSTATUSINFOLIST = DESCRIPTOR.message_types_by_name['DeploymentStatusInfoList'] _APPLICATIONSTATUSINFO = DESCRIPTOR.message_types_by_name['ApplicationStatusInfo'] _STATUSOVERVIEW = DESCRIPTOR.message_types_by_name['StatusOverview'] -_PREDICTREQUEST = DESCRIPTOR.message_types_by_name['PredictRequest'] -_PREDICTREQUEST_INPUTENTRY = _PREDICTREQUEST.nested_types_by_name['InputEntry'] -_PREDICTRESPONSE = DESCRIPTOR.message_types_by_name['PredictResponse'] _LISTAPPLICATIONSREQUEST = DESCRIPTOR.message_types_by_name['ListApplicationsRequest'] _LISTAPPLICATIONSRESPONSE = DESCRIPTOR.message_types_by_name['ListApplicationsResponse'] _HEALTHZREQUEST = DESCRIPTOR.message_types_by_name['HealthzRequest'] @@ -248,28 +245,6 @@ }) _sym_db.RegisterMessage(StatusOverview) -PredictRequest = 
_reflection.GeneratedProtocolMessageType('PredictRequest', (_message.Message,), { - - 'InputEntry' : _reflection.GeneratedProtocolMessageType('InputEntry', (_message.Message,), { - 'DESCRIPTOR' : _PREDICTREQUEST_INPUTENTRY, - '__module__' : 'ray.serve.generated.serve_pb2' - # @@protoc_insertion_point(class_scope:ray.serve.PredictRequest.InputEntry) - }) - , - 'DESCRIPTOR' : _PREDICTREQUEST, - '__module__' : 'ray.serve.generated.serve_pb2' - # @@protoc_insertion_point(class_scope:ray.serve.PredictRequest) - }) -_sym_db.RegisterMessage(PredictRequest) -_sym_db.RegisterMessage(PredictRequest.InputEntry) - -PredictResponse = _reflection.GeneratedProtocolMessageType('PredictResponse', (_message.Message,), { - 'DESCRIPTOR' : _PREDICTRESPONSE, - '__module__' : 'ray.serve.generated.serve_pb2' - # @@protoc_insertion_point(class_scope:ray.serve.PredictResponse) - }) -_sym_db.RegisterMessage(PredictResponse) - ListApplicationsRequest = _reflection.GeneratedProtocolMessageType('ListApplicationsRequest', (_message.Message,), { 'DESCRIPTOR' : _LISTAPPLICATIONSREQUEST, '__module__' : 'ray.serve.generated.serve_pb2' @@ -354,7 +329,6 @@ }) _sym_db.RegisterMessage(ModelOutput) -_PREDICTAPISSERVICE = DESCRIPTOR.services_by_name['PredictAPIsService'] _RAYSERVEAPISERVICE = DESCRIPTOR.services_by_name['RayServeAPIService'] _USERDEFINEDSERVICE = DESCRIPTOR.services_by_name['UserDefinedService'] _FRUITSERVICE = DESCRIPTOR.services_by_name['FruitService'] @@ -373,14 +347,12 @@ _ENDPOINTINFO_CONFIGENTRY._serialized_options = b'8\001' _ENDPOINTSET_ENDPOINTSENTRY._options = None _ENDPOINTSET_ENDPOINTSENTRY._serialized_options = b'8\001' - _PREDICTREQUEST_INPUTENTRY._options = None - _PREDICTREQUEST_INPUTENTRY._serialized_options = b'8\001' - _DEPLOYMENTLANGUAGE._serialized_start=3800 - _DEPLOYMENTLANGUAGE._serialized_end=3842 - _DEPLOYMENTSTATUS._serialized_start=3844 - _DEPLOYMENTSTATUS._serialized_end=3958 - _APPLICATIONSTATUS._serialized_start=3961 - _APPLICATIONSTATUS._serialized_end=4187 + _DEPLOYMENTLANGUAGE._serialized_start=3644 + _DEPLOYMENTLANGUAGE._serialized_end=3686 + _DEPLOYMENTSTATUS._serialized_start=3688 + _DEPLOYMENTSTATUS._serialized_end=3802 + _APPLICATIONSTATUS._serialized_start=3805 + _APPLICATIONSTATUS._serialized_end=4031 _AUTOSCALINGCONFIG._serialized_start=44 _AUTOSCALINGCONFIG._serialized_end=482 _DEPLOYMENTCONFIG._serialized_start=485 @@ -429,44 +401,36 @@ _APPLICATIONSTATUSINFO._serialized_end=3021 _STATUSOVERVIEW._serialized_start=3024 _STATUSOVERVIEW._serialized_end=3174 - _PREDICTREQUEST._serialized_start=3176 - _PREDICTREQUEST._serialized_end=3291 - _PREDICTREQUEST_INPUTENTRY._serialized_start=3247 - _PREDICTREQUEST_INPUTENTRY._serialized_end=3291 - _PREDICTRESPONSE._serialized_start=3293 - _PREDICTRESPONSE._serialized_end=3330 - _LISTAPPLICATIONSREQUEST._serialized_start=3332 - _LISTAPPLICATIONSREQUEST._serialized_end=3357 - _LISTAPPLICATIONSRESPONSE._serialized_start=3359 - _LISTAPPLICATIONSRESPONSE._serialized_end=3412 - _HEALTHZREQUEST._serialized_start=3414 - _HEALTHZREQUEST._serialized_end=3430 - _HEALTHZRESPONSE._serialized_start=3432 - _HEALTHZRESPONSE._serialized_end=3466 - _USERDEFINEDMESSAGE._serialized_start=3468 - _USERDEFINEDMESSAGE._serialized_end=3528 - _USERDEFINEDRESPONSE._serialized_start=3530 - _USERDEFINEDRESPONSE._serialized_end=3585 - _USERDEFINEDMESSAGE2._serialized_start=3587 - _USERDEFINEDMESSAGE2._serialized_end=3608 - _USERDEFINEDRESPONSE2._serialized_start=3610 - _USERDEFINEDRESPONSE2._serialized_end=3650 - _FRUITAMOUNTS._serialized_start=3652 - 
_FRUITAMOUNTS._serialized_end=3713 - _FRUITCOSTS._serialized_start=3715 - _FRUITCOSTS._serialized_end=3742 - _RAWDATA._serialized_start=3744 - _RAWDATA._serialized_end=3767 - _MODELOUTPUT._serialized_start=3769 - _MODELOUTPUT._serialized_end=3798 - _PREDICTAPISSERVICE._serialized_start=4189 - _PREDICTAPISSERVICE._serialized_end=4275 - _RAYSERVEAPISERVICE._serialized_start=4278 - _RAYSERVEAPISERVICE._serialized_end=4457 - _USERDEFINEDSERVICE._serialized_start=4460 - _USERDEFINEDSERVICE._serialized_end=4783 - _FRUITSERVICE._serialized_start=4785 - _FRUITSERVICE._serialized_end=4861 - _RAYSERVEBENCHMARKSERVICE._serialized_start=4863 - _RAYSERVEBENCHMARKSERVICE._serialized_end=4946 + _LISTAPPLICATIONSREQUEST._serialized_start=3176 + _LISTAPPLICATIONSREQUEST._serialized_end=3201 + _LISTAPPLICATIONSRESPONSE._serialized_start=3203 + _LISTAPPLICATIONSRESPONSE._serialized_end=3256 + _HEALTHZREQUEST._serialized_start=3258 + _HEALTHZREQUEST._serialized_end=3274 + _HEALTHZRESPONSE._serialized_start=3276 + _HEALTHZRESPONSE._serialized_end=3310 + _USERDEFINEDMESSAGE._serialized_start=3312 + _USERDEFINEDMESSAGE._serialized_end=3372 + _USERDEFINEDRESPONSE._serialized_start=3374 + _USERDEFINEDRESPONSE._serialized_end=3429 + _USERDEFINEDMESSAGE2._serialized_start=3431 + _USERDEFINEDMESSAGE2._serialized_end=3452 + _USERDEFINEDRESPONSE2._serialized_start=3454 + _USERDEFINEDRESPONSE2._serialized_end=3494 + _FRUITAMOUNTS._serialized_start=3496 + _FRUITAMOUNTS._serialized_end=3557 + _FRUITCOSTS._serialized_start=3559 + _FRUITCOSTS._serialized_end=3586 + _RAWDATA._serialized_start=3588 + _RAWDATA._serialized_end=3611 + _MODELOUTPUT._serialized_start=3613 + _MODELOUTPUT._serialized_end=3642 + _RAYSERVEAPISERVICE._serialized_start=4034 + _RAYSERVEAPISERVICE._serialized_end=4213 + _USERDEFINEDSERVICE._serialized_start=4216 + _USERDEFINEDSERVICE._serialized_end=4539 + _FRUITSERVICE._serialized_start=4541 + _FRUITSERVICE._serialized_end=4617 + _RAYSERVEBENCHMARKSERVICE._serialized_start=4619 + _RAYSERVEBENCHMARKSERVICE._serialized_end=4702 # @@protoc_insertion_point(module_scope) diff --git a/python/ray/serve/generated/serve_pb2_grpc.py b/python/ray/serve/generated/serve_pb2_grpc.py index 99746815165b0..284a6e509ad80 100644 --- a/python/ray/serve/generated/serve_pb2_grpc.py +++ b/python/ray/serve/generated/serve_pb2_grpc.py @@ -5,67 +5,6 @@ from . import serve_pb2 as src_dot_ray_dot_protobuf_dot_serve__pb2 -class PredictAPIsServiceStub(object): - """Missing associated documentation comment in .proto file.""" - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. 
- """ - self.Predict = channel.unary_unary( - '/ray.serve.PredictAPIsService/Predict', - request_serializer=src_dot_ray_dot_protobuf_dot_serve__pb2.PredictRequest.SerializeToString, - response_deserializer=src_dot_ray_dot_protobuf_dot_serve__pb2.PredictResponse.FromString, - ) - - -class PredictAPIsServiceServicer(object): - """Missing associated documentation comment in .proto file.""" - - def Predict(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_PredictAPIsServiceServicer_to_server(servicer, server): - rpc_method_handlers = { - 'Predict': grpc.unary_unary_rpc_method_handler( - servicer.Predict, - request_deserializer=src_dot_ray_dot_protobuf_dot_serve__pb2.PredictRequest.FromString, - response_serializer=src_dot_ray_dot_protobuf_dot_serve__pb2.PredictResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'ray.serve.PredictAPIsService', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) - - - # This class is part of an EXPERIMENTAL API. -class PredictAPIsService(object): - """Missing associated documentation comment in .proto file.""" - - @staticmethod - def Predict(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/ray.serve.PredictAPIsService/Predict', - src_dot_ray_dot_protobuf_dot_serve__pb2.PredictRequest.SerializeToString, - src_dot_ray_dot_protobuf_dot_serve__pb2.PredictResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - class RayServeAPIServiceStub(object): """Missing associated documentation comment in .proto file.""" diff --git a/python/ray/serve/schema.py b/python/ray/serve/schema.py index 56a8adf107f63..e4af9b3b40f2a 100644 --- a/python/ray/serve/schema.py +++ b/python/ray/serve/schema.py @@ -260,12 +260,6 @@ class DeploymentSchema(BaseModel, allow_population_by_field_name=True): ), ) - is_driver_deployment: bool = Field( - default=DEFAULT.VALUE, - description="Indicate Whether the deployment is driver deployment " - "Driver deployments are spawned one per node.", - ) - @root_validator def num_replicas_and_autoscaling_config_mutually_exclusive(cls, values): if values.get("num_replicas", None) not in [DEFAULT.VALUE, None] and values.get( @@ -312,7 +306,6 @@ def _deployment_info_to_schema(name: str, info: DeploymentInfo) -> DeploymentSch health_check_period_s=info.deployment_config.health_check_period_s, health_check_timeout_s=info.deployment_config.health_check_timeout_s, ray_actor_options=info.replica_config.ray_actor_options, - is_driver_deployment=info.is_driver_deployment, ) if info.deployment_config.autoscaling_config is not None: diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index c3d1da571807e..bf0c05c200a25 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -673,19 +673,6 @@ def f(): assert requests.get(f"http://localhost:8000{ingress_route}").text == "hello" -def test_invalid_driver_deployment_class(): - """Test invalid driver deployment class""" - - @serve.deployment(is_driver_deployment=True) - def f(): - pass - - with pytest.raises(ValueError): - 
f.options(num_replicas=2) - with pytest.raises(ValueError): - f.options(autoscaling_config={"min_replicas": "1"}) - - class TestAppBuilder: @serve.deployment class A: diff --git a/python/ray/serve/tests/test_application_state.py b/python/ray/serve/tests/test_application_state.py index 95ee4fc23e7ba..802651348bbd8 100644 --- a/python/ray/serve/tests/test_application_state.py +++ b/python/ray/serve/tests/test_application_state.py @@ -154,7 +154,6 @@ def deployment_params(name: str, route_prefix: str = None, docs_path: str = None "route_prefix": route_prefix, "docs_path": docs_path, "ingress": False, - "is_driver_deployment": False, } @@ -846,24 +845,6 @@ def test_override_route_prefix_3(self, info): assert updated_info.route_prefix == "/bob" assert updated_info.version == "123" - def test_override_is_driver_deployment(self, info): - config = ServeApplicationSchema( - name="default", - import_path="test.import.path", - deployments=[ - DeploymentSchema( - name="A", - is_driver_deployment=True, - ) - ], - ) - - updated_infos = override_deployment_info("default", {"A": info}, config) - updated_info = updated_infos["A"] - assert updated_info.route_prefix == "/" - assert updated_info.version == "123" - assert updated_info.is_driver_deployment - def test_override_ray_actor_options_1(self, info): """Test runtime env specified in config at deployment level.""" config = ServeApplicationSchema( diff --git a/python/ray/serve/tests/test_deployment_scheduler.py b/python/ray/serve/tests/test_deployment_scheduler.py index 304f7b70b059d..130fd78d286b3 100644 --- a/python/ray/serve/tests/test_deployment_scheduler.py +++ b/python/ray/serve/tests/test_deployment_scheduler.py @@ -9,7 +9,6 @@ from ray.serve._private.deployment_scheduler import ( DefaultDeploymentScheduler, DeploymentDownscaleRequest, - DriverDeploymentSchedulingPolicy, ReplicaSchedulingRequest, SpreadDeploymentSchedulingPolicy, ) @@ -325,112 +324,5 @@ def test_spread_deployment_scheduling_policy_downscale_head_node(ray_start_clust scheduler.on_deployment_deleted(dep_id) -def test_driver_deployment_scheduling_policy_upscale(ray_start_cluster): - """Test to make sure there is only one replica on each node - for the driver deployment. 
- """ - cluster = ray_start_cluster - cluster.add_node(num_cpus=3) - cluster.add_node(num_cpus=3) - cluster.wait_for_nodes() - ray.init(address=cluster.address) - - cluster_node_info_cache = create_cluster_node_info_cache( - GcsClient(address=ray.get_runtime_context().gcs_address) - ) - cluster_node_info_cache.update() - - scheduler = DefaultDeploymentScheduler(cluster_node_info_cache) - dep_id = DeploymentID("deployment1", "my_app") - scheduler.on_deployment_created(dep_id, DriverDeploymentSchedulingPolicy()) - - replica_actor_handles = [] - - def on_scheduled(actor_handle, placement_group): - replica_actor_handles.append(actor_handle) - - deployment_to_replicas_to_stop = scheduler.schedule( - upscales={ - dep_id: [ - ReplicaSchedulingRequest( - deployment_id=dep_id, - replica_name="replica1", - actor_def=Replica, - actor_resources={"CPU": 1}, - actor_options={}, - actor_init_args=(), - on_scheduled=on_scheduled, - ), - ReplicaSchedulingRequest( - deployment_id=dep_id, - replica_name="replica2", - actor_def=Replica, - actor_resources={"CPU": 1}, - actor_options={}, - actor_init_args=(), - on_scheduled=on_scheduled, - ), - ReplicaSchedulingRequest( - deployment_id=dep_id, - replica_name="replica3", - actor_def=Replica, - actor_resources={"CPU": 1}, - actor_options={}, - actor_init_args=(), - on_scheduled=on_scheduled, - ), - ] - }, - downscales={}, - ) - assert not deployment_to_replicas_to_stop - # 2 out of 3 replicas are scheduled since there are only two nodes in the cluster. - assert len(replica_actor_handles) == 2 - assert len(scheduler._pending_replicas[dep_id]) == 1 - assert len(scheduler._launching_replicas[dep_id]) == 2 - assert ( - len( - { - ray.get(replica_actor_handles[0].get_node_id.remote()), - ray.get(replica_actor_handles[1].get_node_id.remote()), - } - ) - == 2 - ) - - scheduler.on_replica_recovering(dep_id, "replica4") - cluster.add_node(num_cpus=3) - cluster.wait_for_nodes() - cluster_node_info_cache.update() - - deployment_to_replicas_to_stop = scheduler.schedule(upscales={}, downscales={}) - assert not deployment_to_replicas_to_stop - # No schduling while some replica is recovering - assert len(replica_actor_handles) == 2 - - scheduler.on_replica_stopping(dep_id, "replica4") - # The last replica is scheduled - deployment_to_replicas_to_stop = scheduler.schedule(upscales={}, downscales={}) - assert not deployment_to_replicas_to_stop - assert not scheduler._pending_replicas[dep_id] - assert len(scheduler._launching_replicas[dep_id]) == 3 - assert len(replica_actor_handles) == 3 - assert ( - len( - { - ray.get(replica_actor_handles[0].get_node_id.remote()), - ray.get(replica_actor_handles[1].get_node_id.remote()), - ray.get(replica_actor_handles[2].get_node_id.remote()), - } - ) - == 3 - ) - - scheduler.on_replica_stopping(dep_id, "replica1") - scheduler.on_replica_stopping(dep_id, "replica2") - scheduler.on_replica_stopping(dep_id, "replica3") - scheduler.on_deployment_deleted(dep_id) - - if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/tests/test_deployment_state.py b/python/ray/serve/tests/test_deployment_state.py index c6cffada48b58..671e9390b8654 100644 --- a/python/ray/serve/tests/test_deployment_state.py +++ b/python/ray/serve/tests/test_deployment_state.py @@ -5,7 +5,6 @@ import pytest -import ray from ray.serve._private.common import ( DeploymentID, DeploymentInfo, @@ -29,7 +28,6 @@ DeploymentState, DeploymentStateManager, DeploymentVersion, - DriverDeploymentState, ReplicaStartupStatus, 
ReplicaStateContainer, VersionedReplica, @@ -296,7 +294,6 @@ def deployment_info( version: Optional[str] = None, num_replicas: Optional[int] = 1, user_config: Optional[Any] = None, - is_driver_deployment: bool = False, **config_opts, ) -> Tuple[DeploymentInfo, DeploymentVersion]: info = DeploymentInfo( @@ -307,7 +304,6 @@ def deployment_info( ), replica_config=ReplicaConfig.create(lambda x: x), deployer_job_id="", - is_driver_deployment=is_driver_deployment, ) if version is not None: @@ -359,30 +355,17 @@ def mock_save_checkpoint_fn(*args, **kwargs): cluster_node_info_cache = MockClusterNodeInfoCache() - # It is a driver deployment test - if request.param is True: - deployment_state = DriverDeploymentState( - DeploymentID("name", "my_app"), - "name", - True, - mock_long_poll, - MockDeploymentScheduler(cluster_node_info_cache), - cluster_node_info_cache, - mock_save_checkpoint_fn, - ) - yield deployment_state, timer, cluster_node_info_cache - else: - deployment_state = DeploymentState( - DeploymentID("name", "my_app"), - "name", - True, - mock_long_poll, - MockDeploymentScheduler(cluster_node_info_cache), - cluster_node_info_cache, - mock_save_checkpoint_fn, - ) + deployment_state = DeploymentState( + DeploymentID("name", "my_app"), + "name", + True, + mock_long_poll, + MockDeploymentScheduler(cluster_node_info_cache), + cluster_node_info_cache, + mock_save_checkpoint_fn, + ) - yield deployment_state, timer, cluster_node_info_cache + yield deployment_state, timer, cluster_node_info_cache def replica(version: Optional[DeploymentVersion] = None) -> VersionedReplica: @@ -2261,7 +2244,7 @@ def mock_deployment_state_manager_full( cluster_node_info_cache = MockClusterNodeInfoCache() def create_deployment_state_manager( - actor_names=None, placement_group_names=None, is_driver_deployment=False + actor_names=None, placement_group_names=None ): if actor_names is None: actor_names = [] @@ -2286,10 +2269,7 @@ def create_deployment_state_manager( yield create_deployment_state_manager, timer, cluster_node_info_cache -@pytest.mark.parametrize("is_driver_deployment", [True, False]) -def test_recover_state_from_replica_names( - mock_deployment_state_manager_full, is_driver_deployment -): +def test_recover_state_from_replica_names(mock_deployment_state_manager_full): """Test recover deployment state.""" deployment_id = DeploymentID("test_deployment", "test_app") ( @@ -2301,9 +2281,7 @@ def test_recover_state_from_replica_names( cluster_node_info_cache.alive_node_ids = {"node-id"} # Deploy deployment with version "1" and one replica - info1, version1 = deployment_info( - version="1", is_driver_deployment=is_driver_deployment - ) + info1, version1 = deployment_info(version="1") updating = deployment_state_manager.deploy(deployment_id, info1) deployment_state = deployment_state_manager._deployment_states[deployment_id] assert updating @@ -2363,10 +2341,7 @@ def test_recover_state_from_replica_names( assert mocked_replica.replica_tag == new_mocked_replica.replica_tag -@pytest.mark.parametrize("is_driver_deployment", [True, False]) -def test_recover_during_rolling_update( - mock_deployment_state_manager_full, is_driver_deployment -): +def test_recover_during_rolling_update(mock_deployment_state_manager_full): """Test controller crashes before a replica is updated to new version. 
During recovery, the controller should wait for the version to be fetched from @@ -2384,9 +2359,7 @@ def test_recover_during_rolling_update( cluster_node_info_cache.alive_node_ids = {"node-id"} # Step 1: Create some deployment info with actors in running state - info1, version1 = deployment_info( - version="1", is_driver_deployment=is_driver_deployment - ) + info1, version1 = deployment_info(version="1") updating = deployment_state_manager.deploy(deployment_id, info1) deployment_state = deployment_state_manager._deployment_states[deployment_id] assert updating @@ -2412,9 +2385,7 @@ def test_recover_during_rolling_update( ) # Now execute a rollout: upgrade the version to "2". - info2, version2 = deployment_info( - version="2", is_driver_deployment=is_driver_deployment - ) + info2, version2 = deployment_info(version="2") updating = deployment_state_manager.deploy(deployment_id, info2) assert updating @@ -2437,20 +2408,15 @@ def test_recover_during_rolling_update( version=version2, by_state=[(ReplicaState.RECOVERING, 1)], ) - # Replica should remain recovering and remain labeled as version "2" - # before it recovers the real version from the actor - if not is_driver_deployment: - # NOTE(zcin): for is_driver_deployment=True, since the node id is not available - # when recovering, if we perform an update before marking the replica as ready, - # it will think it needs to bring up a new replica on the node(s) in the cluster - for _ in range(3): - new_deployment_state_manager.update() - check_counts( - new_deployment_state, - total=1, - version=version2, - by_state=[(ReplicaState.RECOVERING, 1)], - ) + + for _ in range(3): + new_deployment_state_manager.update() + check_counts( + new_deployment_state, + total=1, + version=version2, + by_state=[(ReplicaState.RECOVERING, 1)], + ) # Get the new mocked replica. Note that this represents a newly # instantiated class keeping track of the state of the replica, @@ -2494,7 +2460,6 @@ def test_recover_during_rolling_update( @pytest.fixture def mock_deployment_state_manager(request) -> Tuple[DeploymentStateManager, Mock, Mock]: - ray.init() timer = MockTimer() with patch( "ray.serve._private.deployment_state.ActorReplicaWrapper", @@ -2524,11 +2489,9 @@ def mock_deployment_state_manager(request) -> Tuple[DeploymentStateManager, Mock ) yield deployment_state_manager, timer, cluster_node_info_cache - ray.shutdown() -@pytest.mark.parametrize("is_driver_deployment", [False, True]) -def test_shutdown(mock_deployment_state_manager, is_driver_deployment): +def test_shutdown(mock_deployment_state_manager): """ Test that shutdown waits for all deployments to be deleted and they are force-killed without a grace period. @@ -2545,7 +2508,6 @@ def test_shutdown(mock_deployment_state_manager, is_driver_deployment): grace_period_s = 10 b_info_1, _ = deployment_info( graceful_shutdown_timeout_s=grace_period_s, - is_driver_deployment=is_driver_deployment, ) updating = deployment_state_manager.deploy(deployment_id, b_info_1) assert updating @@ -2603,112 +2565,6 @@ class FakeActor: replica.resource_requirements() -@pytest.mark.parametrize("mock_deployment_state", [True], indirect=True) -def test_cancel_extra_replicas_for_driver_deployment(mock_deployment_state): - """Test to make sure the driver deployment state - can cancel extra starting replicas. 
- """ - - deployment_state, timer, cluster_node_info_cache = mock_deployment_state - cluster_node_info_cache.alive_node_ids = {"0", "1"} - - b_info_1, b_version_1 = deployment_info() - updating = deployment_state.deploy(b_info_1) - assert updating - assert deployment_state.curr_status_info.status == DeploymentStatus.UPDATING - - deployment_state_update_result = deployment_state.update() - # 1 node dies, now the cluster only has 1 node - cluster_node_info_cache.alive_node_ids = {"0"} - deployment_state._deployment_scheduler.schedule( - {deployment_state._id: deployment_state_update_result.upscale}, {} - ) - check_counts(deployment_state, total=2, by_state=[(ReplicaState.STARTING, 2)]) - # only 1 replica is scheduled successfully, the other is PENDING_ALLOCATION - deployment_state._replicas.get(states=[ReplicaState.STARTING])[0]._actor.set_ready() - # the other replica should be cancelled - deployment_state.update() - check_counts( - deployment_state, - total=2, - by_state=[(ReplicaState.RUNNING, 1), (ReplicaState.STOPPING, 1)], - ) - - -@pytest.mark.parametrize("mock_deployment_state", [True], indirect=True) -def test_add_and_remove_nodes_for_driver_deployment(mock_deployment_state): - deployment_state, timer, cluster_node_info_cache = mock_deployment_state - cluster_node_info_cache.alive_node_ids = {"0"} - - b_info_1, b_version_1 = deployment_info() - updating = deployment_state.deploy(b_info_1) - assert updating - assert deployment_state.curr_status_info.status == DeploymentStatus.UPDATING - - deployment_state_update_result = deployment_state.update() - deployment_state._deployment_scheduler.schedule( - {deployment_state._id: deployment_state_update_result.upscale}, {} - ) - check_counts(deployment_state, total=1, by_state=[(ReplicaState.STARTING, 1)]) - - # Add a node when previous one is in STARTING state - cluster_node_info_cache.alive_node_ids = {"0", "1"} - deployment_state_update_result = deployment_state.update() - deployment_state._deployment_scheduler.schedule( - {deployment_state._id: deployment_state_update_result.upscale}, {} - ) - check_counts(deployment_state, total=2, by_state=[(ReplicaState.STARTING, 2)]) - for replica in deployment_state._replicas.get(states=[ReplicaState.STARTING]): - replica._actor.set_ready() - deployment_state.update() - check_counts(deployment_state, total=2, by_state=[(ReplicaState.RUNNING, 2)]) - - # Add another two nodes - cluster_node_info_cache.alive_node_ids = {"0", "1", "2", "3"} - deployment_state_update_result = deployment_state.update() - deployment_state._deployment_scheduler.schedule( - {deployment_state._id: deployment_state_update_result.upscale}, {} - ) - check_counts( - deployment_state, - total=4, - by_state=[(ReplicaState.RUNNING, 2), (ReplicaState.STARTING, 2)], - ) - for replica in deployment_state._replicas.get(states=[ReplicaState.STARTING]): - replica._actor.set_ready() - deployment_state.update() - check_counts(deployment_state, total=4, by_state=[(ReplicaState.RUNNING, 4)]) - - # Remove one node and add another node, node3 is removed, node4 added - cluster_node_info_cache.alive_node_ids = {"0", "1", "2", "4"} - deployment_state._replicas.get(states=[ReplicaState.RUNNING])[ - 3 - ]._actor.set_unhealthy() - deployment_state_update_result = deployment_state.update() - deployment_state._deployment_scheduler.schedule( - {deployment_state._id: deployment_state_update_result.upscale}, {} - ) - check_counts( - deployment_state, - total=4, - by_state=[ - (ReplicaState.RUNNING, 3), - (ReplicaState.STOPPING, 1), - ], - ) - - # Mark 
stopped replica finish stopping step. - for replica in deployment_state._replicas.get(states=[ReplicaState.STOPPING]): - replica._actor.set_done_stopping() - deployment_state.update() - - # Make starting replica finish starting step. - for replica in deployment_state._replicas.get(states=[ReplicaState.STARTING]): - replica._actor.set_ready() - deployment_state.update() - check_counts(deployment_state, total=4, by_state=[(ReplicaState.RUNNING, 4)]) - - class TestActorReplicaWrapper: def test_default_value(self): actor_replica = ActorReplicaWrapper( diff --git a/python/ray/serve/tests/test_grpc.py b/python/ray/serve/tests/test_grpc.py index 2d5955cf4f6b3..60524814cbf4c 100644 --- a/python/ray/serve/tests/test_grpc.py +++ b/python/ray/serve/tests/test_grpc.py @@ -1,6 +1,5 @@ import os import sys -from unittest.mock import patch import grpc @@ -9,19 +8,11 @@ import ray from ray import serve -from ray._private.test_utils import ( - SignalActor, - run_string_as_driver, - setup_tls, - teardown_tls, - wait_for_condition, -) +from ray._private.test_utils import SignalActor, wait_for_condition from ray.cluster_utils import Cluster from ray.serve._private.common import DeploymentID -from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME, SERVE_NAMESPACE +from ray.serve._private.constants import SERVE_NAMESPACE from ray.serve.config import gRPCOptions -from ray.serve.drivers import DefaultgRPCDriver, gRPCIngress -from ray.serve.exceptions import RayServeException from ray.serve.generated import serve_pb2, serve_pb2_grpc from ray.serve.tests.test_config_files.grpc_deployment import g, g2 from ray.serve.tests.utils import ( @@ -35,14 +26,6 @@ ) -@pytest.fixture -def serve_start_shutdown(): - ray.init() - yield - serve.shutdown() - ray.shutdown() - - @pytest.fixture def ray_cluster(): cluster = Cluster() @@ -52,179 +35,6 @@ def ray_cluster(): cluster.shutdown() -@pytest.fixture -def use_tls(request): - if request.param: - key_filepath, cert_filepath, temp_dir = setup_tls() - yield request.param - if request.param: - teardown_tls(key_filepath, cert_filepath, temp_dir) - - -def tls_enabled(): - return os.environ.get("RAY_USE_TLS", "0").lower() in ("1", "true") - - -@pytest.mark.skipif( - sys.platform == "darwin", - reason=("Cryptography (TLS dependency) doesn't install in Mac build pipeline"), -) -@pytest.mark.parametrize("use_tls", [True], indirect=True) -def test_deploy_basic(use_tls): - if use_tls: - run_string_as_driver( - """ -# coding: utf-8 -import os -from ray.serve.drivers import DefaultgRPCDriver, gRPCIngress -import ray -from ray import serve -from ray.serve.generated import serve_pb2, serve_pb2_grpc -import grpc -from ray.serve.exceptions import RayServeException -from ray._private.tls_utils import load_certs_from_env -import logging -import asyncio -try: - ray.init() - @serve.deployment - class D1: - def __call__(self, input): - return input["a"] - - serve.run(DefaultgRPCDriver.bind(D1.bind())) - - async def send_request(): - server_cert_chain, private_key, ca_cert = load_certs_from_env() - credentials = grpc.ssl_channel_credentials( - certificate_chain=server_cert_chain, - private_key=private_key, - root_certificates=ca_cert, - ) - - async with grpc.aio.secure_channel("localhost:9000", credentials) as channel: - stub = serve_pb2_grpc.PredictAPIsServiceStub(channel) - response = await stub.Predict( - serve_pb2.PredictRequest(input={"a": bytes("123", "utf-8")}) - ) - return response - - resp = asyncio.run(send_request()) - assert resp.prediction == b"123" -finally: - serve.shutdown() - 
ray.shutdown() - """, - env=os.environ.copy(), - ) - else: - run_string_as_driver( - """ -# coding: utf-8 -import os -from ray.serve.drivers import DefaultgRPCDriver, gRPCIngress -import ray -from ray import serve -from ray.serve.generated import serve_pb2, serve_pb2_grpc -import grpc -from ray.serve.exceptions import RayServeException -from ray._private.tls_utils import load_certs_from_env -import logging -import asyncio -try: - ray.init() - @serve.deployment - class D1: - def __call__(self, input): - return input["a"] - - serve.run(DefaultgRPCDriver.bind(D1.bind())) - - async def send_request(): - async with grpc.aio.insecure_channel("localhost:9000") as channel: - stub = serve_pb2_grpc.PredictAPIsServiceStub(channel) - response = await stub.Predict( - serve_pb2.PredictRequest(input={"a": bytes("123", "utf-8")}) - ) - return response - - resp = asyncio.run(send_request()) - assert resp.prediction == b"123" -finally: - serve.shutdown() - ray.shutdown() - """, - env=os.environ.copy(), - ) - - -@patch("ray.serve._private.api.FLAG_DISABLE_PROXY", True) -def test_controller_without_http(serve_start_shutdown): - @serve.deployment - class D1: - def __call__(self, input): - return input["a"] - - serve.run(DefaultgRPCDriver.bind(D1.bind())) - assert ray.get(serve.context._global_client._controller.get_proxies.remote()) == {} - - -@patch("ray.serve._private.api.FLAG_DISABLE_PROXY", True) -def test_deploy_grpc_driver_to_node(ray_cluster): - cluster = ray_cluster - cluster.add_node(num_cpus=2) - cluster.connect(namespace=SERVE_NAMESPACE) - - @serve.deployment - class D1: - def __call__(self, input): - return input["a"] - - serve.run(DefaultgRPCDriver.bind(D1.bind())) - replicas = ray.get( - serve.context._global_client._controller._all_running_replicas.remote() - ) - deployment_id = DeploymentID("DefaultgRPCDriver", SERVE_DEFAULT_APP_NAME) - assert len(replicas[deployment_id]) == 1 - - worker_node = cluster.add_node(num_cpus=2) - - wait_for_condition( - lambda: len( - ray.get( - serve.context._global_client._controller._all_running_replicas.remote() - )[deployment_id] - ) - == 2 - ) - - # Kill the worker node. - cluster.remove_node(worker_node) - - wait_for_condition( - lambda: len( - ray.get( - serve.context._global_client._controller._all_running_replicas.remote() - )[deployment_id] - ) - == 1 - ) - - -def test_schemas_attach_grpc_server(): - # Failed with initiate solely - with pytest.raises(RayServeException): - _ = gRPCIngress() - - class MyDriver(gRPCIngress): - def __init__(self): - super().__init__() - - # Failed with no schema gRPC binding function - with pytest.raises(RayServeException): - _ = MyDriver() - - def test_serving_request_through_grpc_proxy(ray_cluster): """Test serving request through gRPC proxy. 
diff --git a/python/ray/serve/tests/test_proxy_state.py b/python/ray/serve/tests/test_proxy_state.py index 4139861526809..a90db314c008d 100644 --- a/python/ray/serve/tests/test_proxy_state.py +++ b/python/ray/serve/tests/test_proxy_state.py @@ -78,7 +78,6 @@ def setup_controller(): SERVE_CONTROLLER_NAME, http_config=None, detached=True, - _disable_proxy=True, ) controller_actor_id = controller._ray_actor_id.hex() diff --git a/python/ray/serve/tests/test_telemetry.py b/python/ray/serve/tests/test_telemetry.py index 259b33b2ecb39..75dc3c3e3fd1d 100644 --- a/python/ray/serve/tests/test_telemetry.py +++ b/python/ray/serve/tests/test_telemetry.py @@ -1,7 +1,6 @@ import subprocess import sys import time -from typing import Dict import pytest import requests @@ -20,7 +19,7 @@ ) from ray.serve._private.usage import ServeUsageTag from ray.serve.context import _get_global_client -from ray.serve.drivers import DAGDriver, DefaultgRPCDriver +from ray.serve.drivers import DAGDriver from ray.serve.http_adapters import json_request from ray.serve.schema import ServeDeploySchema from ray.serve.tests.utils import ( @@ -99,63 +98,6 @@ async def app2(self): assert ServeUsageTag.REST_API_VERSION.get_value_from_report(report) is None -def test_grpc_detected(manage_ray_with_telemetry): - """ - Check that gRPCIngress is detected by telemetry. - """ - - subprocess.check_output(["ray", "start", "--head"]) - wait_for_condition(check_ray_started, timeout=5) - - storage_handle = start_telemetry_app() - - wait_for_condition( - lambda: ray.get(storage_handle.get_reports_received.remote()) > 0, timeout=5 - ) - - # Check that telemetry related to gRPC ingress app is not set - report = ray.get(storage_handle.get_report.remote()) - assert ServeUsageTag.GRPC_INGRESS_USED.get_value_from_report(report) is None - - @serve.deployment(ray_actor_options={"num_cpus": 0}) - def greeter(inputs: Dict[str, bytes]): - return "Hello!" 
- - with InputNode() as grpc_input: - greeter_node = greeter.bind(grpc_input) - grpc_app = DefaultgRPCDriver.bind(greeter_node) - - serve.run(grpc_app, name="grpc_app", route_prefix="/grpc") - - wait_for_condition( - lambda: serve.status().applications["grpc_app"].status - == ApplicationStatus.RUNNING, - timeout=15, - ) - - current_num_reports = ray.get(storage_handle.get_reports_received.remote()) - - wait_for_condition( - lambda: ray.get(storage_handle.get_reports_received.remote()) - > current_num_reports, - timeout=5, - ) - report = ray.get(storage_handle.get_report.remote()) - - # Check all telemetry relevant to the Serve apps on this cluster - assert ServeUsageTag.GRPC_INGRESS_USED.get_value_from_report(report) == "1" - assert ServeUsageTag.API_VERSION.get_value_from_report(report) == "v2" - assert int(ServeUsageTag.NUM_APPS.get_value_from_report(report)) == 2 - assert int(ServeUsageTag.NUM_DEPLOYMENTS.get_value_from_report(report)) == 3 - assert int(ServeUsageTag.NUM_GPU_DEPLOYMENTS.get_value_from_report(report)) == 0 - - # Check that Serve telemetry not relevant to the running apps is omitted - assert ServeUsageTag.DAG_DRIVER_USED.get_value_from_report(report) is None - assert ServeUsageTag.HTTP_ADAPTER_USED.get_value_from_report(report) is None - assert ServeUsageTag.FASTAPI_USED.get_value_from_report(report) is None - assert ServeUsageTag.REST_API_VERSION.get_value_from_report(report) is None - - @pytest.mark.parametrize("use_adapter", [True, False]) def test_graph_detected(manage_ray_with_telemetry, use_adapter): """ diff --git a/src/ray/protobuf/serve.proto b/src/ray/protobuf/serve.proto index 85b55b5dab7c3..34c9bfe7fa27f 100644 --- a/src/ray/protobuf/serve.proto +++ b/src/ray/protobuf/serve.proto @@ -237,19 +237,6 @@ message StatusOverview { string name = 3; } -// RPC Schema -message PredictRequest { - map input = 2; -} - -message PredictResponse { - bytes prediction = 1; -} - -service PredictAPIsService { - rpc Predict(PredictRequest) returns (PredictResponse); -} - // Used for gRPC proxy health check message ListApplicationsRequest {}
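
As a pointer for readers following the removals above: the gRPC surface that survives this change is the proxy-level RayServeAPIService (health check and application listing) plus any user-defined servicers registered through gRPCOptions. The sketch below is illustrative only; it assumes the proxy's default port 9000, the ListApplications/Healthz method names implied by the generated message names earlier in this patch, and a hypothetical user-defined servicer registration function.

# Minimal client sketch against the built-in gRPC proxy (assumptions noted above).
import grpc

from ray import serve
from ray.serve.config import gRPCOptions
from ray.serve.generated import serve_pb2, serve_pb2_grpc

# Start Serve with the gRPC proxy enabled; the servicer function string is a
# hypothetical example of how application-level gRPC services get registered.
serve.start(
    grpc_options=gRPCOptions(
        port=9000,
        grpc_servicer_functions=[
            "user_defined_protos_pb2_grpc.add_UserDefinedServiceServicer_to_server",
        ],
    )
)

# RayServeAPIServiceStub is the stub kept in serve_pb2_grpc.py above.
channel = grpc.insecure_channel("localhost:9000")
stub = serve_pb2_grpc.RayServeAPIServiceStub(channel)

# Proxy health check and application listing; the exact response fields are not
# spelled out in this patch, so only the calls themselves are sketched here.
stub.Healthz(serve_pb2.HealthzRequest())
stub.ListApplications(serve_pb2.ListApplicationsRequest())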