[core][experimental] Pass torch.Tensors through accelerated DAGs #44825
Conversation
Signed-off-by: Stephanie Wang <[email protected]>
test_torch_tensor_dag.py (Outdated)
# Test torch.Tensor sent between actors.
with InputNode() as inp:
    dag = sender.send.bind(shape, dtype, inp)
    dag = TorchTensor(dag, shape, dtype)
Instead of adding a wrapper DAG node, I'm thinking it may be advantageous to set typing information on the node instead to define the static shape and dtype. This would be just one case of adding more static typing information to the DAG; e.g., I could imagine size as a generally useful attribute too.
Syntactically this could make the code look like this instead:
dag = sender.send.bind(shape, dtype, inp) \
    .with_output_type(TorchTensorType(shape, dtype))
dag = receiver.recv.bind(dag)
I haven't looked closely at how this would change the backend implementation but I think this could simplify that as well by removing special cases.
I thought about this but the one issue with that API is that it's not very clean for tasks that return multiple values. This isn't supported yet but I suspect we will need to support it eventually.
Although yes, we could use that kind of dot syntax instead of a wrapper class.
Hmm, eventually how would you support that? If the return type is arbitrarily complex then you probably need to handle it at runtime in a custom serialization hook instead of in the DAG. Or, the type can become complex as well.
Not sure if runtime handling is sufficient to set up accelerated transfer in all cases though, or if you need to know the type information statically ahead of time.
For multiple return values, we can use the normal Ray num_returns syntax.
But yeah, the type annotations will get unwieldy for nested tensors. What I'm thinking is that for cases where the user knows the shape and dtype ahead of time, they should use the static annotation and that will guarantee 0 control plane overhead from accelerated DAGs.
If they don't know it ahead of time, we can do the custom serialization hook (we just have to make sure to do it only for DAG outputs, not for other objects serialized by the user). The serialization hook would pass the type metadata through a normal channel and start the accelerated transfer separately. It'll be slower because we need to send the type metadata synchronously, but it will be much easier to program against. With the current PR, it'd also be easy for the user to add their own type hints to the task return values, by wrapping a returned torch.Tensor with the _TorchTensorWrapper (currently a developer API).
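As a rough illustration of the dynamic path described above (sending type metadata alongside the payload when shape and dtype aren't known statically), here is a minimal sketch. The helper names are hypothetical, and numpy stands in for torch so the example is self-contained:

```python
import struct
import numpy as np

def pack(arr: np.ndarray) -> bytes:
    """Prefix the raw buffer with a small dtype/shape header."""
    header = f"{arr.dtype.str}|{','.join(map(str, arr.shape))}".encode()
    # 4-byte little-endian header length, then header, then tensor bytes.
    return struct.pack("<I", len(header)) + header + arr.tobytes()

def unpack(buf: bytes) -> np.ndarray:
    """Reverse of pack: read the header, then reconstruct the array."""
    (hlen,) = struct.unpack_from("<I", buf, 0)
    dtype_str, shape_str = buf[4 : 4 + hlen].decode().split("|")
    shape = tuple(int(s) for s in shape_str.split(",")) if shape_str else ()
    return np.frombuffer(buf[4 + hlen :], dtype=np.dtype(dtype_str)).reshape(shape)

t = np.arange(12, dtype=np.float32).reshape(3, 4)
restored = unpack(pack(t))
```

The static-annotation path avoids exactly this per-message header, which is where the "zero control plane overhead" claim comes from.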
Got it. I feel like annotating the function is actually the best way to go in the long run, like @ray.remote(return_types=[...]), but any interim state between that and the nodes is probably fine.
I do think that using type annotations instead of a separate node wrapper class will clean up the internal code though (avoid "tensor" special cases and rewriting).
test_torch_tensor_dag.py (Outdated)
return torch.ones(shape, dtype=dtype, device=self.device) * value

def recv(self, tensor):
    assert tensor.device == self.device
How would the multi-device case be handled in the DAG declaration? I imagine it would have to be something you also include as part of the DAG in order to set up the right deserialization processing.
Yeah, I was thinking to expose a call on the actor that you can use to set the default device.
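A minimal sketch of what that could look like. Everything here is hypothetical (the method name set_default_device, the plain-Python class in place of a Ray actor); a real version would deserialize to a torch.Tensor and call .to(self.device):

```python
class Receiver:
    """Hypothetical actor that tracks the default device for incoming tensors."""

    def __init__(self) -> None:
        self.device = "cpu"

    def set_default_device(self, device: str) -> None:
        # Exposed as an actor call, per the proposal above.
        self.device = device

    def recv(self, tensor_bytes: bytes):
        # A real implementation would deserialize and move the tensor to
        # self.device; here we just report where it would land.
        return (len(tensor_bytes), self.device)

r = Receiver()
r.set_default_device("cuda:0")
```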
Strangely, this commit caused the performance of CPU-CPU tensor transfer to drop from 1.29 GB/s to 400 MB/s. I have no idea why; the commit doesn't seem like it should have affected anything and from what I can tell, the serialization/deserialization code wasn't affected. Will file an issue after this PR is merged...
@@ -59,6 +79,8 @@ def do_exec_compiled_task
Add a comment for output_wrapper_fn.
@pytest.mark.parametrize("use_gpu", [False, True])
def test_torch_tensor_p2p(ray_start_regular_shared, use_gpu):
Does it make sense to split this test into multiple tests?
I think it's okay either way, I just used the same test to reduce duplication.
Hmm, the type_hint approach doesn't seem like good UX. What's the exact limitation now? I assume it's that we need to know whether the output is a GPU tensor ahead of time to decide the transport?
I think it's okay for the initial version, but it feels like pretty bad UX, so we should probably discuss how to get around this...
Other than this, most other comments are nits.
@DeveloperAPI
class _TorchTensorSerializer:
Consider adding a unit test for this API?
@pytest.mark.parametrize("use_gpu", [False, True])
def test_torch_tensor_p2p(ray_start_regular_shared, use_gpu):
    if use_gpu and sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) < 1:
Does it actually run in this case? I don't know if we have a CI instance that runs on GPU?
Yeah, it doesn't run right now, but I tested manually. Will probably revisit this later to run tests on GPU machines.
def __init__(self, device: "torch.device"):
    self.device = device

@staticmethod
Is it a static method to be compatible with register_serializer?
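For context, ray.util.register_serializer takes free-standing serializer/deserializer callables, which is one reason a @staticmethod (no bound self) is a natural fit. A minimal sketch below, with numpy standing in for torch.Tensor so it stays self-contained; the registration call is shown in a comment since it requires a running Ray:

```python
import io
import numpy as np

class _NumpySerializer:
    """Sketch of a serializer class whose hooks are plain callables."""

    @staticmethod
    def serialize(arr: np.ndarray) -> bytes:
        buf = io.BytesIO()
        np.save(buf, arr)  # np.save records dtype + shape + data
        return buf.getvalue()

    @staticmethod
    def deserialize(data: bytes) -> np.ndarray:
        return np.load(io.BytesIO(data))

# Hypothetical registration (requires Ray; shown for shape only):
# ray.util.register_serializer(
#     np.ndarray,
#     serializer=_NumpySerializer.serialize,
#     deserializer=_NumpySerializer.deserialize,
# )

arr = np.arange(6, dtype=np.float32).reshape(2, 3)
restored = _NumpySerializer.deserialize(_NumpySerializer.serialize(arr))
```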
self.downstream_node_idxs = set()
self.output_channel = None

self.output_wrapper_fn = None
if self.dag_node.type_hint is not None:
    if isinstance(self.dag_node.type_hint, TorchTensorType):
I wonder if we can detect a torch tensor automatically here, and if it's on GPU, just wrap it in this wrapper?
I wanted to preserve existing serialization methods for torch.Tensor since this acts a bit differently from normal serialization; it automatically sets the device on the receiving end.
There is probably a nicer way to do this, but for now this is the easiest and probably doesn't affect performance.
Why are these changes needed?
This PR adds support for passing torch.Tensors to local actors in an accelerated DAG, via Ray's shared memory store. It supports the following transfer cases, as long as the sending and receiving actors are on the same node: CPU-CPU, CPU-GPU, GPU-CPU, GPU-GPU (via CPU).
This iteration requires the user to explicitly declare which DAG nodes contain torch.Tensors and the tensors' shape and dtype, with a new with_type_hint declaration. For example:

    with InputNode() as inp:
        dag = sender.send.bind(inp)
        dag = dag.with_type_hint(TorchTensorType(SHAPE, DTYPE))
        dag = receiver.recv.bind(dag)

    compiled_dag = dag.experimental_compile()

This declaration isn't necessarily useful for this PR, but it is included now because it makes it much simpler to efficiently support other cases in the future, such as p2p GPU-GPU transfers.
When a TorchTensor node is declared, the serialization of the underlying torch.Tensor is performed differently from vanilla Ray. In particular, we store the numpy view of the data. On the receiving actor, we deserialize to a torch.Tensor and move it to the device assigned to the actor, if any. Microbenchmarking shows that this is 4x faster than normal pickling and unpickling of a torch.Tensor, likely due to Ray's serialization support for numpy. Also, when moving the torch.Tensor to a GPU on the receiving side, we can avoid one extra data copy by copying directly from Ray's shared memory buffer to GPU memory.
Limitations:
- Only supports tasks that directly return a torch.Tensor, i.e. the torch.Tensor cannot be nested in other data.
- The task must declare the shape and dtype of its torch.Tensor at DAG compile time.
- Does not support local p2p GPU-GPU transfer, either using cudaMemCpy or NCCL. Microbenchmark shows this can be >10x faster than transfer via CPU.
- Does not support multinode GPU-GPU transfer, e.g., via RPC between hosts or NCCL.
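The "store the numpy view" trick above can be sketched with numpy standing in for torch (the torch APIs named in the comments are for illustration and aren't exercised here): a CPU tensor's buffer is exposed as a zero-copy view on the send side, reconstructed zero-copy on the receive side, so the single data copy is the final move to device memory.

```python
import numpy as np

def send_side_view(tensor_like: np.ndarray) -> np.ndarray:
    # torch equivalent: tensor.detach().numpy() -- for a CPU tensor this
    # returns a view over the same buffer, so no copy is made.
    return tensor_like.view()

def recv_side(view: np.ndarray) -> np.ndarray:
    # torch equivalent: torch.from_numpy(view) -- likewise zero-copy;
    # the only copy would be the subsequent .to(device) into GPU memory.
    return view.view()

t = np.ones((2, 3), dtype=np.float32)
v = send_side_view(t)
out = recv_side(v)
```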