Skip to content

Commit

Permalink
[AIR] Init Mosaic Trainer API (ray-project#29237)
Browse files Browse the repository at this point in the history
Signed-off-by: ilee300a <[email protected]>

In this PR, we provide the initial commits for integrating the Mosaic library with Ray. Since the Mosaic library provides algorithmic acceleration, combining it with the system-side acceleration of Ray's distributed training can further speed up the training process.

Included in this PR is the MosaicTrainer skeleton code. For this PR, the trainer does not support using Ray Dataset shards in the worker loop and assumes that the data loaders are prepared in the trainer init function. The current trainer is able to run a Composer model with callbacks and loggers, as well as selected algorithms. No metrics or checkpoints are reported by the trainer at the moment.

Co-authored-by: Amog Kamsetty <[email protected]>
  • Loading branch information
ilee300a and amogkam authored Oct 25, 2022
1 parent b0bd270 commit 51a7b09
Show file tree
Hide file tree
Showing 9 changed files with 569 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .buildkite/pipeline.gpu_large.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
- pip uninstall torch -y
- pip install -U torch==1.11.0+cu113 --extra-index-url https://download.pytorch.org/whl/cu113
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=torch_1_11 python/ray/train/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=torch_1_11,-mosaic python/ray/train/...

- label: ":tv: :database: :steam_locomotive: Datasets Train Integration GPU Tests and Examples (Python 3.7)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TRAIN_AFFECTED"]
Expand Down
14 changes: 13 additions & 1 deletion .buildkite/pipeline.ml.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,21 @@
- ./ci/env/env_info.sh
- ./ci/run/run_bazel_test_with_sharding.sh
--config=ci $(./ci/run/bazel_export_options)
--test_tag_filters=-gpu_only,-gpu,-minimal,-tune,-needs_credentials
--test_tag_filters=-gpu_only,-gpu,-minimal,-tune,-needs_credentials,-torch_1_11,-mosaic
python/ray/train/...

- label: ":tv: :steam_locomotive: Train tests and examples (PyTorch 1.11) "
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TRAIN_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TRAIN_TESTING=1 DATA_PROCESSING_TESTING=1 INSTALL_HOROVOD=1 ./ci/env/install-dependencies.sh
# TODO(amogkam): Remove this after we upgrade the PyTorch version.
- pip uninstall torch -y
- pip install -U torch==1.11.0+cu113 --extra-index-url https://download.pytorch.org/whl/cu113
- pip install mosaicml==0.10.1 --ignore-installed
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=mosaic,-gpu_only python/ray/train/...

- label: ":steam_locomotive: :octopus: Train + Tune tests and examples"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TRAIN_AFFECTED"]
instance_size: medium
Expand Down
4 changes: 4 additions & 0 deletions doc/source/custom_directives.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ def update_context(app, pagename, templatename, context, doctree):
"tree",
"wandb",
"zoopt",
"composer",
"composer.trainer",
"composer.loggers",
"composer.loggers.logger_destination",
]


Expand Down
15 changes: 15 additions & 0 deletions doc/source/train/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,21 @@ Scikit-Learn
:exclude-members: SklearnTrainer
:show-inheritance:

Mosaic
~~~~~~

.. autoclass:: ray.train.mosaic.MosaicTrainer
:members:
:show-inheritance:

.. automethod:: __init__


.. automodule:: ray.train.mosaic
:members:
:exclude-members: MosaicTrainer
:show-inheritance:


Reinforcement Learning (RLlib)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
8 changes: 8 additions & 0 deletions python/ray/train/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ py_test(
deps = [":train_lib"]
)

py_test(
name = "test_mosaic_trainer",
size = "medium",
srcs = ["tests/test_mosaic_trainer.py"],
tags = ["team:ml", "exclusive", "ray_air", "mosaic"],
deps = [":train_lib", ":conftest"]
)

py_test(
name = "test_lightgbm_predictor",
size = "small",
Expand Down
121 changes: 121 additions & 0 deletions python/ray/train/examples/mosaic_cifar10_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import argparse
import torch
import torch.utils.data

import torchvision
from torchvision import transforms, datasets

from torchmetrics.classification.accuracy import Accuracy


import ray
from ray.air.config import ScalingConfig
import ray.train as train
from ray.air import session


def trainer_init_per_worker(config):
    """Build and return a Composer ``Trainer`` on each Ray Train worker.

    This function runs once per worker. It constructs a ResNet-18
    classifier, CIFAR-10 train/test data loaders (a 64-sample subset of
    each split, downloaded to ``~/data``), and a DecoupledSGDW optimizer,
    then assembles them into a ``composer.trainer.Trainer``.

    Args:
        config: Keyword arguments forwarded to ``composer.trainer.Trainer``.
            The key ``"should_eval"`` (default ``False``) is popped from the
            dict; when truthy, an ``eval_dataloader`` Evaluator entry is
            added. NOTE: ``config`` is mutated in place by ``pop``.

    Returns:
        A configured ``composer.trainer.Trainer`` instance.
    """
    from composer.core.evaluator import Evaluator
    from composer.models.tasks import ComposerClassifier
    import composer.optim
    # Import composer.trainer explicitly: it is referenced below, and we
    # should not rely on it being loaded as a side effect of importing
    # composer.optim (that only works if the package __init__ imports it).
    import composer.trainer

    # Global batch size, split evenly across workers below.
    BATCH_SIZE = 32
    # prepare the model for distributed training and wrap with ComposerClassifier for
    # Composer Trainer compatibility
    model = torchvision.models.resnet18(num_classes=10)
    model = ComposerClassifier(ray.train.torch.prepare_model(model))

    # prepare train/test dataset
    # Per-channel CIFAR-10 normalization statistics.
    mean = (0.507, 0.487, 0.441)
    std = (0.267, 0.256, 0.276)
    cifar10_transforms = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize(mean, std)]
    )

    data_directory = "~/data"
    # Use small 64-sample subsets so the example runs quickly.
    train_dataset = torch.utils.data.Subset(
        datasets.CIFAR10(
            data_directory, train=True, download=True, transform=cifar10_transforms
        ),
        list(range(64)),
    )
    test_dataset = torch.utils.data.Subset(
        datasets.CIFAR10(
            data_directory, train=False, download=True, transform=cifar10_transforms
        ),
        list(range(64)),
    )

    # Split the global batch size across workers.
    batch_size_per_worker = BATCH_SIZE // session.get_world_size()
    train_dataloader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size_per_worker, shuffle=True
    )
    test_dataloader = torch.utils.data.DataLoader(
        test_dataset, batch_size=batch_size_per_worker, shuffle=True
    )

    # Wrap loaders with Ray Train's distributed sampler/device handling.
    train_dataloader = train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = train.torch.prepare_data_loader(test_dataloader)

    evaluator = Evaluator(
        dataloader=test_dataloader, label="my_evaluator", metrics=Accuracy()
    )

    # prepare optimizer
    optimizer = composer.optim.DecoupledSGDW(
        model.parameters(),
        lr=0.05,
        momentum=0.9,
        weight_decay=2.0e-3,
    )

    # Optionally attach the evaluator; "should_eval" is consumed here so it
    # is not forwarded as an unknown kwarg to the Composer Trainer.
    if config.pop("should_eval", False):
        config["eval_dataloader"] = evaluator

    return composer.trainer.Trainer(
        model=model, train_dataloader=train_dataloader, optimizers=optimizer, **config
    )


def train_mosaic_cifar10(num_workers=2, use_gpu=False):
    """Train the CIFAR-10 example with ``MosaicTrainer`` and return the result.

    Args:
        num_workers: Number of distributed training workers.
        use_gpu: Whether each worker should be assigned a GPU.

    Returns:
        The ``Result`` object produced by ``trainer.fit()``.
    """
    from composer.algorithms import LabelSmoothing
    from ray.train.mosaic import MosaicTrainer

    # Configuration forwarded to the per-worker Composer Trainer:
    # a single epoch of training with the LabelSmoothing algorithm,
    # and no evaluation loop.
    init_config = dict(
        max_duration="1ep",
        algorithms=[LabelSmoothing()],
        should_eval=False,
    )

    mosaic_trainer = MosaicTrainer(
        trainer_init_per_worker=trainer_init_per_worker,
        trainer_init_config=init_config,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
    )

    result = mosaic_trainer.fit()
    print(f"Results: {result.metrics}")
    return result


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--address", required=False, type=str, help="the address to use for Ray"
)
parser.add_argument(
"--num-workers",
"-n",
type=int,
default=2,
help="Sets number of workers for training.",
)
parser.add_argument(
"--use-gpu", action="store_true", default=False, help="Enables GPU training"
)

args, _ = parser.parse_known_args()

runtime_env = {"pip": ["mosaicml==0.10.1"]}
ray.init(address=args.address, runtime_env=runtime_env)
train_mosaic_cifar10(num_workers=args.num_workers, use_gpu=args.use_gpu)
3 changes: 3 additions & 0 deletions python/ray/train/mosaic/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from ray.train.mosaic.mosaic_trainer import MosaicTrainer

__all__ = ["MosaicTrainer"]
Loading

0 comments on commit 51a7b09

Please sign in to comment.