[core][experimental] Support NCCL-based torch.Tensors nested inside Python objects #45473
Conversation
I am still reviewing this PR.
python/ray/dag/compiled_dag_node.py
Outdated
@@ -75,6 +76,9 @@ def do_exec_tasks(
    Args:
        tasks: the executable tasks corresponding to the actor methods.
    """
    ctx = ChannelContext.get_current()
    self._serialization_ctx = ctx.serialization_context
A Python newbie question: `do_exec_tasks` is not a class method. What does `self` refer to here? It seems to refer to an actor, based on the following code snippet:

self.worker_task_refs[
    task.dag_node._get_actor_handle()
] = worker_fn.options(concurrency_group="_ray_system").remote(
    do_exec_tasks,
    executable_tasks,
)
Yes, we want to avoid modifying the base Ray Actor class, so instead of defining the method `do_exec_tasks` on the base Actor class, we pass it to the generic `__ray_call__` method, which takes in a lambda and the lambda's args/kwargs.
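For illustration, a minimal sketch of this pattern, assuming `__ray_call__` invokes the passed function with the actor instance as its first argument (consistent with how it is used in this PR); the `Worker` class and `do_something` helper are hypothetical:

```python
import ray

def do_something(self, value):
    # A free function, not defined on any actor class. __ray_call__ binds
    # the actor instance to `self`, so the function can read/write actor
    # state as if it were a method.
    self.value = value
    return value

@ray.remote
class Worker:
    pass

worker = Worker.remote()
# Equivalent in spirit to calling an actor method named `do_something`:
result = ray.get(worker.__ray_call__.remote(do_something, 42))
assert result == 42
```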
python/ray/dag/compiled_dag_node.py
Outdated
@@ -669,6 +677,11 @@ def _get_node_id(self):
    input_task = self.idx_to_task[self.input_task_idx]
    # Register custom serializers for inputs provided to dag.execute().
    input_task.dag_node.type_hint.register_custom_serializer()
    ctx = ChannelContext.get_current()
    self.serialization_ctx = ctx.serialization_context
I am a bit confused about `serialization_ctx`. In `_exec_task`, it appears to be an attribute of the actor, and we use `ExecutableTask.output_type_hint.requires_nccl()` as the input argument for `serialization_ctx.set_use_external_transport`. In `_get_or_compile`, it is an attribute of the class `CompiledDAG`, and we use `CompiledTask.dag_node.type_hint.requires_nccl()` as the input argument for `serialization_ctx.set_use_external_transport`.

I can't understand the scope of `serialization_ctx`, or when I should use it.
`serialization_ctx` is a per-worker context. The reason we also set it as an attribute of `CompiledDAG` is that that object lives on the driver, which runs different code from the actors participating in the DAG.

I cached it as an attribute of the actor/driver for convenience, but I can remove this and directly look it up with `ChannelContext.get_current().serialization_context` for clarity.
Actually, for clarity, I'm going to move the call to `set_use_external_transport` directly into the channels.
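As a reference, a hedged sketch of that per-process lookup (the import path is an assumption based on the modules this PR touches):

```python
from ray.experimental.channel import ChannelContext

# One channel context (and thus one serialization context) per
# worker/driver process.
ctx = ChannelContext.get_current()
serialization_ctx = ctx.serialization_context
# With the change above, the channel itself flips this flag rather than
# the actor or the driver:
serialization_ctx.set_use_external_transport(True)
```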
@@ -766,13 +779,15 @@ def teardown(self, wait: bool):

-            for actor in outer.actor_refs:
-                logger.info(f"Cancelling compiled worker on actor: {actor}")
+            for actor, tasks in outer.actor_to_executable_tasks.items():
+                cancel_refs = [
+                    actor.__ray_call__.remote(do_cancel_executable_tasks, tasks)
What's the difference between

(1) (this PR): generating a list of cancel refs, and then using a for loop to call `ray.get`

and

(2)

for actor, tasks in outer.actor_to_executable_tasks.items():
    try:
        # TODO(swang): Suppress exceptions from actors trying to
        # read closed channels when DAG is being torn down.
        ray.get(
            actor.__ray_call__.remote(do_cancel_executable_tasks, tasks)
        )

(2) seems to be blocked by each executable task's cancellation process instead of cancelling all of them asynchronously. Is my understanding correct?
Yes.
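For concreteness, a minimal sketch contrasting the two strategies, built from the snippets quoted in this thread:

```python
# (1) This PR: submit all cancellations first, then wait, so the actors'
# cancellation work overlaps.
cancel_refs = [
    actor.__ray_call__.remote(do_cancel_executable_tasks, tasks)
    for actor, tasks in outer.actor_to_executable_tasks.items()
]
for ref in cancel_refs:
    ray.get(ref)

# (2) The alternative: each ray.get blocks before the next cancellation is
# even submitted, so the actors cancel one at a time.
for actor, tasks in outer.actor_to_executable_tasks.items():
    ray.get(actor.__ray_call__.remote(do_cancel_executable_tasks, tasks))
```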
from ray.experimental.channel.torch_tensor_type import TorchTensorType

if typ is not None and not isinstance(typ, TorchTensorType):
    raise ValueError("Contained type must be of type TorchTensorType")
Why "Contained type must be of type TorchTensorType"?
The only other type we have is SharedMemoryType, and it doesn't make sense for a SharedMemoryType to contain a SharedMemoryType. I'll change this to an assert since it should not be user-facing.
-        self._type_hint = copy.deepcopy(typ)
+        if typ.is_direct_return:
+            old_contains_typ = self._type_hint.contains_type
+            self._type_hint = copy.deepcopy(typ)
`self._type_hint = copy.deepcopy(typ)` or `self._type_hint.set_contains_type(typ)`?
Not sure I understand you... we want to set the type hint to `typ` here, so we should not use `set_contains_type`?
I can't fully understand the logic here, but I noticed that the `else` block uses `self._type_hint.set_contains_type(typ)`, whereas here we use `self._type_hint = copy.deepcopy(typ)`. That's why I have this question.
I seem to understand the logic after rereading this PR. If `ChannelOutputType.is_direct_return` is True, it means that this type of data does not need an additional channel to transfer some parts of its data (i.e., `contains_type`) to a different destination.
Yes, that's right, will add these docs to `is_direct_return`.
if self.use_external_transport:
    # Add the actual tensor to a buffer. The buffer of tensors should
    # later be popped by the caller and sent via external transport.
    self.tensors.append(tensor)
Does it mean that:

(1) if we use NCCL to transfer data, both GPUs (sender and receiver) can access `self.tensors`?

(2) if we use NCCL to transfer an integer, we don't need to serialize the integer?

For the case `self.use_external_transport: False`, can we transfer an integer from GPU -> CPU -> shared memory, and then have the receiver GPU deserialize it by accessing `self.tensors[integer]` instead of transferring a numpy array?
I'm not sure I understand you. We never send the integer placeholder over NCCL, only tensors. The protocol here is:
(1) Extract list of tensors. Replace with an integer as a placeholder. Each tensor's placeholder is its index into the list of tensors.
(2) On the sending GPU, use NCCL to send the list of tensors, in order.
(3) On the receiving GPU, use NCCL to receive the list of tensors, in order.
(4) Deserialize the rest of the data. If we see an integer placeholder, replace it with the corresponding received tensor.
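A minimal sketch of this protocol (illustrative only: `send_value`, `recv_value`, the channel objects, and the `ray_serialize`/`ray_deserialize` helpers are hypothetical stand-ins, not Ray's actual API):

```python
def send_value(value, nccl_channel, cpu_channel, serialization_ctx):
    # (1) Serialization triggers the custom torch.Tensor serializer, which
    # buffers each tensor in the serialization context and leaves an integer
    # placeholder (the tensor's index) in the serialized bytes.
    data = ray_serialize(value)
    tensors = serialization_ctx.reset_tensors([])  # pop the buffered tensors
    # (2) Send the tensors, in order, over NCCL.
    for tensor in tensors:
        nccl_channel.send(tensor)
    # Send the rest of the CPU data (still holding placeholders).
    cpu_channel.write(data)

def recv_value(nccl_channel, cpu_channel, serialization_ctx, num_tensors):
    # (3) Receive the tensors in the same order they were sent.
    tensors = [nccl_channel.recv() for _ in range(num_tensors)]
    serialization_ctx.reset_tensors(tensors)
    # (4) Deserializing the CPU data swaps placeholder i for tensors[i].
    return ray_deserialize(cpu_channel.read())
```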
I think I understand how serialization/deserialization works after chatting with @rynewang. I will write it down and check with you tomorrow to see if my understanding is correct.
)
raise TypeError(msg) from e

tensors_to_send = self.serialization_ctx.reset_tensors([])
Not related to this PR: `ChannelOutputType` calls `ray.util.serialization.register_serializer` to register the serializer/deserializer. When/where are the serializer and deserializer used?

ray.util.serialization.register_serializer(
    torch.Tensor,
    serializer=serialize,
    deserializer=deserialize,
)

The tensors appear to be serialized and stored in `serialization_ctx.tensors` before this write function is called. We serialize CPU data in this function, but serialize tensors in other places. Why do we choose to serialize them in different locations?
`register_serializer` registers a custom serializer for any objects that match the given type. So when we call the Ray serializer with `self._worker.get_serialization_context().serialize(value)`, we will trigger the custom `serialize` function for any torch.Tensors found in `value`. That will replace the tensors with the placeholders and add them to `serialization_ctx.tensors`.
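A hedged sketch of these hooks (`register_serializer` and its keyword arguments appear in this thread; the context class and the serialize/deserialize bodies below are illustrative, not Ray's implementation):

```python
import ray
import torch

class SerializationContext:
    def __init__(self):
        self.tensors = []

    def reset_tensors(self, value):
        # Swap out the buffered tensors, returning the old buffer.
        old, self.tensors = self.tensors, value
        return old

serialization_ctx = SerializationContext()

def serialize(tensor: torch.Tensor) -> int:
    # Called by Ray's serializer for every torch.Tensor found in the value:
    # buffer the tensor and return its index as a placeholder.
    serialization_ctx.tensors.append(tensor)
    return len(serialization_ctx.tensors) - 1

def deserialize(placeholder: int) -> torch.Tensor:
    # Called on the reader, after the tensors were received via NCCL and
    # restored into the context in the same order.
    return serialization_ctx.tensors[placeholder]

ray.util.serialization.register_serializer(
    torch.Tensor, serializer=serialize, deserializer=deserialize
)
```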
):
-    if isinstance(tensor, ray.exceptions.RayTaskError):
+    if isinstance(tensors, ray.exceptions.RayTaskError):
Python newbie question: is it possible for `isinstance(tensors, ray.exceptions.RayTaskError)` to be True if the type is `Union["torch.Tensor", List["torch.Tensor"]]`?
The type here is just a hint so yes it is possible. I'll add Exception to the possible types though.
tensors = self._gpu_data_channel.begin_read()

if self._gpu_data_channel.has_static_type():
    # If the channel was declared with a static TorchTensorType, then
"If the channel was declared with a static TorchTensorType, then the task is allowed to return at most one tensor."

Would you mind giving me a code reference for this limitation? I don't see `TorchTensorNcclChannel.begin_read` calling the function `has_static_type`.
Are you asking how we enforce that only one tensor is sent? It is here. The writer will check that only one tensor is sent.
Thanks! I saw it:

if self.has_static_type():
    # Make sure that there is exactly one tensor to send, and its
    # metadata should have matched the static type.
    if meta_list != [None]:
        raise ValueError(
            "DAGNode annotated with "
            "TorchTensorType(shape=shape, dtype=dtype))` can return at "
            "most one tensor with the declared `shape` and `dtype`. "
            "Use TorchTensorType() if value contains more than one "
            "tensor or tensor of dynamic size."
        )
@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_nccl_nested_dynamic(ray_start_regular):
I am trying to understand the difference between `test_torch_tensor_nccl_dynamic` and `test_torch_tensor_nccl_nested_dynamic`. The only difference between them is `direct_return`. My understanding is:

- In `test_torch_tensor_nccl_dynamic`:
  - Because `direct_return` is set to True, it doesn't have any `contains_type`.
  - Because `transport` is set to `"nccl"`, it will create a `TorchTensorNcclChannel` instead of a shared memory channel.
- In `test_torch_tensor_nccl_nested_dynamic`:
  - Because `direct_return` is set to False, the DAGNode calls `DAGNode._type_hint.set_contains_type(TorchTensorType)`.
  - [Question] What's the type of `_type_hint`? It seems to be ChannelOutputType (code), and ChannelOutputType's `create_channel` is not implemented (code).
    - I guess the expected result of this test is that a `NestedTorchTensorNcclChannel` should be created, but I don't know how it gets created because of the above question.
Yes, that is all correct.

`ChannelOutputType` is an interface type and the subclasses are meant to implement `create_channel`. The default type for a DAG node is a `SharedMemoryType`. `SharedMemoryType.create_channel` creates a `NestedTorchTensorNcclChannel` in this PR if it contains a `TorchTensorType`.
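A hedged sketch of that dispatch (heavily simplified; the real classes live in `ray.experimental.channel`, take more arguments, and `Channel`/`NestedTorchTensorNcclChannel` are referenced here only by name):

```python
class ChannelOutputType:
    def create_channel(self, writer, readers):
        # Interface only; each subclass implements its own channel creation.
        raise NotImplementedError

class SharedMemoryType(ChannelOutputType):
    def __init__(self, contains_type=None):
        self.contains_type = contains_type

    def create_channel(self, writer, readers):
        if self.contains_type is not None and self.contains_type.requires_nccl():
            # Nested case: CPU data goes via shared memory, while tensors go
            # via a nested NCCL channel built from the contained TorchTensorType.
            return NestedTorchTensorNcclChannel(writer, readers, self.contains_type)
        return Channel(writer, readers)
```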
Looks pretty good! So IIUC:

- By default, it can handle nested tensors (and dynamic shape/dtype).
- But it has a perf impact. If you know you return a tensor directly, set `direct_return=True` and it will improve performance.

^ is this correct?
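A hedged usage sketch of the two modes being summarized (syntax taken from this PR's description; the exact performance trade-off is as discussed in this thread):

```python
from ray.experimental.channel.torch_tensor_type import TorchTensorType

# Default: tensors may be nested inside Python objects, with dynamic
# shape/dtype. An extra CPU data channel carries the non-tensor data.
dag = dag.with_type_hint(TorchTensorType(transport="nccl"))

# Faster: the task returns the tensor directly (no surrounding Python
# object), so the CPU data channel can be elided.
dag = dag.with_type_hint(
    TorchTensorType(transport="nccl", direct_return=True)
)
```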
    raise TypeError(msg) from e
finally:
    # Pop the tensors that were found during serialization of `value`.
    tensors_to_send = self.serialization_ctx.reset_tensors([])
This means the channel is not thread-safe anymore. Can you make sure to note that in the docstring?
Yes, will do. We could make the serialization context thread-local to avoid that in the future.
@@ -58,6 +59,10 @@ def __init__(
        shape; if it does not match, the task will error.
    dtype: The expected dtype of the torch.Tensor. Similar to the
        shape, this may be statically or dynamically declared.
    direct_return: Whether the tensor is sent directly or inside of
        other metadata. For GPU-GPU channels, this allows the sender
This feels like a slightly leaky abstraction to me. What's the benefit of removing the additional channel? Maybe we should just mention the benefits? (Is it improving perf? Reducing memory overhead?)
Yes, it is for performance, will update the comment.
self._gpu_data_channel: TorchTensorNcclChannel = (
    gpu_data_typ.create_channel(writer, readers)
)
self._cpu_data_channel: Optional["Channel"] = None
Why don't we always just create a CPU channel for GPU data (and remove this nested NCCL channel)? Does it affect the performance drastically?
In my understanding, the GPU channel (i.e., `TorchTensorNcclChannel`) already has two channels:

- NCCL channel: used to transfer tensors.
- CPU channel: used to transfer the tensors' metadata (i.e., shape).

The additional CPU channel here is used to transfer CPU data that is not tensor metadata. Hence, we have two CPU channels: one for tensor metadata and one for other CPU data.

"Does it affect the performance drastically?"

To the best of my knowledge, the answer is no.
Right now the performance hit for dynamic shape compared to NCCL is about 2x. On top of that, the additional CPU channel for nested tensors is about 1.5x. This is a bit higher than expected, though (see #45319).
"Right now the performance hit for dynamic shape compared to NCCL is about 2x."

Do you mean:

- Case 1: Send tensors via NCCL channel (only tensors, no tensor metadata)
- Case 2:
  - Send tensors via NCCL channel
  - Send metadata via `meta_channel` (CPU channel)
  - Send other CPU data via `cpu_data_channel` (CPU channel)

(Case 1 performance) / (Case 2 performance) = 2?

"On top of that, the additional CPU channel for nested tensors is about 1.5x."

Do you mean:

- Case 1:
  - Send tensors via NCCL channel
  - Send metadata and other CPU data via `meta_channel` (CPU channel)
- Case 2:
  - Send tensors via NCCL channel
  - Send metadata via `meta_channel` (CPU channel)
  - Send other CPU data via `cpu_data_channel` (CPU channel)

(Case 1 performance) / (Case 2 performance) = 1.5?

Thanks!
For the first one, the comparison is between these:

- Case 1: Send tensors via NCCL channel (only tensors, no tensor metadata)
- Case 2:
  - Send tensors via NCCL channel
  - Send metadata via `meta_channel` (CPU channel)

The second one is right.
"Case 1: Send tensors via NCCL channel (only tensors, no tensor metadata)
Case 2:
- Send tensors via NCCL channel
- Send metadata via `meta_channel` (CPU channel)"

In this case, (Case 1 performance) / (Case 2 performance) = 2. I am wondering whether this means:

(1) 2 * Time(Send tensors via NCCL channel) = Time(Send tensors via NCCL channel) + Time(Send metadata via meta_channel)

or

(2) 2 * Time(Send tensors via NCCL channel) = Max(Time(Send tensors via NCCL channel), Time(Send metadata via meta_channel))

It seems to be (1) because the call is not async. Is my understanding correct?
    self._meta_channel.write(meta)
    raise tensors

if isinstance(tensors, list):
nit: make `tensors = [tensors]` and always use this branch instead?
Hmm, I prefer to keep it this way because sometimes we send a list of metadata and sometimes we send just one.
@@ -58,6 +59,10 @@ def __init__(
        shape; if it does not match, the task will error.
    dtype: The expected dtype of the torch.Tensor. Similar to the
        shape, this may be statically or dynamically declared.
    direct_return: Whether the tensor is sent directly or inside of
QQ: do we have a test that raises an exception when `direct_return=True` but the task doesn't actually return the tensor directly?
Hmm we have one for DAG inputs but I think not for outputs sent between actors. Let me add.
self._contains_type.register_custom_serializer()

@property
def is_direct_return(self) -> bool:
Is it expensive to figure this out automatically? For example: if the return type is a tensor (with a tensor type hint), it is True; if not, we set it to False automatically?
Python doesn't guarantee that if it's true, it will always be true :( Also, when we use direct_return=True, the only thing we can do is loop on a NCCL channel. We might be able to figure out a way to send some control message along the NCCL channel, but it seems a bit complicated for now.
Why are these changes needed?

Allows torch.Tensors nested inside Python objects to be transferred via NCCL using the following syntax:

```python
with InputNode() as inp:
    dag = sender.send.bind(inp)
    dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
    dag = receiver.recv.bind(dag)
```

We implement this by using an additional shared memory channel to pass CPU data, with a "nested" NCCL channel to pass the GPU data. Here is the send procedure for the above code:

1. Serialize the data. Extract out all tensors that are on the GPU and replace them with some kind of placeholder.
2. Send a list of metadata through the meta_channel.
3. Send the GPU tensors through the NCCL channel.
4. Send the rest of the CPU data through a cpu_data_channel, with the placeholders for the GPU tensors.

Note that if the TorchTensorType doesn't have a shape and dtype specified, we currently use the separate meta_channel to pass metadata for the serialized tensors, as introduced in #45332. To elide the cpu_data_channel, the user should now use `TorchTensorType(direct_return=True)` to indicate that no CPU data is sent along with the GPU data. To elide the meta_channel, the user should declare the shape and dtype, e.g., `TorchTensorType(shape=(10,), dtype=torch.float16)`.

Related issue number

Closes #45306.
Checks

- I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.