Skip to content

Commit

Permalink
[1/N][Advanced timeline] Include events/profiling events to the ray l…
Browse files Browse the repository at this point in the history
…ist tasks (ray-project#31776)

This PR

Renames scheduling_state to state, which lets the frontend render state counts (and is consistent with the other schemas).
Add duration to the frontend.
Add profile / regular events to the task state API.
Support source-side filtering by job ID.
The following work remains for follow-up PRs:

Replace timeline implementation to use task API
Implement timeline frontend.
Display events / profile events from the dashboard/state API in a better format.
  • Loading branch information
rkooo567 committed Jan 23, 2023
1 parent 25d3d52 commit 228b87f
Show file tree
Hide file tree
Showing 13 changed files with 241 additions and 57 deletions.
31 changes: 25 additions & 6 deletions dashboard/client/src/components/TaskTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import {
import Autocomplete from "@material-ui/lab/Autocomplete";
import Pagination from "@material-ui/lab/Pagination";
import React, { useState } from "react";
import { DurationText } from "../common/DurationText";
import rowStyles from "../common/RowStyles";
import { Task } from "../type/task";
import { useFilter } from "../util/hook";
import StateCounter from "./StatesCounter";
import { StatusChip } from "./StatusChip";

const TaskTable = ({
Expand All @@ -36,7 +38,8 @@ const TaskTable = ({
{ label: "ID" },
{ label: "Name" },
{ label: "Job Id" },
{ label: "Scheduling State" },
{ label: "State" },
{ label: "Duration" },
{ label: "Function or Class Name" },
{ label: "Node Id" },
{ label: "Actor_id" },
Expand All @@ -59,12 +62,12 @@ const TaskTable = ({
/>
<Autocomplete
style={{ margin: 8, width: 120 }}
options={Array.from(new Set(tasks.map((e) => e.scheduling_state)))}
options={Array.from(new Set(tasks.map((e) => e.state)))}
onInputChange={(_: any, value: string) => {
changeFilter("scheduling_state", value.trim());
changeFilter("state", value.trim());
}}
renderInput={(params: TextFieldProps) => (
<TextField {...params} label="Scheduling State" />
<TextField {...params} label="State" />
)}
/>
<Autocomplete
Expand Down Expand Up @@ -121,6 +124,9 @@ const TaskTable = ({
count={Math.ceil(taskList.length / pageSize)}
/>
</div>
<div>
<StateCounter type="task" list={taskList} />
</div>
</div>
<Table>
<TableHead>
Expand All @@ -140,12 +146,15 @@ const TaskTable = ({
task_id,
name,
job_id,
scheduling_state,
state,
func_or_class_name,
node_id,
actor_id,
type,
required_resources,
events,
start_time_ms,
end_time_ms,
}) => (
<TableRow>
<TableCell align="center">
Expand All @@ -161,7 +170,17 @@ const TaskTable = ({
<TableCell align="center">{name ? name : "-"}</TableCell>
<TableCell align="center">{job_id}</TableCell>
<TableCell align="center">
<StatusChip type="actor" status={scheduling_state} />
<StatusChip type="actor" status={state} />
</TableCell>
<TableCell align="center">
{start_time_ms && start_time_ms > 0 ? (
<DurationText
startTime={start_time_ms}
endTime={end_time_ms}
/>
) : (
"-"
)}
</TableCell>
<TableCell align="center">{func_or_class_name}</TableCell>
<TableCell align="center">{node_id ? node_id : "-"}</TableCell>
Expand Down
2 changes: 1 addition & 1 deletion dashboard/client/src/pages/state/task.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Task } from "../../type/task";
import { useStateApiList } from "./hook/useStateApi";

/**
* Represent the embedable actors page.
* Represent the embedable tasks page.
*/
const TaskList = ({ jobId = null }: { jobId?: string | null }) => {
const [timeStamp] = useState(dayjs());
Expand Down
2 changes: 1 addition & 1 deletion dashboard/client/src/service/placementGroup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import { get } from "./requestHandlers";

export const getPlacementGroup = () => {
return get<StateApiResponse<PlacementGroup>>(
"api/v0/placement_groups?detail=1",
"api/v0/placement_groups?detail=1&limit=10000",
);
};
2 changes: 1 addition & 1 deletion dashboard/client/src/service/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ import { Task } from "../type/task";
import { get } from "./requestHandlers";

export const getTasks = () => {
return get<StateApiResponse<Task>>("api/v0/tasks?detail=1");
return get<StateApiResponse<Task>>("api/v0/tasks?detail=1&limit=10000");
};
5 changes: 4 additions & 1 deletion dashboard/client/src/type/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export enum TypeTaskType {
export type Task = {
task_id: string;
name: string;
scheduling_state: TypeTaskStatus;
state: TypeTaskStatus;
job_id: string;
node_id: string;
actor_id: string;
Expand All @@ -33,4 +33,7 @@ export type Task = {
language: string;
required_resources: { [key: string]: number };
runtime_env_info: string;
events: { [key: string]: string }[];
start_time_ms: number | null;
end_time_ms: number | null;
};
57 changes: 45 additions & 12 deletions dashboard/state_aggregator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import logging
import json

from dataclasses import asdict, fields
from itertools import islice
Expand Down Expand Up @@ -359,9 +360,16 @@ async def list_tasks(self, *, option: ListApiOptions) -> ListApiResponse:
{task_id -> task_data_in_dict}
task_data_in_dict's schema is in TaskState
"""
job_id = None
for filter in option.filters:
if filter[0] == "job_id":
# tuple consists of (job_id, predicate, value)
job_id = filter[2]

try:
reply = await self._client.get_all_task_info(timeout=option.timeout)
reply = await self._client.get_all_task_info(
timeout=option.timeout, job_id=job_id
)
except DataSourceUnavailable:
raise DataSourceUnavailable(GCS_QUERY_FAILURE_WARNING)

Expand All @@ -371,7 +379,15 @@ def _to_task_state(task_attempt: dict) -> dict:
"""
task_state = {}
task_info = task_attempt.get("task_info", {})
state_updates = task_attempt.get("state_updates", None)
state_updates = task_attempt.get("state_updates", [])
profiling_data = task_attempt.get("profiling_data", {})
if profiling_data:
for event in profiling_data["events"]:
# End/start times are recorded in ns. We convert them to ms.
event["end_time"] = int(event["end_time"]) / 1e6
event["start_time"] = int(event["start_time"]) / 1e6
event["extra_data"] = json.loads(event["extra_data"])
task_state["profiling_data"] = profiling_data

# Convert those settable fields
mappings = [
Expand All @@ -396,16 +412,32 @@ def _to_task_state(task_attempt: dict) -> dict:
for key in keys:
task_state[key] = src.get(key)

# Get the most updated scheduling_state by state transition ordering.
def _get_most_recent_status(task_state: dict) -> str:
# Reverse the order as defined in protobuf for the most recent state.
for status_name in reversed(common_pb2.TaskStatus.keys()):
key = f"{status_name.lower()}_ts"
if state_updates.get(key):
return status_name
return common_pb2.TaskStatus.Name(common_pb2.NIL)

task_state["scheduling_state"] = _get_most_recent_status(state_updates)
task_state["start_time_ms"] = None
task_state["end_time_ms"] = None
events = []

for state in common_pb2.TaskStatus.keys():
key = f"{state.lower()}_ts"
if key in state_updates:
# timestamp is recorded in ns.
ts_ms = int(state_updates[key]) // 1e6
events.append(
{
"state": state,
"created_ms": ts_ms,
}
)
if state == "RUNNING":
task_state["start_time_ms"] = ts_ms
if state == "FINISHED" or state == "FAILED":
task_state["end_time_ms"] = ts_ms

task_state["events"] = events
if len(events) > 0:
latest_state = events[-1]["state"]
else:
latest_state = common_pb2.TaskStatus.Name(common_pb2.NIL)
task_state["state"] = latest_state

return task_state

Expand All @@ -419,6 +451,7 @@ def _get_most_recent_status(task_state: dict) -> str:
"node_id",
"actor_id",
"parent_task_id",
"component_id",
],
)
)
Expand Down
12 changes: 6 additions & 6 deletions doc/source/ray-observability/state/state-api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,14 @@ E.g., List running tasks

.. code-block:: bash
ray list tasks -f scheduling_state=RUNNING
ray list tasks -f state=RUNNING
.. tabbed:: Python SDK

.. code-block:: python
from ray.experimental.state.api import list_tasks
list_tasks(filters=[("scheduling_state", "=", "RUNNING")])
list_tasks(filters=[("state", "=", "RUNNING")])
E.g., List non-running tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -336,14 +336,14 @@ E.g., List non-running tasks

.. code-block:: bash
ray list tasks -f scheduling_state!=RUNNING
ray list tasks -f state!=RUNNING
.. tabbed:: Python SDK

.. code-block:: python
from ray.experimental.state.api import list_tasks
list_tasks(filters=[("scheduling_state", "!=", "RUNNING")])
list_tasks(filters=[("state", "!=", "RUNNING")])
E.g., List running tasks that have a name func
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -352,14 +352,14 @@ E.g., List running tasks that have a name func

.. code-block:: bash
ray list tasks -f scheduling_state=RUNNING -f name="task_running_300_seconds()"
ray list tasks -f state=RUNNING -f name="task_running_300_seconds()"
.. tabbed:: Python SDK

.. code-block:: python
from ray.experimental.state.api import list_tasks
list_tasks(filters=[("scheduling_state", "=", "RUNNING"), ("name", "=", "task_running_300_seconds()")])
list_tasks(filters=[("state", "=", "RUNNING"), ("name", "=", "task_running_300_seconds()")])
E.g., List tasks with more details
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
4 changes: 2 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -746,8 +746,8 @@ cdef void execute_task(
function_descriptor = CFunctionDescriptorToPython(
ray_function.GetFunctionDescriptor())
function_name = execution_info.function_name
extra_data = (b'{"name": ' + function_name.encode("ascii") +
b' "task_id": ' + task_id.hex().encode("ascii") + b'}')
extra_data = (b'{"name": "' + function_name.encode("ascii") +
b'", "task_id": "' + task_id.hex().encode("ascii") + b'"}')

name_of_concurrency_group_to_execute = \
c_name_of_concurrency_group_to_execute.decode("ascii")
Expand Down
16 changes: 13 additions & 3 deletions python/ray/experimental/state/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ class TaskState(StateSchema):
#: Refer to src/ray/protobuf/common.proto for a detailed explanation of the state
#: breakdowns and typical state transition flow.
#:
scheduling_state: TypeTaskStatus = state_column(filterable=True)
state: TypeTaskStatus = state_column(filterable=True)
#: The job id of this task.
job_id: str = state_column(filterable=True)
#: Id of the node that runs the task. If the task is retried, it could
Expand Down Expand Up @@ -522,7 +522,17 @@ class TaskState(StateSchema):
#: The runtime environment information for the task.
runtime_env_info: str = state_column(detail=True, filterable=False)
#: The parent task id.
parent_task_id: str = state_column(filterable=True)
parent_task_id: str = state_column(detail=True, filterable=True)
#: The list of events of the given task.
#: Refer to src/ray/protobuf/common.proto for a detailed explanation of the state
#: breakdowns and typical state transition flow.
events: List[dict] = state_column(detail=True, filterable=False)
#: The list of profile events of the given task.
profiling_data: List[dict] = state_column(detail=True, filterable=False)
#: The time when the task starts to run. A Unix timestamp in ms.
start_time_ms: Optional[int] = state_column(detail=True, filterable=False)
#: The time when the task finishes or failed. A Unix timestamp in ms.
end_time_ms: Optional[int] = state_column(detail=True, filterable=False)


@dataclass(init=True)
Expand Down Expand Up @@ -740,7 +750,7 @@ def to_summary(cls, *, tasks: List[Dict]):
)
task_summary = summary[key]

state = task["scheduling_state"]
state = task["state"]
if state not in task_summary.state_counts:
task_summary.state_counts[state] = 0
task_summary.state_counts[state] += 1
Expand Down
10 changes: 8 additions & 2 deletions python/ray/experimental/state/state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import ray.dashboard.modules.log.log_consts as log_consts
from ray._private import ray_constants
from ray._private.gcs_utils import GcsAioClient
from ray._private.utils import hex_to_binary
from ray._raylet import JobID
from ray.core.generated import gcs_service_pb2_grpc
from ray.core.generated.gcs_service_pb2 import (
GetAllActorInfoReply,
Expand Down Expand Up @@ -232,11 +234,15 @@ async def get_all_actor_info(

@handle_grpc_network_errors
async def get_all_task_info(
self, timeout: int = None, limit: int = None
self, timeout: int = None, limit: int = None, job_id: Optional[str] = None
) -> Optional[GetTaskEventsReply]:
if not limit:
limit = RAY_MAX_LIMIT_FROM_DATA_SOURCE
request = GetTaskEventsRequest(limit=limit, exclude_driver_task=True)
if job_id:
job_id = JobID(hex_to_binary(job_id)).binary()
request = GetTaskEventsRequest(
limit=limit, exclude_driver_task=True, job_id=job_id
)
reply = await self._gcs_task_info_stub.GetTaskEvents(request, timeout=timeout)
return reply

Expand Down
Loading

0 comments on commit 228b87f

Please sign in to comment.