[core][state] Adding task failure type to state API [1/2] (ray-project#32834)

This PR adds the `ErrorType` information of failed tasks to the state API, so that it is queryable from the state API and can be made visible on the dashboard.
It will also allow us to mark child task failures properly by differentiating on the error type (`WORKER_DIED` vs `TASK_EXECUTION_EXCEPTION`); see ray-project#32835.
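As a rough illustration of what this enables (a minimal sketch, not part of the PR: the task name is arbitrary, and the fixed sleep stands in for the `wait_for_condition` polling used in the tests below), a failed task attempt can now be looked up by name and its error type read straight from the state API:

```python
import time

import ray
from ray.experimental.state.api import list_tasks


@ray.remote(max_retries=0)
def die():
    # Simulate a worker crash: this should surface as WORKER_DIED
    # rather than TASK_EXECUTION_EXCEPTION.
    exit(1)


ray.init()

try:
    ray.get(die.options(name="die-worker").remote())
except ray.exceptions.WorkerCrashedError:
    pass

# Task events are flushed out of the worker asynchronously, so give them
# a moment to land (the tests below poll with wait_for_condition instead).
time.sleep(2)
task = list_tasks(filters=[("name", "=", "die-worker")])[0]
print(task["state"], task["error_type"])  # expected: FAILED WORKER_DIED
```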

**Major changes in the PR are:**
1. Propagate the `ErrorType` info (as well as the `RayErrorInfo`) into the `TaskManager` when it marks tasks as failed.
2. A bit of plumbing in the `TaskEventBuffer` interface for constructing a `TaskStatusEvent`: the various arguments are wrapped into a `TaskStateUpdate` struct so that future additions require minimal changes to function signatures.
3. Embed the `ErrorType` into `RayErrorInfo` at the places where the two are used together. (They are currently not coupled, even though `ErrorType` is a field of `RayErrorInfo`.)

--------
The PR doesn't test all the error types yet, only some of them; several, I believe, are not relevant to task failures. I also need some help finding a way to cover the remaining ones in unit tests (see the template sketch after the lists below).
**I believe these are not relevant to task failures:**
- [ ] OBJECT_UNRECONSTRUCTABLE
- [ ] OBJECT_IN_PLASMA
- [ ] OBJECT_LOST
- [ ] OWNER_DIED
- [ ] OBJECT_UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED
- [ ] OBJECT_UNRECONSTRUCTABLE_LINEAGE_EVICTED
- [ ] OBJECT_FETCH_TIMED_OUT
- [ ] ACTOR_PLACEMENT_GROUP_REMOVED
- [ ] ACTOR_UNSCHEDULABLE_ERROR
- [ ] OUT_OF_DISK_ERROR
- [ ] OBJECT_FREED
- [ ] DEPENDENCY_RESOLUTION_FAILED

**Relevant task failures (tested):**
- [x] WORKER_DIED
- [x] ACTOR_DIED
- [x] TASK_EXECUTION_EXCEPTION
- [x] TASK_CANCELLED
- [x] RUNTIME_ENV_SETUP_FAILED
- [x] ACTOR_CREATION_FAILED: should be addressed together with ray-project#32726
- [x] LOCAL_RAYLET_DIED: not sure how to repro
- [x] TASK_PLACEMENT_GROUP_REMOVED
- [x] TASK_UNSCHEDULABLE_ERROR
- [x] OUT_OF_MEMORY
- [x] NODE_DIED

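For the remaining types, the scaffolding should look the same as the tests added in this PR: trigger the failure, then poll the state API with the new `verify_failed_task` helper. A hypothetical template, where the test name, the task name, and the failure body are placeholders to be swapped for a real repro:

```python
import pytest
import ray
from ray._private.test_utils import wait_for_condition
from ray._private.state_api_test_utils import verify_failed_task


def test_failed_task_some_error_type(shutdown_only):
    ray.init()

    # Placeholder: replace with a repro for the error type under test.
    @ray.remote(max_retries=0)
    def fail():
        raise ValueError("expected failure")

    with pytest.raises(ray.exceptions.RayTaskError):
        ray.get(fail.options(name="fail-some-error").remote())

    # Task events are flushed asynchronously, so poll until the FAILED
    # state and the expected error type show up in the state API.
    wait_for_condition(
        verify_failed_task,
        name="fail-some-error",
        error_type="TASK_EXECUTION_EXCEPTION",
    )
```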
Co-authored-by: SangBin Cho <[email protected]>
rickyyx and rkooo567 committed Mar 10, 2023
1 parent 694a74a commit dd41126
Showing 18 changed files with 327 additions and 84 deletions.
2 changes: 1 addition & 1 deletion dashboard/state_aggregator.py
@@ -421,7 +421,7 @@ def _to_task_state(task_attempt: dict) -> dict:
],
),
(task_attempt, ["task_id", "attempt_number", "job_id"]),
(state_updates, ["node_id", "worker_id"]),
(state_updates, ["node_id", "worker_id", "error_type"]),
]
for src, keys in mappings:
for key in keys:
13 changes: 13 additions & 0 deletions python/ray/_private/state_api_test_utils.py
@@ -9,6 +9,7 @@
import time
import traceback
from typing import Callable, Dict, List, Optional
from ray.experimental.state.api import list_tasks
import ray
from ray.actor import ActorHandle

@@ -306,3 +307,15 @@ def periodic_invoke_state_apis_with_actor(*args, **kwargs) -> ActorHandle:
print("State api actor is ready now.")
actor.start.remote()
return actor


def verify_failed_task(name: str, error_type: str) -> bool:
"""
Check if a task with 'name' has failed with the exact error type 'error_type'
"""
tasks = list_tasks(filters=[("name", "=", name)])
assert len(tasks) == 1, tasks
t = tasks[0]
assert t["state"] == "FAILED", t
assert t["error_type"] == error_type, t
return True
2 changes: 2 additions & 0 deletions python/ray/experimental/state/common.py
@@ -564,6 +564,8 @@ class TaskState(StateSchema):
start_time_ms: Optional[int] = state_column(detail=True, filterable=False)
#: The time when the task is finished or failed. A Unix timestamp in ms.
end_time_ms: Optional[int] = state_column(detail=True, filterable=False)
#: Task error type.
error_type: Optional[str] = state_column(detail=False, filterable=False)


@dataclass(init=True)
10 changes: 9 additions & 1 deletion python/ray/tests/test_failure_4.py
@@ -7,6 +7,7 @@
import psutil
import pytest
from grpc._channel import _InactiveRpcError
from ray._private.state_api_test_utils import verify_failed_task

import ray
import ray._private.ray_constants as ray_constants
@@ -555,13 +556,20 @@ def func():

# The lease request should wait inside raylet
# since there is no available resources.
ret = func.remote()
ret = func.options(name="task-local-raylet-dead").remote()
# Waiting for the lease request to reach raylet.
time.sleep(1)
head.kill_raylet()
with pytest.raises(LocalRayletDiedError):
ray.get(ret)

# Check the task failure states for observability.
wait_for_condition(
verify_failed_task,
name="task-local-raylet-dead",
error_type="LOCAL_RAYLET_DIED",
)


def test_locality_aware_scheduling_for_dead_nodes(shutdown_only):
"""Test that locality-ware scheduling can handle dead nodes."""
15 changes: 12 additions & 3 deletions python/ray/tests/test_memory_pressure.py
@@ -14,6 +14,7 @@
import numpy as np
from ray._private.utils import get_system_memory
from ray._private.utils import get_used_memory
from ray.experimental.state.api import list_tasks
from ray.experimental.state.state_manager import StateDataSourceClient


@@ -339,7 +340,7 @@ async def test_task_oom_logs_error(ray_with_memory_monitor):
bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct(1)
with pytest.raises(ray.exceptions.OutOfMemoryError) as _:
ray.get(
allocate_memory.options(max_retries=0).remote(
allocate_memory.options(max_retries=0, name="allocate_memory").remote(
allocate_bytes=bytes_to_alloc,
allocate_interval_s=0,
post_allocate_sleep_s=1000,
@@ -355,8 +356,16 @@ async def test_task_oom_logs_error(ray_with_memory_monitor):
verified = True
assert verified

# TODO(clarng): verify task info once state_api_client.get_task_info
# returns the crashed task.
def verify_oom_task_error():
tasks = list_tasks(filters=[("name", "=", "allocate_memory")])
print(tasks)
assert len(tasks) == 1, "no retries should be expected."
assert tasks[0]["state"] == "FAILED"
assert tasks[0]["error_type"] == "OUT_OF_MEMORY"
return True

wait_for_condition(verify_oom_task_error)

# TODO(clarng): verify log info once state api can dump log info


170 changes: 169 additions & 1 deletion python/ray/tests/test_task_events.py
@@ -3,7 +3,9 @@
import pytest
import threading
import time

from ray._private.state_api_test_utils import verify_failed_task
from ray.exceptions import RuntimeEnvSetupError
from ray.runtime_env import RuntimeEnv
import ray
from ray.experimental.state.common import ListApiOptions, StateResource
from ray._private.test_utils import (
@@ -82,6 +84,172 @@ def verify():
)


def test_failed_task_error(shutdown_only):
ray.init(_system_config=_SYSTEM_CONFIG)

# Test failed task with TASK_EXECUTION_EXCEPTION
@ray.remote
def fail(x=None):
if x is not None:
time.sleep(x)
raise ValueError("fail is expected to failed")

with pytest.raises(ray.exceptions.RayTaskError):
ray.get(fail.options(name="fail").remote())

wait_for_condition(
verify_failed_task, name="fail", error_type="TASK_EXECUTION_EXCEPTION"
)

# Test canceled tasks with TASK_CANCELLED
@ray.remote
def sleep():
time.sleep(999)

with pytest.raises(ray.exceptions.TaskCancelledError):
t = sleep.options(name="sleep-cancel").remote()
ray.cancel(t)
ray.get(t)

wait_for_condition(
verify_failed_task, name="sleep-cancel", error_type="TASK_CANCELLED"
)

# Test task failed when worker killed :WORKER_DIED
@ray.remote(max_retries=0)
def die():
exit(1)

with pytest.raises(ray.exceptions.WorkerCrashedError):
ray.get(die.options(name="die-worker").remote())

wait_for_condition(verify_failed_task, name="die-worker", error_type="WORKER_DIED")

# Test actor task failed with actor dead: ACTOR_DIED
@ray.remote
class Actor:
def f(self):
time.sleep(999)

a = Actor.remote()
with pytest.raises(ray.exceptions.RayActorError):
ray.kill(a)
ray.get(a.f.options(name="actor-killed").remote())

wait_for_condition(verify_failed_task, name="actor-killed", error_type="ACTOR_DIED")


def test_failed_task_failed_due_to_node_failure(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=1)
ray.init(address=cluster.address)
node = cluster.add_node(num_cpus=2)

driver_script = """
import ray
ray.init("auto")
@ray.remote(num_cpus=2, max_retries=0)
def sleep():
import time
time.sleep(999)
x = sleep.options(name="node-killed").remote()
ray.get(x)
"""

run_string_as_driver_nonblocking(driver_script)

def driver_running():
t = list_tasks(filters=[("name", "=", "node-killed")])
return len(t) > 0

wait_for_condition(driver_running)

# Kill the node
cluster.remove_node(node)

wait_for_condition(verify_failed_task, name="node-killed", error_type="NODE_DIED")


def test_failed_task_unschedulable(shutdown_only):
ray.init(num_cpus=1, _system_config=_SYSTEM_CONFIG)

node_id = ray.get_runtime_context().get_node_id()
policy = ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=node_id,
soft=False,
)

@ray.remote
def task():
pass

task.options(
scheduling_strategy=policy,
name="task-unschedulable",
num_cpus=2,
).remote()

wait_for_condition(
verify_failed_task,
name="task-unschedulable",
error_type="TASK_UNSCHEDULABLE_ERROR",
)


def test_failed_task_removed_placement_group(shutdown_only, monkeypatch):
ray.init(num_cpus=2, _system_config=_SYSTEM_CONFIG)
from ray.util.placement_group import placement_group, remove_placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

pg = placement_group([{"CPU": 2}])
ray.get(pg.ready())

@ray.remote(num_cpus=2)
def sleep():
time.sleep(999)

with monkeypatch.context() as m:
m.setenv(
"RAY_testing_asio_delay_us",
"NodeManagerService.grpc_server.RequestWorkerLease=3000000:3000000",
)

sleep.options(
scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg),
name="task-pg-removed",
max_retries=0,
).remote()

remove_placement_group(pg)

wait_for_condition(
verify_failed_task,
name="task-pg-removed",
error_type="TASK_PLACEMENT_GROUP_REMOVED",
)


def test_failed_task_runtime_env_setup(shutdown_only):
@ray.remote
def f():
pass

bad_env = RuntimeEnv(conda={"dependencies": ["_this_does_not_exist"]})
with pytest.raises(
RuntimeEnvSetupError,
match="ResolvePackageNotFound",
):
ray.get(f.options(runtime_env=bad_env, name="task-runtime-env-failed").remote())

wait_for_condition(
verify_failed_task,
name="task-runtime-env-failed",
error_type="RUNTIME_ENV_SETUP_FAILED",
)


def test_fault_tolerance_parent_failed(shutdown_only):
ray.init(num_cpus=4, _system_config=_SYSTEM_CONFIG)

2 changes: 1 addition & 1 deletion src/mock/ray/core_worker/task_manager.h
@@ -54,7 +54,7 @@ class MockTaskFinisherInterface : public TaskFinisherInterface {
(const, override));
MOCK_METHOD(bool,
RetryTaskIfPossible,
(const TaskID &task_id, bool task_failed_due_to_oom),
(const TaskID &task_id, const rpc::RayErrorInfo &error_info),
(override));
MOCK_METHOD(void, MarkDependenciesResolved, (const TaskID &task_id), (override));
MOCK_METHOD(void,
26 changes: 16 additions & 10 deletions src/ray/core_worker/task_event_buffer.cc
@@ -31,14 +31,12 @@ TaskStatusEvent::TaskStatusEvent(
const rpc::TaskStatus &task_status,
int64_t timestamp,
const std::shared_ptr<const TaskSpecification> &task_spec,
absl::optional<NodeID> node_id,
absl::optional<WorkerID> worker_id)
absl::optional<const TaskStatusEvent::TaskStateUpdate> state_update)
: TaskEvent(task_id, job_id, attempt_number),
task_status_(task_status),
timestamp_(timestamp),
task_spec_(task_spec),
node_id_(node_id),
worker_id_(worker_id) {}
state_update_(state_update) {}

TaskProfileEvent::TaskProfileEvent(TaskID task_id,
JobID job_id,
@@ -67,22 +65,30 @@ void TaskStatusEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
}

// Task status update.
auto state_updates = rpc_task_events->mutable_state_updates();
auto dst_state_update = rpc_task_events->mutable_state_updates();
gcs::FillTaskStatusUpdateTime(task_status_, timestamp_, dst_state_update);

if (node_id_.has_value()) {
if (!state_update_.has_value()) {
return;
}

if (state_update_->node_id_.has_value()) {
RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
<< "Node ID should be included when task status changes to "
"SUBMITTED_TO_WORKER.";
state_updates->set_node_id(node_id_->Binary());
dst_state_update->set_node_id(state_update_->node_id_->Binary());
}

if (worker_id_.has_value()) {
if (state_update_->worker_id_.has_value()) {
RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
<< "Worker ID should be included when task status changes to "
"SUBMITTED_TO_WORKER.";
state_updates->set_worker_id(worker_id_->Binary());
dst_state_update->set_worker_id(state_update_->worker_id_->Binary());
}

if (state_update_->error_info_.has_value()) {
dst_state_update->set_error_type(state_update_->error_info_->error_type());
}
gcs::FillTaskStatusUpdateTime(task_status_, timestamp_, state_updates);
}

void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {