
[core][experimental] Pass torch.Tensors through accelerated DAGs #44825

Merged
merged 27 commits from dag-gpu-channels into ray-project:master on May 3, 2024

Conversation

stephanie-wang
Contributor

@stephanie-wang commented Apr 18, 2024

Why are these changes needed?

This PR adds support for passing torch.Tensors to local actors in an accelerated DAG, via Ray's shared memory store. It supports the following transfer cases, as long as the sending and receiving actors are on the same node: CPU-CPU, CPU-GPU, GPU-CPU, GPU-GPU (via CPU).

This iteration requires the user to explicitly declare which DAG nodes contain torch.Tensors, along with the tensors' shape and dtype, via a new with_type_hint method. For example:

    with InputNode() as inp:
        dag = sender.send.bind(inp)
        dag = dag.with_type_hint(TorchTensorType(SHAPE, DTYPE))
        dag = receiver.recv.bind(dag)

    compiled_dag = dag.experimental_compile()

This declaration isn't strictly necessary for this PR, but it is included now because it makes it much simpler to efficiently support other cases in the future, such as p2p GPU-GPU transfers.

When a TorchTensor node is declared, the serialization of the underlying torch.Tensor is performed differently from vanilla Ray. In particular, we store the numpy view of the data. On the receiving actor, we deserialize to a torch.Tensor and move it to the device assigned to the actor, if any. Microbenchmarking shows that this is 4x faster than normal pickling and unpickling of a torch.Tensor, likely due to Ray's serialization support for numpy. Also, when moving the torch.Tensor to a GPU on the receiving side, we can avoid one extra data copy by copying directly from Ray's shared memory buffer to GPU memory.
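
As a rough sketch of the scheme described above (the function names and registration details here are illustrative, not the PR's actual implementation, which lives in the new _TorchTensorSerializer):

```python
import numpy as np
import torch

def serialize_tensor(tensor: "torch.Tensor") -> np.ndarray:
    # Detach and move to CPU if necessary, then expose the data as a numpy
    # array. Ray serializes numpy arrays efficiently (out-of-band, zero-copy
    # buffers), which is likely where the speedup over plain pickling comes from.
    return tensor.detach().cpu().numpy()

def deserialize_tensor(array: np.ndarray, device: "torch.device") -> "torch.Tensor":
    # Construct the tensor directly on the receiving actor's assigned device.
    # When the device is a GPU, this copies straight from the shared-memory
    # buffer into GPU memory, avoiding an extra intermediate copy.
    return torch.as_tensor(array, device=device)
```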

Limitations:

  • Only supports tasks that directly return a torch.Tensor, i.e. the torch.Tensor cannot be nested in other data.
  • The task must declare the shape and dtype of its torch.Tensor at DAG compile time.
  • Does not support local p2p GPU-GPU transfer, either using cudaMemcpy or NCCL. A microbenchmark shows this can be >10x faster than transfer via CPU.
  • Does not support multinode GPU-GPU transfer, e.g., via RPC between hosts or NCCL.
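
For context, a hypothetical end-to-end sketch of the API introduced here, assuming simple sender/receiver actors. The import path for TorchTensorType is a guess based on the files touched in this diff (python/ray/dag/experimental/types.py), and the execute/read calls are omitted because that part of the API was still evolving at the time:

```python
import ray
import torch
from ray.dag import InputNode
# Assumed import path, based on python/ray/dag/experimental/types.py in this PR;
# it may differ in other Ray versions.
from ray.dag.experimental.types import TorchTensorType

SHAPE = (10_000,)
DTYPE = torch.float16

@ray.remote
class Sender:
    def send(self, value: int) -> torch.Tensor:
        # The task must return the torch.Tensor directly (not nested in other data).
        return torch.ones(SHAPE, dtype=DTYPE) * value

@ray.remote
class Receiver:
    def recv(self, tensor: torch.Tensor):
        return tensor.shape, tensor.dtype

sender = Sender.remote()
receiver = Receiver.remote()

with InputNode() as inp:
    dag = sender.send.bind(inp)
    # Declare the static shape/dtype so the channel can be set up at compile time.
    dag = dag.with_type_hint(TorchTensorType(SHAPE, DTYPE))
    dag = receiver.recv.bind(dag)

compiled_dag = dag.experimental_compile()
# Execution then follows the usual accelerated-DAG execute/read flow (not shown).
```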

stephanie-wang and others added 9 commits April 17, 2024 15:56
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
GPU
Signed-off-by: Your Name <[email protected]>
Signed-off-by: Your Name <[email protected]>
# Test torch.Tensor sent between actors.
with InputNode() as inp:
    dag = sender.send.bind(shape, dtype, inp)
    dag = TorchTensor(dag, shape, dtype)
Contributor

Instead of adding a wrapper DAG node, I'm thinking it may be advantageous to set typing information on the node itself to define the static shape and dtype. This would be just one case of adding more static typing information to the DAG; e.g., one could imagine size as a generally useful attribute too.

Syntactically this could make the code look like this instead:

        dag = sender.send.bind(shape, dtype, inp) \
                .with_output_type(TorchTensorType(shape, dtype))
        dag = receiver.recv.bind(dag)

I haven't looked closely at how this would change the backend implementation but I think this could simplify that as well by removing special cases.

Contributor Author

I thought about this but the one issue with that API is that it's not very clean for tasks that return multiple values. This isn't supported yet but I suspect we will need to support it eventually.

Contributor Author

Although yes, we could use that kind of dot syntax instead of a wrapper class.

Contributor

Hmm, eventually how would you support that? If the return type is arbitrarily complex, then you probably need to handle it at runtime in a custom serialization hook instead of in the DAG. Or the type annotation itself would have to become just as complex.

Not sure if runtime handling is sufficient to set up accelerated transfer in all cases, though, or if you need to know the type information statically ahead of time.

Contributor Author

For multiple return values, we can use the normal Ray num_returns syntax.

But yeah, the type annotations will get unwieldy for nested tensors. What I'm thinking is that for cases where the user knows the shape and dtype ahead of time, they should use the static annotation, which guarantees zero control-plane overhead from accelerated DAGs.

If they don't know it ahead of time, we can fall back to a custom serialization hook (we just have to make sure it applies only to DAG outputs, not to other objects serialized by the user). The serialization hook would pass the type metadata through a normal channel and start the accelerated transfer separately. It'll be slower because we need to send the type metadata synchronously, but it will be much easier to program against. With the current PR, it'd also be easy for the user to add their own type hints to the task return values by wrapping a returned torch.Tensor with _TorchTensorWrapper (currently a developer API).
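
For reference, the standard Ray num_returns syntax mentioned above looks roughly like this for a plain remote function (independent of accelerated DAGs; the split function is just an illustration):

```python
import ray

@ray.remote(num_returns=2)
def split(values):
    # Each return value becomes its own object, so each output could in
    # principle carry its own static type hint.
    mid = len(values) // 2
    return values[:mid], values[mid:]

first_ref, second_ref = split.remote(list(range(10)))
print(ray.get(first_ref), ray.get(second_ref))
```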

Contributor

Got it. I feel like maybe annotating the function is actually the best way to go in the long run, like @ray.remote(return_types=[...]), but any interim state between that and the nodes is probably fine.

I do think that using type annotations instead of a separate node wrapper class will clean up the internal code though (avoid "tensor" special cases and rewriting).

        return torch.ones(shape, dtype=dtype, device=self.device) * value

    def recv(self, tensor):
        assert tensor.device == self.device
Contributor

How would the multi-device case be handled in the DAG declaration? I imagine it would have to be something you also include as part of the DAG in order to set up the right deserialization processing.

Contributor Author

Yeah, I was thinking of exposing a call on the actor that you can use to set the default device.
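
A purely hypothetical sketch of what such a call might look like (set_device is not an API added by this PR; the class and method names are illustrative):

```python
import ray
import torch

@ray.remote  # in practice, likely declared with num_gpus=1
class Receiver:
    def __init__(self):
        # Default device until the user pins one explicitly.
        self.device = torch.device("cpu")

    def set_device(self, device: str) -> None:
        # Hypothetical helper: choose the device that deserialized tensors
        # should be placed on for this actor.
        self.device = torch.device(device)

    def recv(self, tensor: torch.Tensor):
        return tensor.to(self.device).shape

receiver = Receiver.remote()
ray.get(receiver.set_device.remote("cuda:0" if torch.cuda.is_available() else "cpu"))
```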

Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
@stephanie-wang changed the title from "[WIP] Pass torch.Tensors through accelerated DAGs" to "[core][experimental] Pass torch.Tensors through accelerated DAGs" on Apr 24, 2024
@stephanie-wang
Contributor Author

stephanie-wang commented Apr 25, 2024

Strangely, this commit caused the performance of CPU-CPU tensor transfer to drop from 1.29 GB/s to 400 MB/s. I have no idea why; the commit doesn't seem like it should have affected anything, and from what I can tell, the serialization/deserialization code wasn't touched. Will file an issue after this PR is merged...

Signed-off-by: Stephanie Wang <[email protected]>
@@ -59,6 +79,8 @@ def do_exec_compiled_task(
the loop.
Contributor

Add comment for output_wrapper_fn



@pytest.mark.parametrize("use_gpu", [False, True])
def test_torch_tensor_p2p(ray_start_regular_shared, use_gpu):
Contributor

Does it make sense to split this test into multiple tests?

Contributor Author

I think it's okay either way, I just used the same test to reduce duplication.

Contributor

@rkooo567 left a comment

Hmm, the type_hint approach doesn't seem like great UX. What's the exact limitation right now? I assume it's that we need to know ahead of time whether the output is a GPU tensor in order to decide the transport?

I think it is okay for the initial version, but it feels like pretty bad UX, so we should probably discuss how to get around it...

Other than this, most other comments are nits.



@DeveloperAPI
class _TorchTensorSerializer:
Contributor

Consider adding a unit test for this API?


@pytest.mark.parametrize("use_gpu", [False, True])
def test_torch_tensor_p2p(ray_start_regular_shared, use_gpu):
    if use_gpu and sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) < 1:
Contributor

Does it actually run in this case? I don't know if we have an instance that runs on GPU.

Contributor Author

Yeah, it doesn't run right now, but I tested manually. Will probably revisit this later to run tests on GPU machines.

    def __init__(self, device: "torch.device"):
        self.device = device

    @staticmethod
Contributor

Is it a static method to be compatible with register_serializer?


        self.downstream_node_idxs = set()
        self.output_channel = None

        self.output_wrapper_fn = None
        if self.dag_node.type_hint is not None:
            if isinstance(self.dag_node.type_hint, TorchTensorType):
Contributor

I wonder if we can detect a torch.Tensor automatically here, and if it is on GPU, just wrap it with this wrapper?

Contributor Author

I wanted to preserve existing serialization methods for torch.Tensor since this acts a bit differently from normal serialization; it automatically sets the device on the receiving end.

There is probably a nicer way to do this, but for now this is the easiest and probably doesn't affect performance.

Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
x
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
@stephanie-wang merged commit bbcdc49 into ray-project:master on May 3, 2024
5 checks passed
@stephanie-wang deleted the dag-gpu-channels branch on May 3, 2024, 20:31
harborn pushed a commit to harborn/ray that referenced this pull request May 8, 2024
peytondmurray pushed a commit to peytondmurray/ray that referenced this pull request May 13, 2024
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 6, 2024
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 7, 2024