[core][experimental] Support NCCL-based torch.Tensors nested inside Python objects #45473

Merged
merged 100 commits into from
May 26, 2024

Conversation

Contributor

@stephanie-wang stephanie-wang commented May 21, 2024

Why are these changes needed?

Allows torch.Tensors nested inside Python objects to be transferred via NCCL using the following syntax:

    with InputNode() as inp:
        dag = sender.send.bind(inp)
        dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
        dag = receiver.recv.bind(dag)

We implement this by using an additional shared memory channel to pass CPU data, with a "nested" NCCL channel to pass the GPU data. Here is the send procedure for the above code:

  1. Serialize the data. Extract out all tensors that are on the GPU and replace them with some kind of placeholder.
  2. Send a list of metadata through the meta_channel.
  3. Send the GPU tensors through the NCCL channel.
  4. Send the rest of the CPU data through a cpu_data_channel, with the placeholders for the GPU tensors.

Note that if the TorchTensorType doesn't have a shape and dtype specified, we currently use the separate meta_channel to pass metadata for the serialized tensors, as introduced in #45332. To elide the cpu_data_channel, the user should now use TorchTensorType(direct_return=True) to indicate that no CPU data is sent along with the GPU data. To elide the meta_channel, the user should declare the shape and dtype, e.g., TorchTensorType(shape=(10, ), dtype=torch.float16).
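To make the three configurations concrete, here is a hedged sketch (not code from this PR; `sender` and `receiver` are assumed to be actor handles as in the snippet above, and the exact keyword combinations follow the description rather than a verified API surface):

```python
import torch
from ray.dag import InputNode
from ray.experimental.channel.torch_tensor_type import TorchTensorType

# (a) Default: tensors nested inside Python objects, dynamic shape/dtype.
#     Uses the meta_channel, the NCCL channel, and the cpu_data_channel.
with InputNode() as inp:
    dag = sender.send.bind(inp)
    dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
    dag = receiver.recv.bind(dag)

# (b) direct_return=True: the returned value *is* the tensor, so the
#     cpu_data_channel can be elided.
with InputNode() as inp:
    dag = sender.send.bind(inp)
    dag = dag.with_type_hint(TorchTensorType(transport="nccl", direct_return=True))
    dag = receiver.recv.bind(dag)

# (c) Static shape/dtype: the meta_channel can be elided as well.
with InputNode() as inp:
    dag = sender.send.bind(inp)
    dag = dag.with_type_hint(
        TorchTensorType(shape=(10,), dtype=torch.float16, transport="nccl")
    )
    dag = receiver.recv.bind(dag)
```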

Related issue number

Closes #45306.

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

stephanie-wang and others added 30 commits April 17, 2024 15:56
Your Name added 2 commits May 21, 2024 11:33
Member

@kevin85421 kevin85421 left a comment

I am still reviewing this PR.

python/ray/dag/compiled_dag_node.py (resolved)
@@ -75,6 +76,9 @@ def do_exec_tasks(
    Args:
        tasks: the executable tasks corresponding to the actor methods.
    """
    ctx = ChannelContext.get_current()
    self._serialization_ctx = ctx.serialization_context
Member

A Python newbie question: do_exec_tasks is not a class method. What does self refer to here? It seems to refer to an actor based on the following code snippet.

            self.worker_task_refs[
                task.dag_node._get_actor_handle()
            ] = worker_fn.options(concurrency_group="_ray_system").remote(
                do_exec_tasks,
                executable_tasks,
            )

https://github.com/ray-project/ray/blob/master/python/ray/dag/compiled_dag_node.py#L662-L667

Contributor Author

Yes, we want to avoid modifying the base ray Actor class, so instead of defining the method do_exec_tasks on the base Actor class, we instead pass it to the generic __ray_call__ method, which takes in a lambda and the lambda args/kwargs.
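For readers unfamiliar with the pattern, here is a minimal sketch of the idea. It uses a stand-in method name (`call`) rather than Ray's internal `__ray_call__`; the point is only that the free function receives the actor instance as its first argument:

```python
import ray

def do_exec_tasks_like(self, tasks):
    # `self` here is whatever actor instance the generic method passes in.
    return f"{type(self).__name__} executing {len(tasks)} tasks"

@ray.remote
class Worker:
    def call(self, fn, *args, **kwargs):
        # Generic entry point: run an arbitrary function with the actor as
        # `self`, so the base actor class never needs to define do_exec_tasks.
        return fn(self, *args, **kwargs)

worker = Worker.remote()
print(ray.get(worker.call.remote(do_exec_tasks_like, ["a", "b"])))  # "Worker executing 2 tasks"
```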

@@ -669,6 +677,11 @@ def _get_node_id(self):
input_task = self.idx_to_task[self.input_task_idx]
# Register custom serializers for inputs provided to dag.execute().
input_task.dag_node.type_hint.register_custom_serializer()
ctx = ChannelContext.get_current()
self.serialization_ctx = ctx.serialization_context
Member

I am a bit confused about serialization_ctx. In _exec_task, it appears to be an attribute of the actor, and we use ExecutableTask.output_type_hint.requires_nccl() as the input argument for serialization_ctx.set_use_external_transport. In _get_or_compile, it is an attribute of the class CompiledDAG, and we use CompiledTask.dag_node.type_hint.requires_nccl() as the input argument for serialization_ctx.set_use_external_transport.

I can't fully understand the scope of serialization_ctx, or when I should use it.

Contributor Author

serialization_ctx is a per-worker context. The reason we also set it as an attribute of CompiledDag is because that object lives on the driver, which runs different code from the actors participating in the DAG.

I cached it as an attribute of the actor/driver for convenience, but can remove this and directly look it up with ChannelContext.get_current().serialization_context for clarity.
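Roughly, the lookup being described is a per-process singleton. A simplified sketch (stub classes, not Ray's actual implementation):

```python
class _SerializationContext:
    def __init__(self):
        self.tensors = []                  # buffer of tensors extracted during serialization
        self.use_external_transport = False

class ChannelContextSketch:
    _instance = None                       # one instance per process (driver or actor worker)

    def __init__(self):
        self.serialization_context = _SerializationContext()

    @classmethod
    def get_current(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

# Instead of caching it as an attribute, callers could always do:
serialization_ctx = ChannelContextSketch.get_current().serialization_context
```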

Contributor Author

Actually for clarity, I'm going to move the call to set_use_external_transport directly into the channels.

@@ -766,13 +779,15 @@ def teardown(self, wait: bool):

for actor in outer.actor_refs:
    logger.info(f"Cancelling compiled worker on actor: {actor}")
for actor, tasks in outer.actor_to_executable_tasks.items():
    cancel_refs = [
        actor.__ray_call__.remote(do_cancel_executable_tasks, tasks)
Member

What's the difference between

(1) (this PR): Generate a list of cancel refs, and then call ray.get in a for loop

and

(2)

for actor, tasks in outer.actor_to_executable_tasks.items():
    try:
        # TODO(swang): Suppress exceptions from actors trying to
        # read closed channels when DAG is being torn down.
        ray.get(
            actor.__ray_call__.remote(do_cancel_executable_tasks, tasks)
        )

(2) seems to be blocked by each executable task's cancellation process instead of canceling all of them asynchronously. Is my understanding correct?

Contributor Author

Yes.
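To spell out the difference with a hedged, self-contained example (hypothetical actor and method names):

```python
import ray

@ray.remote
class Worker:
    def cancel_tasks(self, tasks):
        # Stand-in for do_cancel_executable_tasks: potentially slow cleanup.
        return len(tasks)

workers = [Worker.remote() for _ in range(3)]

# (1) This PR: submit every cancel call first, then wait. The cancellations
# run concurrently across actors.
refs = [w.cancel_tasks.remote(["t1", "t2"]) for w in workers]
results = ray.get(refs)

# (2) Waiting inside the loop: each actor's cancellation must finish before
# the next one is even submitted, so teardown time adds up sequentially.
for w in workers:
    ray.get(w.cancel_tasks.remote(["t1", "t2"]))
```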

from ray.experimental.channel.torch_tensor_type import TorchTensorType

if typ is not None and not isinstance(typ, TorchTensorType):
    raise ValueError("Contained type must be of type TorchTensorType")
Member

Why "Contained type must be of type TorchTensorType"?

Contributor Author

The only other type we have is SharedMemoryType, and it doesn't make sense for a SharedMemoryType to contain a SharedMemoryType. I'll change this to an assert since it should not be user-facing.

self._type_hint = copy.deepcopy(typ)
if typ.is_direct_return:
old_contains_typ = self._type_hint.contains_type
self._type_hint = copy.deepcopy(typ)
Member

self._type_hint = copy.deepcopy(typ) or self._type_hint.set_contains_type(typ)?

Contributor Author

Not sure I understand you... we want to set the type_hint to typ here so we should not use set_contains_type?

Member

I can't fully understand the logic here, but I noticed that the else block uses self._type_hint.set_contains_type(typ), whereas here we use self._type_hint = copy.deepcopy(typ). That's why I have this question.

Member

I seem to understand the logic after rereading this PR. If ChannelOutputType.is_direct_return is True, it means that this type of data does not need an additional channel to transfer some parts of its data (i.e., contains_typ) to a different destination.

Contributor Author

Yes, that's right, will add these docs to is_direct_return.

if self.use_external_transport:
    # Add the actual tensor to a buffer. The buffer of tensors should
    # later be popped by the caller and sent via external transport.
    self.tensors.append(tensor)
Member

Does it mean that:
(1) if we use NCCL to transfer data, both GPUs (sender and receiver) can access self.tensors?
(2) if we use NCCL to transfer an integer, we don't need to serialize the integer?

For the case where self.use_external_transport is False, can we transfer an integer from GPU -> CPU -> shared memory, and then have the receiver GPU deserialize it by accessing self.tensors[integer] instead of transferring a numpy array?

Contributor Author

I'm not sure I understand you. We never send the integer placeholder over NCCL, only tensors. The protocol here is:

(1) Extract list of tensors. Replace with an integer as a placeholder. Each tensor's placeholder is its index into the list of tensors.
(2) On the sending GPU, use NCCL to send the list of tensors, in order.
(3) On the receiving GPU, use NCCL to receive the list of tensors, in order.
(4) Deserialize the rest of the data. If we see an integer placeholder, replace it with the corresponding received tensor.
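A hedged, CPU-only sketch of this protocol (plain Python objects and a list stand in for torch.Tensors and NCCL; the real code distinguishes placeholders from ordinary integers via the custom serializer hook rather than by type):

```python
class FakeTensor:
    def __init__(self, name):
        self.name = name

def extract_tensors(value, buffer):
    # Step (1): replace each "tensor" with an integer placeholder, its index
    # into the send buffer.
    if isinstance(value, FakeTensor):
        buffer.append(value)
        return len(buffer) - 1
    if isinstance(value, dict):
        return {k: extract_tensors(v, buffer) for k, v in value.items()}
    if isinstance(value, list):
        return [extract_tensors(v, buffer) for v in value]
    return value

def restore_tensors(value, received):
    # Step (4): replace each placeholder with the tensor received at the same
    # position.
    if isinstance(value, int):
        return received[value]
    if isinstance(value, dict):
        return {k: restore_tensors(v, received) for k, v in value.items()}
    if isinstance(value, list):
        return [restore_tensors(v, received) for v in value]
    return value

tensors = []
cpu_view = extract_tensors({"loss": FakeTensor("a"), "grads": [FakeTensor("b")]}, tensors)
# Steps (2)-(3): send/receive the tensors in order (simulated by copying the list).
received = list(tensors)
result = restore_tensors(cpu_view, received)  # placeholders swapped back for tensors
```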

Member

I think I understand how serialization/deserialization works after chatting with @rynewang. I will write it down and check with you tomorrow to see if my understanding is correct.

)
raise TypeError(msg) from e

tensors_to_send = self.serialization_ctx.reset_tensors([])
Member

Not related to this PR:

The ChannelOutputType calls ray.util.serialization.register_serializer to register the serializer / deserializer. When / Where are the serializer / deserializer used?

ray.util.serialization.register_serializer(
    torch.Tensor,
    serializer=serialize,
    deserializer=deserialize,
)

The tensors appear to be serialized and stored in serialization_ctx.tensors before this write function is called. We serialize CPU data in this function, but serialize tensors in other places. Why do we choose to serialize them in different locations?

Contributor Author

register_serializer registers a custom serializer for any objects that match the given type. So when we call the ray serializer with self._worker.get_serialization_context().serialize(value), we will trigger the custom serialize function for any torch.Tensors found in value. That will replace the tensors with the placeholders and add them to serialization_ctx.tensors.
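For reference, a hedged sketch of how such a hook can be wired up with ray.util.serialization.register_serializer (the module-level buffers are purely illustrative; assumes torch is available and Ray has been initialized):

```python
import ray
import ray.util.serialization
import torch

_extracted_tensors = []   # filled while serializing a value
_received_tensors = []    # filled by the receive path before deserialization

def _serialize(tensor: torch.Tensor) -> int:
    _extracted_tensors.append(tensor)
    return len(_extracted_tensors) - 1    # integer placeholder kept in the CPU payload

def _deserialize(placeholder: int) -> torch.Tensor:
    return _received_tensors[placeholder]

ray.util.serialization.register_serializer(
    torch.Tensor, serializer=_serialize, deserializer=_deserialize
)
```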

):
if isinstance(tensor, ray.exceptions.RayTaskError):
if isinstance(tensors, ray.exceptions.RayTaskError):
Member

Python newbie question:

Is it possible for isinstance(tensors, ray.exceptions.RayTaskError) to be True if the type is Union["torch.Tensor", List["torch.Tensor"]]?

Contributor Author

The type here is just a hint so yes it is possible. I'll add Exception to the possible types though.

tensors = self._gpu_data_channel.begin_read()

if self._gpu_data_channel.has_static_type():
    # If the channel was declared with a static TorchTensorType, then
Member

If the channel was declared with a static TorchTensorType, then the task is allowed to return at most one tensor

Would you mind giving me a code reference for this limitation? I don't see TorchTensorNcclChannel.begin_read calling has_static_type.

Contributor Author

Are you asking how we enforce that only one tensor is sent? It is here. The writer will check that only one tensor is sent.

Member

Thanks! I saw it

            if self.has_static_type():
                # Make sure that there is exactly one tensor to send, and its
                # metadata should have matched the static type.
                if meta_list != [None]:
                    raise ValueError(
                        "DAGNode annotated with "
                        "TorchTensorType(shape=shape, dtype=dtype))` can return at "
                        "most one tensor with the declared `shape` and `dtype`. "
                        "Use TorchTensorType() if value contains more than one "
                        "tensor or tensor of dynamic size."
                    )

Your Name added 3 commits May 22, 2024 14:56


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_nccl_nested_dynamic(ray_start_regular):
Member

I am trying to understand the difference between test_torch_tensor_nccl_dynamic and test_torch_tensor_nccl_nested_dynamic. The only difference between them is direct_return. My understanding is:

  • In test_torch_tensor_nccl_dynamic,

    • Because direct_return is set to True, it doesn't have any contains_type.
    • Because transport is set to "nccl", it will create a TorchTensorNcclChannel instead of a shared memory channel.
  • In test_torch_tensor_nccl_nested_dynamic,

    • Because direct_return is set to False, the DAGNode calls DAGNode._type_hint.set_contains_type(TorchTensorType).
    • [Question] What's the type of _type_hint? It seems to be ChannelOutputType (code), and ChannelOutputType's create_channel is not implemented (code).
      • I guess the expected result of this test is that a NestedTorchTensorNcclChannel should be created, but I don't know how it gets created, because of the above question.

Contributor Author

@stephanie-wang stephanie-wang May 24, 2024

Yes, that is all correct.

ChannelOutputType is an interface type and its subclasses are meant to implement create_channel. The default type for a DAG node is SharedMemoryType. SharedMemoryType.create_channel creates a NestedTorchTensorNcclChannel in this PR if it contains a TorchTensorType.
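A simplified, self-contained sketch of that dispatch (stub classes and string return values stand in for the real ones):

```python
class ChannelOutputType:
    def __init__(self):
        self._contains_type = None

    def set_contains_type(self, typ):
        self._contains_type = typ

    def create_channel(self, writer, readers):
        raise NotImplementedError  # interface only; subclasses decide the channel kind

class TorchTensorTypeSketch(ChannelOutputType):
    def requires_nccl(self):
        return True

class SharedMemoryTypeSketch(ChannelOutputType):
    def create_channel(self, writer, readers):
        if self._contains_type is not None and self._contains_type.requires_nccl():
            # CPU data via shared memory plus a nested NCCL channel for tensors.
            return "NestedTorchTensorNcclChannel"
        return "SharedMemoryChannel"

typ = SharedMemoryTypeSketch()
typ.set_contains_type(TorchTensorTypeSketch())
print(typ.create_channel(writer="sender", readers=["receiver"]))  # NestedTorchTensorNcclChannel
```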

@kevin85421 kevin85421 changed the title [core][experimental] Support NCCL-based torch.Tensors nested inside Python objects Review [core][experimental] Support NCCL-based torch.Tensors nested inside Python objects May 23, 2024
@kevin85421 kevin85421 changed the title Review [core][experimental] Support NCCL-based torch.Tensors nested inside Python objects [core][experimental] Support NCCL-based torch.Tensors nested inside Python objects May 23, 2024
@kevin85421 kevin85421 changed the title [core][experimental] Support NCCL-based torch.Tensors nested inside Python objects Review [core][experimental] Support NCCL-based torch.Tensors nested inside Python objects May 23, 2024
@kevin85421 kevin85421 changed the title Review [core][experimental] Support NCCL-based torch.Tensors nested inside Python objects [core][experimental] Support NCCL-based torch.Tensors nested inside Python objects May 23, 2024
Contributor

@rkooo567 rkooo567 left a comment

Looks pretty good! So iiuc,

  • by default, it can handle nested tensors (and dynamic shape/type)
  • But it has a perf impact. If you know you return a tensor directly, set direct_return=True and it will improve performance

^ is this correct?

python/ray/dag/compiled_dag_node.py (outdated, resolved)
raise TypeError(msg) from e
finally:
# Pop the tensors that were found during serialization of `value`.
tensors_to_send = self.serialization_ctx.reset_tensors([])
Contributor

This means the channel is not thread-safe anymore. Can you make sure to note that in the docstring?

Contributor Author

Yes, will do. We could make the serialization context thread-local to avoid that in the future.
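A minimal sketch of the thread-local idea (an assumption about a possible future change, not what this PR does):

```python
import threading

class SerializationContextSketch(threading.local):
    def __init__(self):
        # threading.local re-runs __init__ in each thread on first access, so
        # concurrent writers would no longer share one tensor buffer.
        self.tensors = []
        self.use_external_transport = False

ctx = SerializationContextSketch()  # each thread sees its own ctx.tensors
```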

@@ -58,6 +59,10 @@ def __init__(
    shape; if it does not match, the task will error.
dtype: The expected dtype of the torch.Tensor. Similar to the
    shape, this may be statically or dynamically declared.
direct_return: Whether the tensor is sent directly or inside of
    other metadata. For GPU-GPU channels, this allows the sender
Contributor

This feels like a bit of a leaky abstraction to me. What's the benefit of removing the additional channel? Maybe we should just mention the benefits? (Is it improving perf? Reducing memory overhead?)

Contributor Author

Yes, it is for performance, will update the comment.

self._gpu_data_channel: TorchTensorNcclChannel = (
    gpu_data_typ.create_channel(writer, readers)
)
self._cpu_data_channel: Optional["Channel"] = None
Contributor

Why don't we always just create a CPU channel for GPU data (and remove this nested NCCL channel)? Does it affect the performance drastically?

Member

In my understanding, the GPU channel (i.e., TorchTensorNcclChannel) already has two channels:

  • NCCL channel: used to transfer tensors.
  • CPU channel: used to transfer the tensor's metadata (i.e., shape).

The additional CPU channel here is used to transfer CPU data, which is not the tensor's metadata. Hence, we have two CPU channels: one for the tensors' metadata and one for the other CPU data.

Does it affect the performance drastically?

To the best of my knowledge, the answer is no.

Contributor Author

Right now the performance hit for dynamic shape compared to NCCL is about 2x. On top of that, the additional CPU channel for nested tensors is about 1.5x. This is a bit higher than expected, though (see #45319).

Member

Right now the performance hit for dynamic shape compared to NCCL is about 2x.

Do you mean

  • Case 1: Send tensors via NCCL channel (only tensors, no tensor's metadata)
  • Case 2:
    • Send tensors via NCCL channel
    • Send metadata via meta_channel (CPU channel)
    • Send other CPU data via cpu_data_channel (CPU channel)

(Case 1 performance) / (Case 2 performance) = 2?

On top of that, the additional CPU channel for nested tensors is about 1.5x.

Do you mean

  • Case 1:
    • Send tensors via NCCL channel
    • Send metadata and other CPU data via meta_channel (CPU channel)
  • Case 2:
    • Send tensors via NCCL channel
    • Send metadata via meta_channel (CPU channel)
    • Send other CPU data via cpu_data_channel (CPU channel)

(Case 1 performance) / (Case 2 performance) = 1.5?

Thanks!

Contributor Author

For the first one, the comparison is between these:

Case 1: Send tensors via NCCL channel (only tensors, no tensor's metadata)
Case 2:

    Send tensors via NCCL channel
    Send metadata via meta_channel (CPU channel)

The second one is right.

Member

Case 1: Send tensors via NCCL channel (only tensors, no tensor's metadata)
Case 2:

  • Send tensors via NCCL channel
  • Send metadata via meta_channel (CPU channel)

In this case, (Case 1 performance) / (Case 2 performance) = 2. I am wondering whether that means:

(1)
2 * Time(Send tensors via NCCL channel) = Time(Send tensors via NCCL channel) + Time(Send metadata via meta_channel)

or

(2)
2 * Time(Send tensors via NCCL channel) = Max(Time(Send tensors via NCCL channel), Time(Send metadata via meta_channel))

It seems to be (1) because the call is not async. Is my understanding correct?

self._meta_channel.write(meta)
raise tensors

if isinstance(tensors, list):
Contributor

nit: make tensors = [tensors] and always use this branch instead?

Contributor Author

Hmm, I prefer to keep it this way because sometimes we send a list of metadata and sometimes we send just one.

@@ -58,6 +59,10 @@ def __init__(
    shape; if it does not match, the task will error.
dtype: The expected dtype of the torch.Tensor. Similar to the
    shape, this may be statically or dynamically declared.
direct_return: Whether the tensor is sent directly or inside of
Contributor

QQ: do we have a test that checks an exception is raised when direct_return=True but the task doesn't actually return the tensor directly?

Contributor Author

Hmm, we have one for DAG inputs, but I think not for outputs sent between actors. Let me add one.

self._contains_type.register_custom_serializer()

@property
def is_direct_return(self) -> bool:
Contributor

Is it expensive to figure this out automatically? For example:

  • if the return value is a tensor (with a tensor type hint), it is True
  • if not, we can set it to False automatically?

Contributor Author

Python doesn't guarantee that if it's true, it will always be true :( Also, when we use direct_return=True, the only thing we can do is loop on a NCCL channel. We might be able to figure out a way to send some control message along the NCCL channel, but it seems a bit complicated for now.

@MicroCheck //python/ray/dag

@stephanie-wang stephanie-wang added the go add ONLY when ready to merge, run all tests label May 25, 2024
@stephanie-wang stephanie-wang merged commit 93dce13 into ray-project:master May 26, 2024
5 of 7 checks passed
@stephanie-wang stephanie-wang deleted the nested-tensors branch May 26, 2024 03:56
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 6, 2024
…ython objects (ray-project#45473)

ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 6, 2024
…ython objects (ray-project#45473)

ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 7, 2024
…ython objects (ray-project#45473)

Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[core][experimental] Support nested and dynamically sized GPU tensors via NCCL
4 participants