Skip to content

Commit

Permalink
[serve] Add piping to set and fetch target_capacity (ray-project#41007
Browse files Browse the repository at this point in the history
)

Add logic to set & get target_capacity via REST API

---------

Signed-off-by: Edward Oakes <[email protected]>
  • Loading branch information
edoakes committed Nov 7, 2023
1 parent 6de82c8 commit cc93df5
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 34 deletions.
46 changes: 39 additions & 7 deletions dashboard/modules/serve/tests/test_serve_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,30 +684,62 @@ def test_target_capacity_field(ray_start_stop, url: str):
raw_json = requests.get(url).json()

# `target_capacity` should be present in the response before deploying anything.
assert raw_json["target_capacity"] is None, raw_json
assert raw_json["target_capacity"] is None

config_without_target_capacity = {
config = {
"http_options": {
"host": "127.0.0.1",
"port": 8000,
},
"applications": [],
}
deploy_config_multi_app(config_without_target_capacity, url)
deploy_config_multi_app(config, url)

# `target_capacity` should be present in the response even if not set.
raw_json = requests.get(url).json()
assert raw_json["target_capacity"] is None
details = ServeInstanceDetails(**raw_json)
assert details.target_capacity is None
assert details.http_options.host == "127.0.0.1"
assert details.http_options.port == 8000
assert details.applications == {}

# `target_capacity` should be present in the response even if not set.
assert raw_json["target_capacity"] is None, raw_json
# Set `target_capacity`, ensure it is returned properly.
config["target_capacity"] = 20
deploy_config_multi_app(config, url)
raw_json = requests.get(url).json()
assert raw_json["target_capacity"] == 20
details = ServeInstanceDetails(**raw_json)
assert details.target_capacity == 20
assert details.http_options.host == "127.0.0.1"
assert details.http_options.port == 8000
assert details.applications == {}

# Parse the response to ensure it's formatted correctly.
# Update `target_capacity`, ensure it is returned properly.
config["target_capacity"] = 40
deploy_config_multi_app(config, url)
raw_json = requests.get(url).json()
assert raw_json["target_capacity"] == 40
details = ServeInstanceDetails(**raw_json)
assert details.target_capacity == 40
assert details.http_options.host == "127.0.0.1"
assert details.http_options.port == 8000
assert details.applications == {}

# Reset `target_capacity` by omitting it, ensure it is returned properly.
del config["target_capacity"]
deploy_config_multi_app(config, url)
raw_json = requests.get(url).json()
assert raw_json["target_capacity"] is None
details = ServeInstanceDetails(**raw_json)
assert details.target_capacity is None
assert details.http_options.host == "127.0.0.1"
assert details.http_options.port == 8000
assert details.applications == {}

# TODO(edoakes): add test cases that set and update `target_capacity`.
# Try to set an invalid `target_capacity`, ensure a `400` status is returned.
config["target_capacity"] = 101
assert requests.put(url, json=config, timeout=30).status_code == 400


if __name__ == "__main__":
Expand Down
52 changes: 25 additions & 27 deletions python/ray/serve/_private/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ async def __init__(
self._create_control_loop_metrics()
run_background_task(self.run_control_loop())

# The target capacity percentage for all replicas across the cluster.
self._target_capacity: Optional[float] = None
self._recover_config_from_checkpoint()

# Nodes where proxy actors should run.
self._proxy_nodes = set()
self._update_proxy_nodes()
Expand Down Expand Up @@ -441,11 +444,15 @@ def _recover_config_from_checkpoint(self):
logger.info(
"Recovering config from checkpoint.", extra={"log_to_stderr": False}
)
deployment_time, config_checkpoints_dict = pickle.loads(checkpoint)
applications = list(config_checkpoints_dict.values())
deployment_time, target_capacity, config_checkpoints_dict = pickle.loads(
checkpoint
)
self.deploy_config(
ServeDeploySchema.parse_obj({"applications": applications}),
deployment_time,
ServeDeploySchema(
applications=list(config_checkpoints_dict.values()),
target_capacity=target_capacity,
),
deployment_time=deployment_time,
)

def _all_running_replicas(self) -> Dict[DeploymentID, List[RunningReplicaInfo]]:
Expand Down Expand Up @@ -643,28 +650,15 @@ def deploy_application(self, name: str, deployment_args_list: List[bytes]) -> No

def deploy_config(
self,
config: Union[ServeApplicationSchema, ServeDeploySchema],
deployment_time: float = 0,
config: ServeDeploySchema,
deployment_time: float = 0.0,
) -> None:
"""Kicks off a task that deploys a set of Serve applications.
"""Apply the config described in `ServeDeploySchema`.
Cancels in-progress tasks that are deploying Serve applications with the same
name as newly deployed applications.
This is idempotent and will upgrade the applications to the goal state
specified in the config.
Args:
config:
[if ServeApplicationSchema]
name: Application name. If not provided, it is empty string.
import_path: Serve deployment graph's import path
runtime_env: runtime_env to run the deployment graph in
deployments: Dictionaries that contain argument-value options
that can be passed directly into a set_options() call. Overrides
deployment options set in the graph's code itself.
[if ServeDeploySchema]
applications: Dictionaries of the format ServeApplicationSchema.
deployment_time: set deployment_timestamp. If not provided, time.time() is
used to indicate the deployment time.
If `deployment_time` is not provided, `time.time()` is used.
"""
ServeUsageTag.API_VERSION.record("v2")
if not deployment_time:
Expand Down Expand Up @@ -692,10 +686,14 @@ def deploy_config(

self.kv_store.put(
CONFIG_CHECKPOINT_KEY,
pickle.dumps((deployment_time, new_config_checkpoint)),
pickle.dumps(
(deployment_time, config.target_capacity, new_config_checkpoint)
),
)

# Delete live applications not listed in config
self._target_capacity = config.target_capacity

# Delete live applications not listed in the config.
existing_applications = set(
self.application_state_manager._application_states.keys()
)
Expand Down Expand Up @@ -834,7 +832,7 @@ def get_serve_instance_details(self) -> Dict:
http_options = HTTPOptionsSchema.parse_obj(http_config.dict(exclude_unset=True))
grpc_options = gRPCOptionsSchema.parse_obj(grpc_config.dict(exclude_unset=True))
return ServeInstanceDetails(
target_capacity=None,
target_capacity=self._target_capacity,
controller_info=self._actor_details,
proxy_location=http_config.location,
http_options=http_options,
Expand Down Expand Up @@ -878,7 +876,7 @@ def list_serve_statuses(self) -> List[bytes]:
def get_app_config(self, name: str = SERVE_DEFAULT_APP_NAME) -> Optional[Dict]:
checkpoint = self.kv_store.get(CONFIG_CHECKPOINT_KEY)
if checkpoint is not None:
_, config_checkpoints_dict = pickle.loads(checkpoint)
_, _, config_checkpoints_dict = pickle.loads(checkpoint)
if name in config_checkpoints_dict:
config = config_checkpoints_dict[name]
return ServeApplicationSchema.parse_obj(config).dict(exclude_unset=True)
Expand Down

0 comments on commit cc93df5

Please sign in to comment.