Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][state] List actors should show actors from dead jobs as well #31984

Closed
wants to merge 16 commits into from
4 changes: 4 additions & 0 deletions dashboard/modules/state/state_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ def _options_from_req(self, req: aiohttp.web.Request) -> ListApiOptions:
timeout = int(req.query.get("timeout", 30))
filters = self._get_filters_from_req(req)
detail = convert_string_to_type(req.query.get("detail", False), bool)
show_dead_jobs = convert_string_to_type(
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
req.query.get("show_dead_jobs", True), bool
)
exclude_driver = convert_string_to_type(
req.query.get("exclude_driver", True), bool
)
Expand All @@ -203,6 +206,7 @@ def _options_from_req(self, req: aiohttp.web.Request) -> ListApiOptions:
timeout=timeout,
filters=filters,
detail=detail,
show_dead_jobs=show_dead_jobs,
exclude_driver=exclude_driver,
)

Expand Down
4 changes: 3 additions & 1 deletion dashboard/state_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ async def list_actors(self, *, option: ListApiOptions) -> ListApiResponse:

"""
try:
reply = await self._client.get_all_actor_info(timeout=option.timeout)
reply = await self._client.get_all_actor_info(
timeout=option.timeout, show_dead_jobs=option.show_dead_jobs
)
except DataSourceUnavailable:
raise DataSourceUnavailable(GCS_QUERY_FAILURE_WARNING)

Expand Down
3 changes: 3 additions & 0 deletions python/ray/experimental/state/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class ListApiOptions:
filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = field(
default_factory=list
)
# [only actors] If actors from dead jobs should be included in the list.
show_dead_jobs: bool = True
# [only tasks] If driver tasks should be excluded.
exclude_driver: bool = True
# When the request is processed on the server side,
Expand All @@ -102,6 +104,7 @@ def __post_init__(self):
assert isinstance(self.limit, int)
assert isinstance(self.timeout, int)
assert isinstance(self.detail, bool)
assert isinstance(self.show_dead_jobs, bool)
assert isinstance(self.exclude_driver, bool)
assert isinstance(self.filters, list) or self.filters is None, (
"filters must be a list type. Given filters: "
Expand Down
4 changes: 2 additions & 2 deletions python/ray/experimental/state/state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,12 @@ def ip_to_node_id(self, ip: Optional[str]) -> Optional[str]:

@handle_grpc_network_errors
async def get_all_actor_info(
self, timeout: int = None, limit: int = None
self, timeout: int = None, limit: int = None, show_dead_jobs: bool = False
) -> Optional[GetAllActorInfoReply]:
if not limit:
limit = RAY_MAX_LIMIT_FROM_DATA_SOURCE

request = GetAllActorInfoRequest(limit=limit)
request = GetAllActorInfoRequest(limit=limit, show_dead_jobs=show_dead_jobs)
reply = await self._gcs_actor_info_stub.GetAllActorInfo(
request, timeout=timeout
)
Expand Down
41 changes: 41 additions & 0 deletions python/ray/tests/test_state_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import ray._private.state as global_state
import ray._private.ray_constants as ray_constants
from ray._private.test_utils import (
run_string_as_driver,
wait_for_condition,
async_wait_for_condition_async_predicate,
)
Expand Down Expand Up @@ -88,6 +89,7 @@
DEFAULT_RPC_TIMEOUT,
ActorState,
ListApiOptions,
StateResource,
SummaryApiOptions,
NodeState,
ObjectState,
Expand Down Expand Up @@ -1927,6 +1929,45 @@ class A:
assert x_actors[0]["ray_namespace"] == "x"


def test_list_all_actors_from_dead_jobs(ray_start_regular):
script = """
import ray
import time

ray.init("auto")

@ray.remote
class Actor():
def ready(self):
pass

a = Actor.remote()
ray.get(a.ready.remote())
"""
run_string_as_driver(script)

client = StateApiClient()

def list_actors(show_dead_jobs: bool):
return client.list(
StateResource.ACTORS,
options=ListApiOptions(show_dead_jobs=show_dead_jobs),
raise_on_missing_output=True,
)

def verify():
actors_with_dead_jobs = list_actors(show_dead_jobs=True)
assert len(actors_with_dead_jobs) == 1
assert actors_with_dead_jobs[0]["state"] == "DEAD"

actors_no_dead_jobs = list_actors(show_dead_jobs=False)
assert len(actors_no_dead_jobs) == 0

return True

wait_for_condition(verify)


@pytest.mark.skipif(
sys.platform == "win32",
reason="Failed on Windows",
Expand Down
3 changes: 2 additions & 1 deletion python/ray/tests/test_state_api_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,14 @@ def verify():
summary = summary["cluster"]["summary"]
actor_summary = None
infeasible_summary = None
print(summary)
for actor_class_name, s in summary.items():
if ".Actor" in actor_class_name:
actor_summary = s
elif ".Infeasible" in actor_class_name:
infeasible_summary = s

assert actor_summary["state_counts"]["PENDING_CREATION"] == 1
assert actor_summary["state_counts"]["DEPENDENCY_UNREADY"] == 1
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
assert actor_summary["state_counts"]["ALIVE"] == 2
assert infeasible_summary["state_counts"]["PENDING_CREATION"] == 1
return True
Expand Down