[core][experimental] asyncio support for accelerated DAGs #43128
Signed-off-by: Stephanie Wang <[email protected]>
a4f081b to 19ccf88
def __init__(
    self,
    buffer_size_bytes: Optional[int],
    enable_asyncio: bool = False,
Add docstring?
python/ray/experimental/channel.py
Outdated
c.end_read()


class AwaitableBackgroundOutputReader(InputReader):
It is confusing for an "Output"Reader to be a subclass of "Input"Reader.
Do we have to associate these classes with output/input? Why don't we just call them writer / reader?
Good point! :D
self._get_or_compile()
async with self._dag_submission_lock:
    await self._dag_submitter.write(args[0])
    # Allocate a future that the caller can use to get the result.
Can you mention that the future is set from the fetcher's background thread?
Ah actually it is not set from a background thread. Fetching is done in the background thread, and then the result is set in the main thread. But I'll note that down.
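The distinction in the reply above can be illustrated with a minimal sketch. This is not the PR's code: `blocking_read` and `fetch_into` are hypothetical names, and the blocking channel read is simulated with a short sleep. The blocking fetch runs on an executor thread, while `set_result` is called from the coroutine, which runs on the event loop thread.

```python
import asyncio
import threading
import time

# Name of the thread that will run the event loop in this sketch.
loop_thread_name = threading.current_thread().name


def blocking_read() -> str:
    # Hypothetical stand-in for a blocking channel read.
    # Runs on an executor (background) thread.
    time.sleep(0.01)
    return threading.current_thread().name


async def fetch_into(fut: asyncio.Future) -> str:
    loop = asyncio.get_running_loop()
    # The blocking fetch happens off the event loop thread...
    reader_thread = await loop.run_in_executor(None, blocking_read)
    # ...but the future is resolved here, back on the event loop thread.
    fut.set_result(threading.current_thread().name)
    return reader_thread


async def main():
    fut = asyncio.get_running_loop().create_future()
    reader_thread = await fetch_into(fut)
    setter_thread = await fut
    return reader_thread, setter_thread


reader_thread, setter_thread = asyncio.run(main())
```

Resolving the future on the loop thread matters because `asyncio.Future` is not thread-safe; setting it directly from another thread would require `loop.call_soon_threadsafe`.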
def __init__(
    self,
    buffer_size_bytes: Optional[int],
    enable_asyncio: bool = False,
It feels a little weird to me that this has to be defined in the constructor. Normally you can just create a DAG and use both *_sync and *_async.
I wonder if we can lazily start the background reader/writer the first time we call execute_*, instead of defining it at the end of compile?
Hmm I think we can probably handle this better in the future? For now, I think it's better to be explicit. The issue with starting both kinds of readers is that the async reader will create background tasks.
handling it later also sgtm!
python/ray/experimental/channel.py
Outdated
loop = asyncio.get_event_loop()
while True:
    res = await self._queue.get()
    await loop.run_in_executor(None, self._run, res)
I think None will create the threadpool which uses total_num_cpus / 2 threads by default. Should we create a threadpool with a single thread here instead?
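For context, passing `None` to `run_in_executor` uses the loop's default `ThreadPoolExecutor`, whose size depends on the Python version (in Python 3.8 and later it is `min(32, os.cpu_count() + 4)` workers). A minimal sketch of the single-thread alternative suggested here follows; it is an illustration rather than the PR's implementation, and `blocking_write`, `drain`, and the `dag-io` thread prefix are hypothetical names.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def blocking_write(value, seen_threads):
    # Hypothetical stand-in for a blocking channel write.
    # Records which worker thread executed the call.
    seen_threads.add(threading.current_thread().name)
    return value


async def drain(queue, executor, n):
    loop = asyncio.get_running_loop()
    seen_threads = set()
    results = []
    for _ in range(n):
        item = await queue.get()
        # Passing an explicit executor instead of None avoids the default pool.
        results.append(
            await loop.run_in_executor(executor, blocking_write, item, seen_threads)
        )
    return results, seen_threads


async def main():
    queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)
    # A dedicated pool with exactly one worker thread.
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="dag-io") as executor:
        return await drain(queue, executor, 5)


results, seen_threads = asyncio.run(main())
```

With `max_workers=1`, every blocking call lands on the same worker thread, so calls are also serialized in submission order.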
This reverts commit b322646.
FYI @rkooo567 added some C++ changes to enable thread-safe begin_read / end_read, lost them accidentally in the merge earlier.
LGTM. One question regarding the thread safety implementation
RAY_RETURN_NOT_OK(EnsureGetAcquired(channel));

// If there is a concurrent ReadRelease, wait for that to finish first.
sem_wait(&channel.reader_semaphore);
Q: Does it affect perf, btw? Remember Eric mentioned busy waiting was necessary for perf?
  ReaderChannel(std::unique_ptr<plasma::MutableObject> mutable_object_ptr)
-     : mutable_object(std::move(mutable_object_ptr)) {}
+     : mutable_object(std::move(mutable_object_ptr)) {
+   RAY_CHECK(sem_init(&reader_semaphore, /*pshared=*/0, /*value=*/1) == 0);
https://man7.org/linux/man-pages/man3/sem_init.3.html
The doc says this value needs to be nonzero for the semaphore to be shared across processes. Does this mean pshared == 0 doesn't work?
It's only used by the same process.
/// All channels for which we are registered as a writer. This can overlap
/// with reader channels (e.g., if the CoreWorker is multithreaded and one
/// thread reads while the other writes).
// TODO(swang): Access to these maps is not threadsafe. This is fine in the
Nit: To be safer, can we create Add/Remove Channel APIs that raise a FATAL error? And we can add a comment there?
#endif
  return Status::OK();
}
sem_post(&channel.reader_semaphore);
This means that if release is not called, it can hang indefinitely? Is that okay when exceptions are raised? E.g., read 2 values, release 1 value, and then an exception is raised.
Yes, but that was already true before. Exceptions get serialized as normal values.
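The hang discussed in this thread can be sketched with a Python analogue of the semaphore guard. This is an illustration under assumptions, not the C++ implementation: `GuardedReader` is a hypothetical class, and a timeout is used so the blocked acquire returns instead of hanging.

```python
import threading


class GuardedReader:
    """Hypothetical analogue of a reader channel guarded by a binary semaphore."""

    def __init__(self):
        # Initial value 1: at most one read may be in progress at a time.
        self._sem = threading.Semaphore(1)

    def begin_read(self, timeout=None) -> bool:
        # Waits until the previous read has been released.
        # Returns False if the timeout expires first.
        return self._sem.acquire(timeout=timeout)

    def end_read(self) -> None:
        # Releases the guard so the next begin_read can proceed.
        self._sem.release()


reader = GuardedReader()
first = reader.begin_read()
# Without an intervening end_read, a second begin_read cannot proceed.
# With timeout=None it would block indefinitely.
blocked = reader.begin_read(timeout=0.05)
reader.end_read()
# After the release, the next read can start.
after_release = reader.begin_read(timeout=0.05)
reader.end_read()
```

In Python code that wraps such a guard, placing `end_read` in a `finally` block is one way to make sure the release happens even if processing the value raises.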
Feel free to merge after addressing last comments!
…microbenchmarks (#44330)
Fix microbenchmark after changes made in #43128:
- must begin_read() on channel before calling end_read()
- multi-output DAGs now return a single channel instead of multiple
- add asyncio versions of the DAG benchmarks
Signed-off-by: Stephanie Wang <[email protected]>
…t#43128) Adds asyncio support for accelerated DAGs. This works by creating two asyncio.Queues on the driver, one for DAG inputs and the other for DAG outputs. Reading/writing to the DAG input/output channels is performed on asyncio's default threadpool executor to avoid blocking the main thread. The DAG is not thread-safe. See unit test changes for API example. --------- Signed-off-by: Stephanie Wang <[email protected]>
Why are these changes needed?
Adds asyncio support for accelerated DAGs. This works by creating two asyncio.Queues on the driver, one for DAG inputs and the other for DAG outputs. Reading/writing to the DAG input/output channels is performed on asyncio's default threadpool executor to avoid blocking the main thread.
The DAG is not thread-safe.
See unit test changes for API example.
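The two-queue design described above can be sketched roughly as follows. This is a simplified illustration, not Ray's API: the compiled DAG is replaced by a hypothetical `run_dag` thread that doubles each input, the channels are plain `queue.Queue` objects, and `writer`, `reader`, and the `None` shutdown sentinel are assumptions made for the example.

```python
import asyncio
import queue
import threading


def run_dag(inp: queue.Queue, out: queue.Queue) -> None:
    # Hypothetical stand-in for the compiled DAG: doubles each input.
    while True:
        value = inp.get()
        if value is None:  # shutdown sentinel (an assumption of this sketch)
            break
        out.put(value * 2)


async def writer(loop, input_queue: asyncio.Queue, inp: queue.Queue) -> None:
    # Background task: move DAG inputs from the asyncio queue to the channel.
    while True:
        value = await input_queue.get()
        # The blocking channel write runs on the default threadpool executor.
        await loop.run_in_executor(None, inp.put, value)
        if value is None:
            return


async def reader(loop, output_queue: asyncio.Queue, out: queue.Queue, n: int) -> None:
    # Background task: read DAG outputs and resolve the callers' futures in order.
    for _ in range(n):
        result = await loop.run_in_executor(None, out.get)
        fut = await output_queue.get()
        fut.set_result(result)


async def main():
    loop = asyncio.get_running_loop()
    inp, out = queue.Queue(), queue.Queue()
    dag = threading.Thread(target=run_dag, args=(inp, out), daemon=True)
    dag.start()

    input_queue, output_queue = asyncio.Queue(), asyncio.Queue()
    tasks = [
        asyncio.create_task(writer(loop, input_queue, inp)),
        asyncio.create_task(reader(loop, output_queue, out, 3)),
    ]

    futures = []
    for value in (1, 2, 3):
        fut = loop.create_future()
        await input_queue.put(value)  # submit the DAG input
        await output_queue.put(fut)   # future the caller awaits for the result
        futures.append(fut)

    results = [await fut for fut in futures]
    await input_queue.put(None)  # stop the writer and the fake DAG
    await asyncio.gather(*tasks)
    dag.join()
    return results


results = asyncio.run(main())
```

Because the blocking reads and writes happen on executor threads, the event loop stays free to run other coroutines while the DAG executes.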