Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][state] Task events backend - Port state API [5/n] #31278

Merged
merged 8 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
revert
Signed-off-by: rickyyx <[email protected]>
  • Loading branch information
rickyyx committed Dec 22, 2022
commit e856df9fdf89cc330aa883c031a33e4de68d87d8
56 changes: 20 additions & 36 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,8 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
spec.TaskId(), spec, max_retries, num_returns, task_counter_, max_oom_retries);
RAY_CHECK(inserted.second);
num_pending_tasks_++;
RecordTaskStatusEvent(inserted.first->second, rpc::TaskStatus::PENDING_ARGS_AVAIL);
}
RecordTaskStatusEvent(spec,
rpc::TaskStatus::PENDING_ARGS_AVAIL,
/* include_task_info */ true);

return returned_refs;
}
Expand All @@ -127,8 +125,7 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas

if (!it->second.IsPending()) {
resubmit = true;
SetTaskStatus(
it->second, rpc::TaskStatus::PENDING_ARGS_AVAIL, /* include_task_info */ true);
SetTaskStatus(it->second, rpc::TaskStatus::PENDING_ARGS_AVAIL);
it->second.MarkRetryOnResubmit();
num_pending_tasks_++;

Expand Down Expand Up @@ -468,8 +465,7 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id,
<< ", oom retries left: " << num_oom_retries_left
<< ", task failed due to oom: " << task_failed_due_to_oom;
if (will_retry) {
RAY_LOG(INFO) << "Attempting to resubmit task " << spec.TaskId()
<< " for attempt number: " << spec.AttemptNumber();
RAY_LOG(INFO) << "Attempting to resubmit task " << spec.TaskId();
// TODO(clarng): clean up and remove task_retry_delay_ms that is relied
// on by some tests.
int32_t delay_ms = task_failed_due_to_oom
Expand Down Expand Up @@ -790,18 +786,12 @@ void TaskManager::MarkTaskWaitingForExecution(const TaskID &task_id,
}
RAY_CHECK(it->second.GetStatus() == rpc::TaskStatus::PENDING_NODE_ASSIGNMENT);
it->second.SetNodeId(node_id);
SetTaskStatus(it->second,
rpc::TaskStatus::SUBMITTED_TO_WORKER,
/* include_task_info */ false,
node_id);
SetTaskStatus(it->second, rpc::TaskStatus::SUBMITTED_TO_WORKER);
}

void TaskManager::SetTaskStatus(TaskEntry &task_entry,
rpc::TaskStatus status,
bool include_task_info,
absl::optional<NodeID> node_id) {
void TaskManager::SetTaskStatus(TaskEntry &task_entry, rpc::TaskStatus status) {
task_entry.SetStatus(status);
RecordTaskStatusEvent(task_entry.spec, status, include_task_info, node_id);
RecordTaskStatusEvent(task_entry, status);
}

rpc::TaskInfoEntry TaskManager::MakeTaskInfoEntry(
Expand Down Expand Up @@ -887,38 +877,32 @@ void TaskManager::RecordMetrics() {
task_counter_.FlushOnChangeCallbacks();
}

void TaskManager::RecordTaskStatusEvent(const TaskSpecification &spec,
rpc::TaskStatus status,
bool include_task_info,
absl::optional<NodeID> node_id) {
void TaskManager::RecordTaskStatusEvent(const TaskEntry &task_entry,
rpc::TaskStatus status) {
if (!task_event_buffer_.Enabled()) {
return;
}
// Make task event
rpc::TaskEvents task_event;
task_event.set_task_id(spec.TaskId().Binary());
task_event.set_job_id(spec.JobId().Binary());
task_event.set_attempt_number(spec.AttemptNumber());
task_event.set_task_id(task_entry.spec.TaskId().Binary());
task_event.set_job_id(task_entry.spec.JobId().Binary());
task_event.set_attempt_number(task_entry.spec.AttemptNumber());
auto state_updates = task_event.mutable_state_updates();
if (include_task_info) {
// Initialize a new TaskInfoEntry
auto task_info = MakeTaskInfoEntry(spec);
task_event.mutable_task_info()->Swap(&task_info);
}

// Update the node id
if (node_id.has_value()) {
RAY_CHECK(status == rpc::TaskStatus::SUBMITTED_TO_WORKER)
<< "Only SUBMITTED_TO_WORKER status change has node id update.";
state_updates->set_node_id(node_id->Binary());
}

switch (status) {
case rpc::TaskStatus::PENDING_ARGS_AVAIL: {
// Initialize a new TaskInfoEntry
auto task_info = MakeTaskInfoEntry(task_entry.spec);
task_event.mutable_task_info()->Swap(&task_info);
state_updates->set_pending_args_avail_ts(absl::GetCurrentTimeNanos());
break;
}
case rpc::TaskStatus::SUBMITTED_TO_WORKER: {
RAY_CHECK(!task_entry.GetNodeId().IsNil())
<< "Node ID should have been set on the TaskEntry before updating it's status "
"to "
"SUBMITTED_TO_WORKER.";
// Update the node id
state_updates->set_node_id(task_entry.GetNodeId().Binary());
state_updates->set_submitted_to_worker_ts(absl::GetCurrentTimeNanos());
break;
}
Expand Down
19 changes: 3 additions & 16 deletions src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,27 +467,14 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// the TaskEventBuffer if enabled.
///
/// \param task_entry corresponding TaskEntry of a task to record the event.
/// \param status new status.
/// \param include_task_info True if TaskInfoEntry will be included when recording
/// status task event change for RecordTaskStatusEvent.
/// \param node_id Node ID of the worker to which the task was submitted. Only
/// applicable for the SUBMITTED_TO_WORKER status change.
void SetTaskStatus(TaskEntry &task_entry,
rpc::TaskStatus status,
bool include_task_info = false,
absl::optional<NodeID> node_id = absl::nullopt);
/// \param status the changed status.
void SetTaskStatus(TaskEntry &task_entry, rpc::TaskStatus status);

/// Record a task status change in the TaskEventBuffer.
///
/// \param task_entry corresponding TaskEntry of a task to record the event.
/// \param status the changed status.
/// \param include_task_info True if TaskInfoEntry will be added to the Task events.
/// \param node_id Node ID of the worker to which the task was submitted. Only
/// applicable for the SUBMITTED_TO_WORKER status change.
void RecordTaskStatusEvent(const TaskSpecification &spec,
rpc::TaskStatus status,
bool include_task_info,
absl::optional<NodeID> node_id = absl::nullopt);
void RecordTaskStatusEvent(const TaskEntry &task_entry, rpc::TaskStatus status);

/// Used to store task results.
std::shared_ptr<CoreWorkerMemoryStore> in_memory_store_;
Expand Down