
[core][experimental] Add support for dynamically sized torch.Tensors passed via NCCL in accelerated DAG #45332

Merged
merged 95 commits into from
May 20, 2024

Conversation

@stephanie-wang stephanie-wang (Contributor) commented May 14, 2024

Why are these changes needed?

This adds support for dynamically sized torch.Tensors to be passed between accelerated DAG nodes via NCCL. Specifically, the following code is now supported, whereas previously `shape` and `dtype` had to be explicitly passed to `TorchTensorType`.

```python
with InputNode() as inp:
    dag = sender.send.bind(inp)
    dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
    dag = receiver.recv.bind(dag)

compiled_dag = dag.experimental_compile()
```

The feature works by creating a shared memory channel to pass the metadata for the shape and dtype of the tensor. The metadata is then used to create a buffer of the correct size on the NCCL receiver.
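As a rough illustration, the receive path looks something like the following (a minimal sketch; `meta_channel`, `nccl_group`, and their `read`/`recv` methods are hypothetical stand-ins for the internal channel APIs, not the actual Ray interfaces):

```python
import torch

def receive_dynamic_tensor(meta_channel, nccl_group, device):
    # Hypothetical sketch; not the actual Ray channel API.
    # Read the (shape, dtype) metadata sent over the shared memory channel.
    shape, dtype = meta_channel.read()
    # Allocate a receive buffer of exactly the right size on the GPU.
    buf = torch.empty(shape, dtype=dtype, device=device)
    # Receive the tensor contents over NCCL directly into the buffer.
    nccl_group.recv(buf)
    return buf
```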

Initial microbenchmarks show this adds about 50% throughput overhead compared to statically declaring the shape and dtype, or about 160us per DAG call. This is somewhat higher than expected (see also #45319).

This also adds a few other fixes:

  • Support for reusing actors to create new NCCL groups, which is needed if a DAG is torn down and a new one is created.
  • A lock around DAG teardown, to prevent the same NCCL group from being destructed twice.
  • A user-defined TorchTensorType shape or dtype is now used as a hint for the buffer size, instead of a required size. Since buffers are currently static, an error is thrown if the user tries to return a too-large tensor (see the sketch after this list).
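For example, a minimal sketch of the hint behavior (using only the constructor arguments shown in this PR; the exact exception type is not specified here and `sender`/`receiver` are the same example actors as above):

```python
with InputNode() as inp:
    dag = sender.send.bind(inp)
    # shape/dtype now act as a buffer-size hint rather than a strict contract.
    dag = dag.with_type_hint(
        TorchTensorType(shape=(10,), dtype=torch.float16, transport="nccl")
    )
    dag = receiver.recv.bind(dag)

compiled_dag = dag.experimental_compile()
# A returned tensor that fits in the hinted buffer is fine; returning a larger
# tensor (e.g., shape (100,)) raises an error because buffers are static.
```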

Related issue number

Part 1 of #45306; a separate PR will follow for nested tensors.

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

stephanie-wang and others added 30 commits April 17, 2024 15:56
@rkooo567 rkooo567 (Contributor) left a comment

LGTM! +1 on this #45319. I feel like it is safer to kill actors when NCCL destroy times out, just in case (especially given it is hard to test and we don't understand this very well yet), but I will leave it up to you.

@stephanie-wang (Contributor Author)

> LGTM! +1 on this #45319. I feel like it is safer to kill actors when NCCL destroy times out, just in case (especially given it is hard to test and we don't understand this very well yet), but I will leave it up to you.

For the initial case, what I would like to do is just offer two options: one that kills the actors, and another that syncs the stream and raises an exception to keep the actors alive. I agree it needs more testing, but we can probably defer that.

@stephanie-wang stephanie-wang added the @author-action-required label ("The PR author is responsible for the next step. Remove tag to send back to the reviewer.") May 17, 2024
@stephanie-wang (Contributor Author)

Hmm, some CI test issue about not being able to find GPUs...

This reverts commit 9915fbe.
@stephanie-wang stephanie-wang enabled auto-merge (squash) May 18, 2024 00:13
@github-actions github-actions bot disabled auto-merge May 18, 2024 01:13
```diff
@@ -339,5 +339,6 @@ steps:
   - bazel run //ci/ray_ci:test_in_docker -- //python/ray/tests/... //python/ray/dag/... core
     --parallelism-per-worker 2 --gpus 2
     --build-name coregpubuild
-    --only-tags multi_gpu
+    --only-tags multi_gpu || true
+  - sleep 1000000
```
Collaborator
w00t, I think you might have forgotten to remove this on CI, so it was running for 8 hours.

@can-anyscale can-anyscale (Collaborator) left a comment

ci changes look good, thanks

```diff
@@ -1,7 +1,7 @@
 # coding: utf-8
 import logging
 import torch
-import pickle
+import ray.cloudpickle as pickle
```
Collaborator
do we want to run this release test on this PR?

@stephanie-wang stephanie-wang merged commit ca9f736 into ray-project:master May 20, 2024
5 of 6 checks passed
stephanie-wang added a commit that referenced this pull request May 26, 2024
…ython objects (#45473)

Allows torch.Tensors nested inside Python objects to be transferred via
NCCL using the following syntax:

```python
    with InputNode() as inp:
        dag = sender.send.bind(inp)
        dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
        dag = receiver.recv.bind(dag)
```

We implement this by using an additional shared memory channel to pass
CPU data, with a "nested" NCCL channel to pass the GPU data. Here is the
send procedure for the above code:
1. Serialize the data, extracting all tensors that are on the GPU and replacing them with placeholders.
2. Send a list of metadata through the meta_channel.
3. Send the GPU tensors through the NCCL channel.
4. Send the rest of the CPU data, with the placeholders for the GPU tensors, through a cpu_data_channel.
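A rough sketch of this four-step send path (the channel objects and the `send_nested` helper are hypothetical stand-ins; the real implementation lives in the channel internals):

```python
import torch
import ray.cloudpickle as pickle

def send_nested(value, meta_channel, nccl_channel, cpu_data_channel):
    # Hypothetical sketch of the send procedure described above.
    gpu_tensors = []

    def replace_gpu_tensors(obj):
        # Walk the object tree, swapping GPU tensors for placeholder indices.
        if isinstance(obj, torch.Tensor) and obj.is_cuda:
            gpu_tensors.append(obj)
            return ("__tensor_placeholder__", len(gpu_tensors) - 1)
        if isinstance(obj, (list, tuple)):
            return type(obj)(replace_gpu_tensors(x) for x in obj)
        if isinstance(obj, dict):
            return {k: replace_gpu_tensors(v) for k, v in obj.items()}
        return obj

    cpu_value = replace_gpu_tensors(value)                          # step 1
    meta_channel.write([(t.shape, t.dtype) for t in gpu_tensors])   # step 2
    for t in gpu_tensors:                                           # step 3
        nccl_channel.send(t)
    cpu_data_channel.write(pickle.dumps(cpu_value))                 # step 4
```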

Note that if the TorchTensorType doesn't have a shape and dtype
specified, we currently use the separate meta_channel to pass metadata
for the serialized tensors, as introduced in #45332. To elide the
cpu_data_channel, the user should now use
`TorchTensorType(direct_return=True)`, to indicate that no CPU data is
sent along with the GPU data. To elide the meta_channel, the user should
declare the shape and dtype, e.g., `TorchTensorType(shape=(10, ),
dtype=torch.float16)`.
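Concretely, a sketch of the two options (using only the constructor arguments named above; combining `shape`/`dtype` with `transport="nccl"` as in the earlier examples is assumed here):

```python
# Elide the cpu_data_channel: the task returns a bare GPU tensor, no CPU data.
dag = dag.with_type_hint(TorchTensorType(transport="nccl", direct_return=True))

# Elide the meta_channel: declare the shape and dtype statically.
dag = dag.with_type_hint(
    TorchTensorType(shape=(10,), dtype=torch.float16, transport="nccl")
)
```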

## Related issue number

Closes #45306.

---------

Signed-off-by: Stephanie Wang <[email protected]>
Co-authored-by: SangBin Cho <[email protected]>
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 6, 2024
…passed via NCCL in accelerated DAG (ray-project#45332)
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 6, 2024
…ython objects (ray-project#45473)
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 6, 2024
…passed via NCCL in accelerated DAG (ray-project#45332)
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 6, 2024
…ython objects (ray-project#45473)
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 7, 2024
…passed via NCCL in accelerated DAG (ray-project#45332)
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 7, 2024
…ython objects (ray-project#45473)
GabeChurch pushed a commit to GabeChurch/ray that referenced this pull request Jun 11, 2024
…passed via NCCL in accelerated DAG (ray-project#45332)