From cc93df5824ac38f44e9f64723bc435ecbc5fbd16 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 7 Nov 2023 14:51:09 -0600 Subject: [PATCH] [serve] Add piping to set and fetch `target_capacity` (#41007) Add logic to set & get target_capacity via REST API --------- Signed-off-by: Edward Oakes --- .../serve/tests/test_serve_dashboard.py | 46 +++++++++++++--- python/ray/serve/_private/controller.py | 52 +++++++++---------- 2 files changed, 64 insertions(+), 34 deletions(-) diff --git a/dashboard/modules/serve/tests/test_serve_dashboard.py b/dashboard/modules/serve/tests/test_serve_dashboard.py index 676aae6faa2bd..3431e1dcd8aff 100644 --- a/dashboard/modules/serve/tests/test_serve_dashboard.py +++ b/dashboard/modules/serve/tests/test_serve_dashboard.py @@ -684,30 +684,62 @@ def test_target_capacity_field(ray_start_stop, url: str): raw_json = requests.get(url).json() # `target_capacity` should be present in the response before deploying anything. - assert raw_json["target_capacity"] is None, raw_json + assert raw_json["target_capacity"] is None - config_without_target_capacity = { + config = { "http_options": { "host": "127.0.0.1", "port": 8000, }, "applications": [], } - deploy_config_multi_app(config_without_target_capacity, url) + deploy_config_multi_app(config, url) + # `target_capacity` should be present in the response even if not set. raw_json = requests.get(url).json() + assert raw_json["target_capacity"] is None + details = ServeInstanceDetails(**raw_json) + assert details.target_capacity is None + assert details.http_options.host == "127.0.0.1" + assert details.http_options.port == 8000 + assert details.applications == {} - # `target_capacity` should be present in the response even if not set. - assert raw_json["target_capacity"] is None, raw_json + # Set `target_capacity`, ensure it is returned properly. + config["target_capacity"] = 20 + deploy_config_multi_app(config, url) + raw_json = requests.get(url).json() + assert raw_json["target_capacity"] == 20 + details = ServeInstanceDetails(**raw_json) + assert details.target_capacity == 20 + assert details.http_options.host == "127.0.0.1" + assert details.http_options.port == 8000 + assert details.applications == {} - # Parse the response to ensure it's formatted correctly. + # Update `target_capacity`, ensure it is returned properly. + config["target_capacity"] = 40 + deploy_config_multi_app(config, url) + raw_json = requests.get(url).json() + assert raw_json["target_capacity"] == 40 + details = ServeInstanceDetails(**raw_json) + assert details.target_capacity == 40 + assert details.http_options.host == "127.0.0.1" + assert details.http_options.port == 8000 + assert details.applications == {} + + # Reset `target_capacity` by omitting it, ensure it is returned properly. + del config["target_capacity"] + deploy_config_multi_app(config, url) + raw_json = requests.get(url).json() + assert raw_json["target_capacity"] is None details = ServeInstanceDetails(**raw_json) assert details.target_capacity is None assert details.http_options.host == "127.0.0.1" assert details.http_options.port == 8000 assert details.applications == {} - # TODO(edoakes): add test cases that set and update `target_capacity`. + # Try to set an invalid `target_capacity`, ensure a `400` status is returned. + config["target_capacity"] = 101 + assert requests.put(url, json=config, timeout=30).status_code == 400 if __name__ == "__main__": diff --git a/python/ray/serve/_private/controller.py b/python/ray/serve/_private/controller.py index d7056eb07dc74..d105fd8c985c0 100644 --- a/python/ray/serve/_private/controller.py +++ b/python/ray/serve/_private/controller.py @@ -196,7 +196,10 @@ async def __init__( self._create_control_loop_metrics() run_background_task(self.run_control_loop()) + # The target capacity percentage for all replicas across the cluster. + self._target_capacity: Optional[float] = None self._recover_config_from_checkpoint() + # Nodes where proxy actors should run. self._proxy_nodes = set() self._update_proxy_nodes() @@ -441,11 +444,15 @@ def _recover_config_from_checkpoint(self): logger.info( "Recovering config from checkpoint.", extra={"log_to_stderr": False} ) - deployment_time, config_checkpoints_dict = pickle.loads(checkpoint) - applications = list(config_checkpoints_dict.values()) + deployment_time, target_capacity, config_checkpoints_dict = pickle.loads( + checkpoint + ) self.deploy_config( - ServeDeploySchema.parse_obj({"applications": applications}), - deployment_time, + ServeDeploySchema( + applications=list(config_checkpoints_dict.values()), + target_capacity=target_capacity, + ), + deployment_time=deployment_time, ) def _all_running_replicas(self) -> Dict[DeploymentID, List[RunningReplicaInfo]]: @@ -643,28 +650,15 @@ def deploy_application(self, name: str, deployment_args_list: List[bytes]) -> No def deploy_config( self, - config: Union[ServeApplicationSchema, ServeDeploySchema], - deployment_time: float = 0, + config: ServeDeploySchema, + deployment_time: float = 0.0, ) -> None: - """Kicks off a task that deploys a set of Serve applications. + """Apply the config described in `ServeDeploySchema`. - Cancels in-progress tasks that are deploying Serve applications with the same - name as newly deployed applications. + This is idempotent and will upgrade the applications to the goal state + specified in the config. - Args: - config: - [if ServeApplicationSchema] - name: Application name. If not provided, it is empty string. - import_path: Serve deployment graph's import path - runtime_env: runtime_env to run the deployment graph in - deployments: Dictionaries that contain argument-value options - that can be passed directly into a set_options() call. Overrides - deployment options set in the graph's code itself. - [if ServeDeploySchema] - applications: Dictionaries of the format ServeApplicationSchema. - - deployment_time: set deployment_timestamp. If not provided, time.time() is - used to indicate the deployment time. + If `deployment_time` is not provided, `time.time()` is used. """ ServeUsageTag.API_VERSION.record("v2") if not deployment_time: @@ -692,10 +686,14 @@ def deploy_config( self.kv_store.put( CONFIG_CHECKPOINT_KEY, - pickle.dumps((deployment_time, new_config_checkpoint)), + pickle.dumps( + (deployment_time, config.target_capacity, new_config_checkpoint) + ), ) - # Delete live applications not listed in config + self._target_capacity = config.target_capacity + + # Delete live applications not listed in the config. existing_applications = set( self.application_state_manager._application_states.keys() ) @@ -834,7 +832,7 @@ def get_serve_instance_details(self) -> Dict: http_options = HTTPOptionsSchema.parse_obj(http_config.dict(exclude_unset=True)) grpc_options = gRPCOptionsSchema.parse_obj(grpc_config.dict(exclude_unset=True)) return ServeInstanceDetails( - target_capacity=None, + target_capacity=self._target_capacity, controller_info=self._actor_details, proxy_location=http_config.location, http_options=http_options, @@ -878,7 +876,7 @@ def list_serve_statuses(self) -> List[bytes]: def get_app_config(self, name: str = SERVE_DEFAULT_APP_NAME) -> Optional[Dict]: checkpoint = self.kv_store.get(CONFIG_CHECKPOINT_KEY) if checkpoint is not None: - _, config_checkpoints_dict = pickle.loads(checkpoint) + _, _, config_checkpoints_dict = pickle.loads(checkpoint) if name in config_checkpoints_dict: config = config_checkpoints_dict[name] return ServeApplicationSchema.parse_obj(config).dict(exclude_unset=True)