
[core] Fault tolerance for compiled DAGs #41943

Merged · 18 commits into ray-project:master · Dec 21, 2023

Conversation

@ericl (Contributor, Author) commented on Dec 15, 2023

Why are these changes needed?

This adds fault tolerance and a teardown method for compiled DAGs.

  • Application-level exceptions are propagated to callers via the channel
  • To handle system-level errors, we add the notion of a "closed" channel: one that has an error bit set. The error bit can be set without acquiring a mutex, so closing works even if a crashed process is still holding the channel mutex. Processes trying to use a closed channel get RaySystemError("channel closed").
  • Upon a crash of the compiled background task, we close all DAG channels to cancel the computation.
  • The user can also explicitly tear down the DAG via compiled_dag.teardown() (see the usage sketch below).

Note: cancellation is best-effort and currently requires running a task on the actor's main concurrency group. If the actor is busy with some other task submitted by the user, cancellation will be delayed.
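
For context, a minimal usage sketch of the behavior described above. The Worker actor, its double method, and the execute()/begin_read()/end_read() calls are illustrative assumptions pieced together from the snippets in this PR, not a definitive API reference; only experimental_compile(), teardown(), and the RaySystemError("channel closed") behavior are stated in the description.

```python
import ray
from ray.dag import InputNode
from ray.exceptions import RaySystemError

# Hypothetical actor used only for illustration.
@ray.remote
class Worker:
    def double(self, x):
        return 2 * x

worker = Worker.remote()
with InputNode() as inp:
    dag = worker.double.bind(inp)

# Compile the DAG (API shown in this PR's tests).
compiled_dag = dag.experimental_compile()

try:
    # execute()/begin_read()/end_read() usage is an assumption based on the
    # channel snippets in this PR; the exact calls may differ.
    output_channel = compiled_dag.execute(1)
    result = output_channel.begin_read()
    output_channel.end_read()
except RaySystemError:
    # Raised when a channel has been closed, e.g. because a participating
    # worker crashed and the channel's error bit was set.
    pass
finally:
    # Explicitly tear down the DAG and close all of its channels.
    compiled_dag.teardown()
```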

Related issue number

#41769

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

debugger_breakpoint,
)
values = self.deserialize_objects(data_metadata_pairs, object_refs)
for i, value in enumerate(values):
@ericl (Author) commented:
Copied from non-experimental path.

@ericl changed the title from "[WIP] Fault tolerance for compiled DAGs" to "[core] Fault tolerance for compiled DAGs" on Dec 15, 2023
@@ -751,8 +756,14 @@ Status PlasmaClient::Impl::EnsureGetAcquired(
}

int64_t version_read = 0;

// Need to unlock the client mutex since ReadAcquire() is blocking.
// TODO(ekl) is this entirely thread-safe?
@ericl (Author) commented:
We are probably assured the object cannot be deallocated while a reader has the reference. @stephanie-wang @rkooo567 does this seem right?

A reviewer (Contributor) replied:
Right now mutable objects are never deallocated :D

But yeah in general this should be okay as long as we make sure to increment the PlasmaClient's local ref count for the object before we unlock.

@stephanie-wang (Contributor) left a review comment:
I think there is an edge case here where an actor dies after WriteAcquire but before WriteRelease. For that case, we would need to make sure to write the exception and have one process Release (or make Release safe for multiple writers).

try_wait=True,
)
except Exception as e:
if not _is_write_acquire_failed_error(e):
A reviewer (Contributor) commented:
Hmm I didn't quite understand this condition. It seems to fail silently if we fail to acquire?

Also, could you comment on what cases we expect to fail to acquire?

@@ -75,10 +73,21 @@ def do_exec_compiled_task(
channel.end_read()

except Exception as e:
-    logging.warn(f"Compiled DAG task aborted with exception: {e}")
+    logging.info(f"Compiled DAG task exited with exception: {e}")
A reviewer (Contributor) commented:
For non-Ray exceptions, I wonder if we should instead store the error and keep looping?

Another reviewer replied:
+1
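
As a rough illustration only, the "store the error and keep looping" suggestion might look like the following inside the compiled-task loop. The function signature, the output_channel.write() call, and the RayError split are assumptions made for this sketch; only begin_read()/end_read() appear in the diff above.

```python
from ray.exceptions import RayError

def exec_compiled_task_loop(method, input_channel_idxs, output_channel, num_inputs):
    """Hedged sketch: write application-level exceptions to the output
    channel and keep looping, instead of logging and exiting the task."""
    resolved_inputs = [None] * num_inputs
    while True:
        read_channels = []
        try:
            for idx, channel in input_channel_idxs:
                resolved_inputs[idx] = channel.begin_read()
                read_channels.append(channel)
            output_channel.write(method(*resolved_inputs))
        except RayError:
            # Ray/system-level error (e.g. a closed channel): stop the loop.
            raise
        except Exception as e:
            # Application-level error: store it for the caller and continue.
            output_channel.write(e)
        finally:
            # Only release the channels that were successfully acquired.
            for channel in read_channels:
                channel.end_read()
```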

@@ -176,6 +192,32 @@ def f(x):
dag.experimental_compile()


@pytest.mark.parametrize("num_actors", [1, 4])
def test_dag_fault_tolerance(ray_start_regular, num_actors):
A reviewer (Contributor) commented:
Can you also add a test for worker process dying?
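
For reference, a sketch of the kind of test being asked for here, under the assumption that killing an actor surfaces as RaySystemError on the caller side. The Echo actor, the chained bind() pattern, and the execute()/begin_read() calls are illustrative and based loosely on the surrounding test file rather than its exact contents.

```python
import pytest
import ray
from ray.dag import InputNode
from ray.exceptions import RaySystemError

@pytest.mark.parametrize("num_actors", [1, 4])
def test_dag_fault_tolerance_actor_killed(ray_start_regular, num_actors):
    @ray.remote
    class Echo:
        def echo(self, x):
            return x

    actors = [Echo.remote() for _ in range(num_actors)]

    # Chain the actors into a simple linear DAG.
    with InputNode() as inp:
        dag = inp
        for actor in actors:
            dag = actor.echo.bind(dag)
    compiled_dag = dag.experimental_compile()

    # Simulate a worker process dying.
    ray.kill(actors[0], no_restart=True)

    # The caller should see an error (channel closed) rather than hang.
    with pytest.raises(RaySystemError):
        output_channel = compiled_dag.execute(1)
        output_channel.begin_read()

    compiled_dag.teardown()
```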

@ericl (Author) commented on Dec 15, 2023:

> I think there is an edge case here where an actor dies after WriteAcquire but before WriteRelease

I see, would we need a timeout here to force release of the lock?

Review threads marked resolved:
  • python/ray/_raylet.pyx (outdated)
  • src/ray/object_manager/plasma/client.cc (outdated)
  • python/ray/dag/tests/test_accelerated_dag.py
  • python/ray/dag/compiled_dag_node.py
  • python/ray/dag/tests/test_accelerated_dag.py (outdated)
  • python/ray/dag/tests/test_accelerated_dag.py
  • src/ray/object_manager/plasma/client.cc (outdated)
  • src/ray/object_manager/common.h (outdated)
  • python/ray/experimental/channel.py (outdated)
@rkooo567 added the @author-action-required label (the PR author is responsible for the next step; remove the tag to send back to the reviewer) on Dec 15, 2023
@stephanie-wang (Contributor) commented:
> I think there is an edge case here where an actor dies after WriteAcquire but before WriteRelease

> I see, would we need a timeout here to force release of the lock?

If we know that the original writer has definitely died, it should be okay to directly write the plasma buffer with the exception object and WriteRelease.

But actually yeah it is a bit tricky if the process fails while holding the pthread_mutex in WriteAcquire or WriteRelease. I think we need to rethink that concurrency mechanism...

@stephanie-wang (Contributor) commented:
A long enough timeout on pthread_mutex_lock seems okay for now; we can probably improve it later.

@stephanie-wang (Contributor) commented:
By the way, seems like we need something like this to support multi-node too, so that we have a way to signal that we should stop waiting for values to send to the other node. I think it'd be best if we can send a special value like "EOF" instead of storing an exception, so that way it works for both python and C++ readers.

@ericl (Author) commented on Dec 15, 2023:

> But actually yeah it is a bit tricky if the process fails while holding the pthread_mutex in WriteAcquire or WriteRelease. I think we need to rethink that concurrency mechanism...

So how about this: we can switch the error writing path of WriteAcquire to sem_timedwait and pthread_mutex_timedlock with a 10-second timeout for now?

> By the way, seems like we need something like this to support multi-node too, so that we have a way to signal that we should stop waiting for values to send to the other node. I think it'd be best if we can send a special value like "EOF" instead of storing an exception, so that way it works for both python and C++ readers.

Can you explain more?

@ericl (Author) commented on Dec 18, 2023:

From offline discussion, this should be refactored so that:

  • app-level exception -> errors returned via application level writes
  • worker crash -> caught / logged by monitor and dag torn down via EOS
  • teardown -> dag torn down via EOS

EOS will be implemented as an "error bit" that can be set on the channel without locking. This allows error handling to avoid race conditions, while still preserving exception messages in most cases.
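
To make the monitor idea concrete, here is a hedged sketch of the "worker crash -> caught / logged by monitor and DAG torn down via EOS" flow. The attribute names (worker_task_refs, dag_input_channels, dag_output_channels) and the channel.close() call are assumptions for illustration, not the PR's actual fields or methods.

```python
import logging
import threading

import ray

class CompiledDagMonitor(threading.Thread):
    """Background thread that waits on the long-running compiled-DAG actor
    tasks and closes every channel if any of them exits unexpectedly, so
    blocked readers and writers wake up with a "channel closed" error."""

    def __init__(self, compiled_dag):
        super().__init__(daemon=True)
        self.compiled_dag = compiled_dag

    def run(self):
        try:
            # ray.get() raises (e.g. RayActorError) if a worker crashes.
            ray.get(self.compiled_dag.worker_task_refs)
        except Exception as e:
            logging.info(f"Compiled DAG worker task exited with exception: {e}")
        finally:
            # Tear down via EOS: set the error bit on every channel, which by
            # design does not require acquiring the (possibly orphaned) mutex.
            channels = (self.compiled_dag.dag_input_channels
                        + self.compiled_dag.dag_output_channels)
            for channel in channels:
                channel.close()
```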

@rkooo567 (Contributor) commented:
> This allows error handling to avoid race conditions, while still preserving exception messages in most cases.

What's missing to support "all errors" in this case?

Review threads marked resolved:
  • src/ray/object_manager/common.cc (outdated)
  • src/ray/object_manager/common.h
  • src/ray/object_manager/plasma/client.cc (outdated)
  • python/ray/experimental/channel.py (outdated)
  • python/ray/dag/compiled_dag_node.py (outdated)
  • python/ray/dag/compiled_dag_node.py
@@ -68,17 +69,41 @@ def do_exec_compiled_task(
for idx, channel in input_channel_idxs:
resolved_inputs[idx] = channel.begin_read()
A reviewer (Contributor) commented:
It might be good to explicitly try-catch the channel calls so that we can differentiate between expected errors (channel closed), application code errors, and anything else that might error in this loop (most likely system bugs). The try-catch at the end can be for system errors only.

@ericl (Author) replied:
Hmm I played around with this and deciding the semantics is tricky, so I think we should tackle this later on for productionization.
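
For completeness, a non-authoritative sketch of the layering the reviewer suggested, should it be revisited later. Treating RaySystemError as the "channel closed" signal and the exec_method/output_channel names are assumptions, not what this PR implements.

```python
from ray.exceptions import RaySystemError

def run_one_iteration(input_channel_idxs, resolved_inputs, exec_method, output_channel):
    """Sketch: separate expected shutdown errors, application errors, and
    everything else (most likely system bugs). Returns False when the loop
    should exit because the channels were closed."""
    # 1. Expected error: the channel was closed (teardown or worker crash).
    try:
        for idx, channel in input_channel_idxs:
            resolved_inputs[idx] = channel.begin_read()
    except RaySystemError:
        return False

    # 2. Application code errors: store the exception for the caller.
    try:
        result = exec_method(*resolved_inputs)
    except Exception as e:
        result = e

    # 3. Failures past this point are most likely system bugs; let them
    #    propagate to the outer try-except around the whole loop.
    try:
        output_channel.write(result)
    finally:
        for _, channel in input_channel_idxs:
            channel.end_read()
    return True
```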

Review thread marked resolved: python/ray/dag/tests/test_accelerated_dag.py (outdated)
@ericl (Author) commented on Dec 20, 2023:

Comments addressed

@ericl (Author) commented on Dec 21, 2023:

Tests look good, retrying flaky one.

@ericl merged commit f295e94 into ray-project:master on Dec 21, 2023 (9 of 10 checks passed).
vickytsang pushed a commit to ROCm/ray referencing this pull request on Jan 12, 2024: "This adds fault tolerance and a teardown method for compiled DAGs."