Skip to content

Commit

Permalink
[core][state] Ignore tasks w/o status updates (driver generated tasks…
Browse files Browse the repository at this point in the history
…) by default from GCS (ray-project#31481)

With profiling events (submit_task) collected on the driver, we will have entries of TaskEvent from the driver being reported to GCS. These events don't have any task spec info associated with them (only profile events with task id and job id). When querying events in state API, the driver task event will get accounted for sometimes when truncation happens. This PR does a source-side filtering of driver events with exclude_driver_task to True to filter out driver tasks
  • Loading branch information
rickyyx committed Jan 10, 2023
1 parent 0c42513 commit 34a14a9
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 5 deletions.
4 changes: 0 additions & 4 deletions dashboard/state_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,6 @@ def _to_task_state(task_attempt: dict) -> dict:
task_info = task_attempt.get("task_info", {})
state_updates = task_attempt.get("state_updates", None)

if state_updates is None:
return {}

# Convert those settable fields
mappings = [
(
Expand Down Expand Up @@ -426,7 +423,6 @@ def _get_most_recent_status(task_state: dict) -> str:
)
for message in reply.events_by_task
]
result = [e for e in result if len(e) > 0]

num_after_truncation = len(result)
num_total = num_after_truncation + reply.num_status_task_events_dropped
Expand Down
2 changes: 1 addition & 1 deletion python/ray/experimental/state/state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ async def get_all_task_info(
) -> Optional[GetTaskEventsReply]:
if not limit:
limit = RAY_MAX_LIMIT_FROM_DATA_SOURCE
request = GetTaskEventsRequest(limit=limit)
request = GetTaskEventsRequest(limit=limit, exclude_driver_task=True)
reply = await self._gcs_task_info_stub.GetTaskEvents(request, timeout=timeout)
return reply

Expand Down
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ void GcsTaskManager::HandleGetTaskEvents(rpc::GetTaskEventsRequest request,
int32_t num_profile_event_limit = 0;
int32_t num_status_event_limit = 0;
for (auto &task_event : task_events) {
if (request.exclude_driver_task() && !task_event.has_state_updates()) {
// Driver related profile events will generate TaskEvent w/o any task state updates.
continue;
}

if (limit < 0 || count++ < limit) {
auto events = reply->add_events_by_task();
events->Swap(&task_event);
Expand Down
2 changes: 2 additions & 0 deletions src/ray/protobuf/gcs_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,8 @@ message GetTaskEventsRequest {
// If set, the exact `limit` TaskEvents returned do not have any ordering or selection
// guarantee.
optional int64 limit = 3;
// True if task events from driver (only profiling events) should be excluded.
bool exclude_driver_task = 4;
}

message GetTaskEventsReply {
Expand Down

0 comments on commit 34a14a9

Please sign in to comment.