[RAY AIR] set the correct gpu id in TorchTrainer (ray-project#26493)
JiahaoYao committed Jul 19, 2022
1 parent 5b93716 commit 8284270
Showing 4 changed files with 162 additions and 10 deletions.
2 changes: 1 addition & 1 deletion python/ray/train/BUILD
@@ -315,7 +315,7 @@ py_test(

py_test(
name = "test_tune",
size = "medium",
size = "large",
srcs = ["tests/test_tune.py"],
tags = ["team:ml", "exclusive", "tune"],
deps = [":train_lib"]
62 changes: 61 additions & 1 deletion python/ray/train/tests/test_gpu.py
@@ -1,14 +1,21 @@
import os
from timeit import default_timer as timer
from collections import Counter

from unittest.mock import patch
import pytest
import torch
import torchvision
from test_tune import torch_fashion_mnist, tune_tensorflow_mnist
from test_tune import (
    torch_fashion_mnist,
    tune_tensorflow_mnist,
)
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader, DistributedSampler

import ray
from ray.cluster_utils import Cluster

import ray.train as train
from ray.train import Trainer, TrainingCallback
from ray.air.config import ScalingConfig
@@ -25,6 +32,7 @@
from ray.train.examples.torch_linear_example import LinearDataset
from ray.train.horovod.horovod_trainer import HorovodTrainer
from ray.train.tensorflow.tensorflow_trainer import TensorflowTrainer
from ray.train.torch import TorchConfig
from ray.train.torch.torch_trainer import TorchTrainer


@@ -43,6 +51,20 @@ def ray_start_1_cpu_1_gpu():
    ray.shutdown()


@pytest.fixture
def ray_2_node_4_gpu():
    cluster = Cluster()
    for _ in range(2):
        cluster.add_node(num_cpus=8, num_gpus=4)

    ray.init(address=cluster.address)

    yield

    ray.shutdown()
    cluster.shutdown()


# TODO: Refactor as a backend test.
@pytest.mark.parametrize("num_gpus_per_worker", [0.5, 1])
def test_torch_get_device(ray_start_4_cpus_2_gpus, num_gpus_per_worker):
@@ -70,6 +92,40 @@ def train_fn():
    )


# TODO: Refactor as a backend test.
@pytest.mark.parametrize("num_gpus_per_worker", [0.5, 1, 2])
def test_torch_get_device_dist(ray_2_node_4_gpu, num_gpus_per_worker):
    @patch("torch.cuda.is_available", lambda: True)
    def train_fn():
        return train.torch.get_device().index

    trainer = Trainer(
        TorchConfig(backend="gloo"),
        num_workers=int(8 / num_gpus_per_worker),
        use_gpu=True,
        resources_per_worker={"GPU": num_gpus_per_worker},
    )
    trainer.start()
    devices = trainer.run(train_fn)
    trainer.shutdown()

    count = Counter(devices)
    if num_gpus_per_worker == 0.5:
        for i in range(4):
            assert count[i] == 4
    elif num_gpus_per_worker == 1:
        for i in range(4):
            assert count[i] == 2
    elif num_gpus_per_worker == 2:
        for i in range(2):
            assert count[2 * i] == 2
    else:
        raise RuntimeError(
            "New parameter for this test has been added without checking that the "
            "correct devices have been returned."
        )


# TODO: Refactor as a backend test.
def test_torch_prepare_model(ray_start_4_cpus_2_gpus):
"""Tests if ``prepare_model`` correctly wraps in DDP."""
@@ -346,6 +402,10 @@ def test_tune_fashion_mnist_gpu(ray_start_4_cpus_2_gpus):
    torch_fashion_mnist(num_workers=2, use_gpu=True, num_samples=1)


def test_concurrent_tune_fashion_mnist_gpu(ray_start_4_cpus_2_gpus):
    torch_fashion_mnist(num_workers=1, use_gpu=True, num_samples=2)


def test_tune_tensorflow_mnist_gpu(ray_start_4_cpus_2_gpus):
    tune_tensorflow_mnist(num_workers=2, use_gpu=True, num_samples=1)

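As a sanity check on the Counter assertions in test_torch_get_device_dist above, here is a small standalone sketch of the expected per-index counts for the 2-node, 4-GPU-per-node fixture. It assumes, purely for illustration, that Ray packs the fractional-GPU workers onto each node's GPUs in order; expected_device_counts is a hypothetical helper, not part of the patch.

from collections import Counter

def expected_device_counts(num_gpus_per_worker, gpus_per_node=4, num_nodes=2):
    # Hypothetical helper mirroring the ray_2_node_4_gpu fixture: each node runs
    # gpus_per_node / num_gpus_per_worker workers, packed onto GPUs in order.
    workers_per_node = int(gpus_per_node / num_gpus_per_worker)
    counts = Counter()
    for _ in range(num_nodes):
        for worker in range(workers_per_node):
            # Each worker reports the local index of the first GPU assigned to it.
            counts[int(worker * num_gpus_per_worker)] += 1
    return counts

# Matches the three parametrized branches of the test above.
assert expected_device_counts(0.5) == Counter({0: 4, 1: 4, 2: 4, 3: 4})
assert expected_device_counts(1) == Counter({0: 2, 1: 2, 2: 2, 3: 2})
assert expected_device_counts(2) == Counter({0: 2, 2: 2})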
67 changes: 61 additions & 6 deletions python/ray/train/tests/test_tune.py
@@ -3,6 +3,7 @@
import pytest

import ray
from ray.cluster_utils import Cluster
import ray.train as train
from ray import tune
from ray.tune import TuneError
@@ -23,6 +24,9 @@
from ray.tune.tune_config import TuneConfig
from ray.tune.tuner import Tuner

from ray.train.torch import TorchConfig
from unittest.mock import patch


@pytest.fixture
def ray_start_4_cpus():
@@ -40,6 +44,20 @@ def ray_start_8_cpus():
    ray.shutdown()


@pytest.fixture
def ray_2_node_4_gpu():
    cluster = Cluster()
    for _ in range(2):
        cluster.add_node(num_cpus=2, num_gpus=4)

    ray.init(address=cluster.address)

    yield

    ray.shutdown()
    cluster.shutdown()


class TestConfig(BackendConfig):
    @property
    def backend_cls(self):
@@ -55,8 +73,6 @@ def on_shutdown(self, worker_group: WorkerGroup, backend_config: TestConfig):


def torch_fashion_mnist(num_workers, use_gpu, num_samples):
    epochs = 2

    trainer = TorchTrainer(
        fashion_mnist_train_func,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
@@ -67,7 +83,7 @@ def torch_fashion_mnist(num_workers, use_gpu, num_samples):
            "train_loop_config": {
                "lr": tune.loguniform(1e-4, 1e-1),
                "batch_size": tune.choice([32, 64, 128]),
                "epochs": epochs,
                "epochs": 2,
            }
        },
        tune_config=TuneConfig(
@@ -86,8 +102,6 @@ def test_tune_torch_fashion_mnist(ray_start_8_cpus):


def tune_tensorflow_mnist(num_workers, use_gpu, num_samples):
    epochs = 2

    trainer = TensorflowTrainer(
        tensorflow_mnist_train_func,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
@@ -98,7 +112,7 @@ def tune_tensorflow_mnist(num_workers, use_gpu, num_samples):
            "train_loop_config": {
                "lr": tune.loguniform(1e-4, 1e-1),
                "batch_size": tune.choice([32, 64, 128]),
                "epochs": epochs,
                "epochs": 2,
            }
        },
        tune_config=TuneConfig(
@@ -307,6 +321,47 @@ def train_func():
    assert len(trial_dfs[0]["training_iteration"]) == 4


@pytest.mark.parametrize("num_gpus_per_worker", [0.5, 1, 2])
def test_tune_torch_get_device_gpu(ray_2_node_4_gpu, num_gpus_per_worker):
    from ray import tune
    from ray.tune.tuner import Tuner, TuneConfig

    num_samples = 2

    @patch("torch.cuda.is_available", lambda: True)
    def train_func():
        train.report(device_id=train.torch.get_device().index)

    trainer = TorchTrainer(
        train_func,
        torch_config=TorchConfig(backend="gloo"),
        scaling_config={
            "num_workers": 2,
            "use_gpu": True,
            "resources_per_worker": {"GPU": num_gpus_per_worker},
        },
    )

    tuner = Tuner(
        trainer,
        param_space={
            "train_loop_config": {
                "dummy": tune.choice([32, 64, 128]),
            }
        },
        tune_config=TuneConfig(
            num_samples=num_samples,
        ),
    )
    analysis = tuner.fit()._experiment_analysis
    trial_dfs = list(analysis.trial_dataframes.values())
    device_ids = [trial_df["device_id"].tolist() for trial_df in trial_dfs]

    assert len(device_ids) == num_samples
    for i in range(num_samples):
        assert device_ids[i][0] == 0


if __name__ == "__main__":
import sys

41 changes: 39 additions & 2 deletions python/ray/train/torch/train_loop_utils.py
@@ -463,11 +463,48 @@ def wrapper(worker_id):
        return data_loader

    def get_device(self) -> torch.device:
        """Gets the correct torch device to use for training."""
        """Gets the correct torch device to use for training.
        Assumes that `CUDA_VISIBLE_DEVICES` is set and is a
        superset of the `ray.get_gpu_ids()`.
        Example:
            >>> # os.environ["CUDA_VISIBLE_DEVICES"] = "3,4"
            >>> # ray.get_gpu_ids() == [3]
            >>> # torch.cuda.is_available() == True
            >>> # get_device() == torch.device("cuda:0")
            >>> # os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4"
            >>> # ray.get_gpu_ids() == [4]
            >>> # torch.cuda.is_available() == True
            >>> # get_device() == torch.device("cuda:4")
            >>> # os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5"
            >>> # ray.get_gpu_ids() == [4,5]
            >>> # torch.cuda.is_available() == True
            >>> # get_device() == torch.device("cuda:4")
        """
        if torch.cuda.is_available():
            # GPU IDs are assigned by Ray after you specify "use_gpu"
            gpu_ids = ray.get_gpu_ids()

            if len(gpu_ids) > 0:
                device_id = gpu_ids[0]
                # By default, there should only be one GPU ID if `use_gpu=True`.
                # If there are multiple GPUs, use the first one.
                # If using fractional GPUs, these IDs are not guaranteed
                # to be unique across different processes.
                gpu_id = gpu_ids[0]

                cuda_visible_str = os.environ.get("CUDA_VISIBLE_DEVICES", "")
                if cuda_visible_str and cuda_visible_str != "NoDevFiles":
                    cuda_visible_list = [
                        int(dev) for dev in cuda_visible_str.split(",")
                    ]
                    device_id = cuda_visible_list.index(gpu_id)
                else:
                    raise RuntimeError(
                        f"CUDA_VISIBLE_DEVICES set incorrectly: {cuda_visible_str}"
                    )
            else:
                # If called on the driver or outside of Ray Train, return the
                # 0th device.
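The heart of the fix is the cuda_visible_list.index(gpu_id) lookup above: the global GPU id that Ray assigns is translated into the process-local ordinal that torch expects. Below is a minimal sketch of that mapping in isolation, using a hypothetical helper name and hard-coded values instead of the Trainer machinery.

def local_torch_device_index(gpu_ids, cuda_visible_str):
    # Hypothetical helper mirroring the patched get_device(): map the first
    # Ray-assigned GPU id to its position in CUDA_VISIBLE_DEVICES, which is
    # the index torch uses for torch.device(f"cuda:{index}").
    gpu_id = gpu_ids[0]
    cuda_visible_list = [int(dev) for dev in cuda_visible_str.split(",")]
    return cuda_visible_list.index(gpu_id)

# The three scenarios from the docstring above.
assert local_torch_device_index([3], "3,4") == 0          # torch.device("cuda:0")
assert local_torch_device_index([4], "0,1,2,3,4") == 4    # torch.device("cuda:4")
assert local_torch_device_index([4, 5], "0,1,2,3,4,5") == 4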
