Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][experimental] asyncio support for accelerated DAGs #43128

Merged
merged 27 commits into from
Mar 8, 2024

Conversation

stephanie-wang
Copy link
Contributor

@stephanie-wang stephanie-wang commented Feb 13, 2024

Why are these changes needed?

Adds asyncio support for accelerated DAGs. This works by creating two asyncio.Queues on the driver, one for DAG inputs and the other for DAG outputs. Reading/writing to the DAG input/output channels is performed on asyncio's default threadpool executor to avoid blocking the main thread.

The DAG is not thread-safe.

See unit test changes for API example.

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
@stephanie-wang stephanie-wang changed the title [WIP][donotmerge] async and pipelined channels for accelerated DAG [WIP] asyncio support for accelerated DAG Feb 14, 2024
@stephanie-wang stephanie-wang changed the title [WIP] asyncio support for accelerated DAG [core][experimental] asyncio support for accelerated DAGs Feb 20, 2024
Signed-off-by: Stephanie Wang <[email protected]>
def __init__(
self,
buffer_size_bytes: Optional[int],
enable_asyncio: bool = False,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add docstring?

c.end_read()


class AwaitableBackgroundOutputReader(InputReader):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is confusing to have "Output"Reader is a subclass of "Input"Reader.

Do we have to associate these classes to output/input? Why don't we just call it writer / reader?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! :D

self._get_or_compile()
async with self._dag_submission_lock:
await self._dag_submitter.write(args[0])
# Allocate a future that the caller can use to get the result.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you mention the future is set from a background thread of fetcher?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah actually it is not set from a background thread. Fetching is done in the background thread, and then the result is set in the main thread. But I'll note that down.

def __init__(
self,
buffer_size_bytes: Optional[int],
enable_asyncio: bool = False,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels a little weird to me this has to be defined in the constructor. Normally you can just create a DAG and use *_sync & *_async both.

I wonder if we can lazily start the background reader/writer when we call execute_* first time instead of defininig it at the end of compile?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I think we can probably handle this better in the future? For now, I think it's better to be explicit. The issue with starting both kinds of readers is that the async reader will create background tasks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handling it later also sgtm!

loop = asyncio.get_event_loop()
while True:
res = await self._queue.get()
await loop.run_in_executor(None, self._run, res)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think None will create the threadpool which uses total_num_cpus / 2 threads by default. Should we create a threadpool with a single thread here instead?

@rkooo567 rkooo567 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Feb 20, 2024
This reverts commit b322646.
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
@stephanie-wang stephanie-wang removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Feb 21, 2024
@stephanie-wang
Copy link
Contributor Author

FYI @rkooo567 added some C++ changes to enable thread-safe begin_read / end_read, lost them accidentally in the merge earlier.

Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
@stephanie-wang stephanie-wang added the tests-ok The tagger certifies test failures are unrelated and assumes personal liability. label Feb 27, 2024
Copy link
Contributor

@rkooo567 rkooo567 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. One question regarding the thread safety implementation

RAY_RETURN_NOT_OK(EnsureGetAcquired(channel));

// If there is a concurrent ReadRelease, wait for that to finish first.
sem_wait(&channel.reader_semaphore);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: does it affect perf btw? Remeber Eric mentioned busy waiting was necessary for perf?

ReaderChannel(std::unique_ptr<plasma::MutableObject> mutable_object_ptr)
: mutable_object(std::move(mutable_object_ptr)) {}
: mutable_object(std::move(mutable_object_ptr)) {
RAY_CHECK(sem_init(&reader_semaphore, /*pshared=*/0, /*value=*/1) == 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://man7.org/linux/man-pages/man3/sem_init.3.html

The doc says this value needs to be non-zero to be shared across process. Does this mean we pshared == 0 doesn't work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only used by the same process.

/// All channels for which we are registered as a writer. This can overlap
/// with reader channels (e.g., if the CoreWorker is multithreaded and one
/// thread reads while the other writes).
// TODO(swang): Access to these maps is not threadsafe. This is fine in the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit; To be safer, we can create Add/Remove Channel APi with FATAL error? And we can add a comment there?

#endif
return Status::OK();
}
sem_post(&channel.reader_semaphore);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means if release is not called, it can hang indefinitely? Is it okay in case of exceptions being raised? E.g., read 2 values, release 1 value and exception is raised

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but that was already true before. Exceptions get serialized as normal values.

@rkooo567
Copy link
Contributor

rkooo567 commented Mar 1, 2024

Feel free to merge after addressing last comments!

@rkooo567 rkooo567 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Mar 1, 2024
@stephanie-wang stephanie-wang merged commit 4f3f1f7 into ray-project:master Mar 8, 2024
8 of 9 checks passed
@stephanie-wang stephanie-wang deleted the asyncio-channels branch March 8, 2024 05:15
stephanie-wang added a commit that referenced this pull request Mar 28, 2024
…microbenchmarks (#44330)

Fix microbenchmark after changes made in #43128:

    must begin_read() on channel before calling end_read()
    multi-output DAGs now return a single channel instead of multiple
    add asyncio versions of the DAG benchmarks

Signed-off-by: Stephanie Wang <[email protected]>
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 7, 2024
…t#43128)

Adds asyncio support for accelerated DAGs. This works by creating two asyncio.Queues on the driver, one for DAG inputs and the other for DAG outputs. Reading/writing to the DAG input/output channels is performed on asyncio's default threadpool executor to avoid blocking the main thread.

The DAG is not thread-safe.

See unit test changes for API example.
---------

Signed-off-by: Stephanie Wang <[email protected]>
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 7, 2024
…microbenchmarks (ray-project#44330)

Fix microbenchmark after changes made in ray-project#43128:

    must begin_read() on channel before calling end_read()
    multi-output DAGs now return a single channel instead of multiple
    add asyncio versions of the DAG benchmarks

Signed-off-by: Stephanie Wang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
@author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. tests-ok The tagger certifies test failures are unrelated and assumes personal liability.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants