Skip to content

Commit

Permalink
[Core][State Observability] Use address arg + print warning if API re…
Browse files Browse the repository at this point in the history
…sponds slowly (ray-project#26008)

This PR is doing 2 things.

(1) Use `address` instead of `api_server_url`, which is consistent with the other submission APIs.
(2) When the API does not respond in a timely manner, it prints a warning every 5 seconds. Below is an example. This is useful when the API responds slowly (e.g., when there are partial failures). Without this, users would see the API hang for 30 seconds, which is a pretty bad UX.

(0.12 / 10 seconds) Waiting for the response from the API server address http://127.0.0.1:8265/api/v0/delay/5.
  • Loading branch information
rkooo567 committed Jul 14, 2022
1 parent 8f74e1f commit e9f6ffc
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 92 deletions.
13 changes: 11 additions & 2 deletions dashboard/modules/dashboard_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tempfile
from typing import Any, Dict, List, Optional
from pkg_resources import packaging
import ray

try:
import requests
Expand Down Expand Up @@ -125,8 +126,16 @@ def parse_cluster_info(
headers: Optional[Dict[str, Any]] = None,
) -> ClusterInfo:
if address is None:
logger.info(f"No address provided, defaulting to {DEFAULT_DASHBOARD_ADDRESS}.")
address = DEFAULT_DASHBOARD_ADDRESS
if ray.is_initialized():
address = (
"http:https://"
f"{ray._private.worker.global_worker.node.address_info['webui_url']}"
)
else:
logger.info(
f"No address provided, defaulting to {DEFAULT_DASHBOARD_ADDRESS}."
)
address = DEFAULT_DASHBOARD_ADDRESS

module_string, inner_address = _split_address(address)

Expand Down
13 changes: 13 additions & 0 deletions dashboard/modules/state/state_head.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
from dataclasses import asdict
from typing import Callable, Optional
Expand Down Expand Up @@ -412,6 +413,18 @@ async def summarize_actors(self, req: aiohttp.web.Request) -> aiohttp.web.Respon
async def summarize_objects(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
return await self._handle_summary_api(self._state_api.summarize_objects, req)

@routes.get("/api/v0/delay/{delay_s}")
async def delayed_response(self, req: aiohttp.web.Request):
    """Testing only. Reply with an empty success payload after a delay.

    The delay is taken from the ``delay_s`` entry of the request's
    ``match_info`` and converted with ``int``; 10 is used when that
    entry is absent.
    """
    raw_delay = req.match_info.get("delay_s", 10)
    await asyncio.sleep(int(raw_delay))

    # Nothing to report: empty result, no error, no partial-failure warning.
    reply_fields = dict(
        success=True,
        error_message="",
        result={},
        partial_failure_warning=None,
    )
    return self._reply(**reply_fields)

async def run(self, server):
gcs_channel = self._dashboard_head.aiogrpc_gcs_channel
self._state_api_data_source_client = StateDataSourceClient(gcs_channel)
Expand Down
2 changes: 1 addition & 1 deletion dashboard/state_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ async def list_placement_groups(self, *, option: ListApiOptions) -> ListApiRespo

data = self._message_to_dict(
message=message,
fields_to_decode=["placement_group_id"],
fields_to_decode=["placement_group_id", "node_id"],
)
result.append(data)

Expand Down
2 changes: 1 addition & 1 deletion dashboard/tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ def test_dashboard_requests_fail_on_missing_deps(ray_start_with_dashboard):
response = None

with pytest.raises(ServerUnavailable):
client = StateApiClient(api_server_address=DEFAULT_DASHBOARD_ADDRESS)
client = StateApiClient(address=DEFAULT_DASHBOARD_ADDRESS)
response = client.list(StateResource.NODES, options=ListApiOptions())

# Response should not be populated
Expand Down
Loading

0 comments on commit e9f6ffc

Please sign in to comment.