Skip to content

Commit

Permalink
[core][state] Add task_parent_id to TaskState (ray-project#31570)
Browse files Browse the repository at this point in the history
Signed-off-by: rickyyx <[email protected]>
  • Loading branch information
rickyyx committed Jan 11, 2023
1 parent ed6c6f5 commit 8e375d0
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 0 deletions.
1 change: 1 addition & 0 deletions dashboard/state_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ def _to_task_state(task_attempt: dict) -> dict:
"language",
"required_resources",
"runtime_env_info",
"parent_task_id",
],
),
(task_attempt, ["task_id", "attempt_number", "job_id"]),
Expand Down
2 changes: 2 additions & 0 deletions python/ray/experimental/state/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ class TaskState(StateSchema):
required_resources: dict = state_column(detail=True, filterable=False)
#: The runtime environment information for the task.
runtime_env_info: str = state_column(detail=True, filterable=False)
#: The parent task id.
parent_task_id: str = state_column(filterable=True)


@dataclass(init=True)
Expand Down
33 changes: 33 additions & 0 deletions python/ray/tests/test_state_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2004,6 +2004,39 @@ def verify():
print(list_tasks())


def test_parent_task_id(shutdown_only):
"""Test parent task id set up properly"""
ray.init(num_cpus=2)

@ray.remote
def child():
pass

@ray.remote
def parent():
ray.get(child.remote())

ray.get(parent.remote())

def verify():
tasks = list_tasks()
assert len(tasks) == 2, "Expect 2 tasks to finished"
parent_task_id = None
child_parent_task_id = None
for task in tasks:
if task["func_or_class_name"] == "parent":
parent_task_id = task["task_id"]
elif task["func_or_class_name"] == "child":
child_parent_task_id = task["parent_task_id"]

assert (
parent_task_id == child_parent_task_id
), "Child should have the parent task id"
return True

wait_for_condition(verify)


def test_list_get_task_multiple_attempt_all_failed(shutdown_only):
ray.init(num_cpus=2)
job_id = ray.get_runtime_context().get_job_id()
Expand Down

0 comments on commit 8e375d0

Please sign in to comment.