[Data] Additional args for Data + Train benchmark (ray-project#37839)
As a follow-up to ray-project#37624, add the following parameters to the multi-node training benchmark:
- File type (image or parquet)
- Local shuffle buffer size
- preserve_order (train config)
- Increased default number of epochs to 10
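All of the new options can be combined in a single run, for example (hypothetical invocation; the flags are defined in the script below):

python multi_node_train_benchmark.py --num-workers 4 --file-type parquet --num-epochs 10 --local-shuffle-buffer-size 200 --preserve-order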

Signed-off-by: Scott Lee <[email protected]>
Signed-off-by: NripeshN <[email protected]>
scottjlee authored and NripeshN committed Aug 15, 2023
1 parent 0d43543 commit 67f1b1e
Showing 2 changed files with 169 additions and 53 deletions.
release/nightly_tests/dataset/{image_train_benchmark.py → multi_node_train_benchmark.py} (128 changes: 83 additions & 45 deletions)
@@ -3,6 +3,8 @@
from ray.air import session
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
+from ray.train import DataConfig
+

import time
import os
@@ -11,14 +13,70 @@
import torch

# This benchmark does the following:
-# 1) Read images with ray.data.read_images()
+# 1) Read files (images or parquet) with ray.data
# 2) Apply preprocessing with map_batches()
# 3) Train TorchTrainer on processed data
# Metrics recorded to the output file are:
# - ray.torchtrainer.fit: Throughput of the final epoch in
#   TorchTrainer.fit() (step 3 above)


+def parse_args():
+    import argparse
+
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        "--file-type",
+        default="image",
+        type=str,
+        help="Input file type; choose from: ['image', 'parquet']",
+    )
+    parser.add_argument(
+        "--batch-size",
+        default=32,
+        type=int,
+        help="Batch size to use.",
+    )
+    parser.add_argument(
+        "--num-epochs",
+        # Use 10 epochs and report the throughput of the last epoch, in case
+        # there is warmup in the first epoch.
+        default=10,
+        type=int,
+        help="Number of epochs to run. The throughput for the last epoch will be kept.",
+    )
+    parser.add_argument(
+        "--num-workers",
+        default=1,
+        type=int,
+        help="Number of workers.",
+    )
+    parser.add_argument(
+        "--local-shuffle-buffer-size",
+        default=200,
+        type=int,
+        help="Parameter into ds.iter_batches(local_shuffle_buffer_size=...)",
+    )
+    parser.add_argument(
+        "--preserve-order",
+        action="store_true",
+        default=False,
+        help="Whether to configure Train with preserve_order flag.",
+    )
+    args = parser.parse_args()
+
+    if args.file_type == "image":
+        args.data_root = "s3://air-cuj-imagenet-1gb"
+    elif args.file_type == "parquet":
+        args.data_root = "s3://air-example-data-2/10G-image-data-synthetic-raw-parquet"
+    else:
+        raise Exception(
+            f"Unknown file type {args.file_type}; expected one of: ['image', 'parquet']"
+        )
+    return args
+
# Constants and utility methods for image-based benchmarks.
DEFAULT_IMAGE_SIZE = 224

@@ -54,46 +112,18 @@ def crop_and_flip_image_batch(image_batch):


if __name__ == "__main__":
-    import argparse
-
-    parser = argparse.ArgumentParser()
-
-    parser.add_argument(
-        "--data-root",
-        default="s3://air-cuj-imagenet-1gb",
-        type=str,
-        help="Directory path with files.",
-    )
-    parser.add_argument(
-        "--batch-size",
-        default=32,
-        type=int,
-        help="Batch size to use.",
-    )
-    parser.add_argument(
-        "--num-epochs",
-        # Use 2 epochs and report the throughput of the last epoch, in case
-        # there is warmup in the first epoch.
-        default=2,
-        type=int,
-        help="Number of epochs to run. The throughput for the last epoch will be kept.",
-    )
-    parser.add_argument(
-        "--num-workers",
-        default=1,
-        type=int,
-        help="Number of workers.",
-    )
-    args = parser.parse_args()
-
+    args = parse_args()
    metrics = {}

-    ray_dataset = (
-        # 1) Read in data with read_images()
-        ray.data.read_images(args.data_root)
-        # 2) Preprocess data by applying transformation with map_batches()
-        .map_batches(crop_and_flip_image_batch)
-    )
+    # 1) Read in data with read_images() / read_parquet()
+    if args.file_type == "image":
+        ray_dataset = ray.data.read_images(args.data_root)
+    elif args.file_type == "parquet":
+        ray_dataset = ray.data.read_parquet(args.data_root)
+    else:
+        raise Exception(f"Unknown file type {args.file_type}")
+    # 2) Preprocess data by applying transformation with map_batches()
+    ray_dataset = ray_dataset.map_batches(crop_and_flip_image_batch)

    def train_loop_per_worker():
        it = session.get_dataset_shard("train")
@@ -102,38 +132,46 @@ def train_loop_per_worker():
            num_rows = 0
            start_t = time.time()
            for batch in it.iter_batches(
-                batch_size=args.batch_size, prefetch_batches=10
+                batch_size=args.batch_size,
+                local_shuffle_buffer_size=args.local_shuffle_buffer_size,
+                prefetch_batches=10,
            ):
-                num_rows += len(batch["image"])
+                num_rows += args.batch_size
            end_t = time.time()
            # Record throughput per epoch.
            epoch_tput = num_rows / (end_t - start_t)
            session.report({"tput": epoch_tput, "epoch": i})

    # 3) Train TorchTrainer on processed data
+    options = DataConfig.default_ingest_options()
+    options.preserve_order = args.preserve_order
+
    torch_trainer = TorchTrainer(
        train_loop_per_worker,
-        scaling_config=ScalingConfig(num_workers=args.num_workers),
        datasets={"train": ray_dataset},
+        scaling_config=ScalingConfig(num_workers=args.num_workers),
+        dataset_config=ray.train.DataConfig(
+            execution_options=options,
+        ),
    )
    result = torch_trainer.fit()

    # Report the throughput of the last training epoch.
-    metrics["ray.TorchTrainer.fit"] = list(result.metrics_dataframe["tput"])[-1]
+    metrics["ray_TorchTrainer_fit"] = list(result.metrics_dataframe["tput"])[-1]

    # Gather up collected metrics, and write to output JSON file.
    metrics_dict = defaultdict(dict)
    for label, tput in metrics.items():
        metrics_dict[label].update({"THROUGHPUT": tput})

-    test_name = f"read_images_train{args.num_workers}_cpu"
+    test_name = f"read_{args.file_type}_train_{args.num_workers}workers"
    result_dict = {
        test_name: metrics_dict,
        "success": 1,
    }

    test_output_json = os.environ.get(
-        "TEST_OUTPUT_JSON", "/tmp/train_torch_image_benchmark.json"
+        "TEST_OUTPUT_JSON", "/tmp/multi_node_train_benchmark.json"
    )

    with open(test_output_json, "wt") as f:
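
For reference, here is a minimal standalone sketch (not part of the commit) of what the two new knobs do, assuming a local Ray cluster and a small synthetic dataset:

import ray
from ray.train import DataConfig

ds = ray.data.range(1000)

# local_shuffle_buffer_size shuffles rows within a fixed-size per-worker
# buffer while iterating; a larger buffer shuffles better at the cost of
# memory and throughput.
for batch in ds.iter_batches(batch_size=32, local_shuffle_buffer_size=200):
    pass  # a training step would consume `batch` here

# preserve_order forces Ray Data to produce blocks in a deterministic
# order, which aids reproducibility at some cost to throughput.
options = DataConfig.default_ingest_options()
options.preserve_order = True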
release/release_tests.yaml (94 changes: 86 additions & 8 deletions)
@@ -6361,11 +6361,11 @@
        cluster_env: app_config.yaml
        cluster_compute: single_node_benchmark_compute_gce.yaml

-- name: read_images_train_4_cpu
+- name: read_images_train_4_gpu
  group: data-tests
  working_dir: nightly_tests/dataset

-  frequency: nightly
+  frequency: manual
  team: data
  python: "3.8"
  cluster:
@@ -6375,8 +6375,8 @@
    cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_2x2_aws.yaml

  run:
-    timeout: 600
-    script: python image_train_benchmark.py --num-workers 4
+    timeout: 1800
+    script: python multi_node_train_benchmark.py --num-workers 4 --file-type image

  variations:
    - __suffix__: aws
@@ -6387,11 +6387,11 @@
        cluster_env: app_config.yaml
        cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_2x2_gce.yaml

-- name: read_images_train_16_cpu
+- name: read_images_train_16_gpu
  group: data-tests
  working_dir: nightly_tests/dataset

-  frequency: nightly
+  frequency: manual
  team: data
  python: "3.8"
  cluster:
@@ -6401,8 +6401,86 @@
    cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_4x4_aws.yaml

  run:
-    timeout: 600
-    script: python image_train_benchmark.py --num-workers 16
+    timeout: 1800
+    script: python multi_node_train_benchmark.py --num-workers 16 --file-type image
+
+  variations:
+    - __suffix__: aws
+    - __suffix__: gce
+      env: gce
+      frequency: manual
+      cluster:
+        cluster_env: app_config.yaml
+        cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_4x4_gce.yaml
+
+- name: read_images_train_16_gpu_preserve_order
+  group: data-tests
+  working_dir: nightly_tests/dataset
+
+  frequency: manual
+  team: data
+  python: "3.8"
+  cluster:
+    byod:
+      type: gpu
+    cluster_env: app_config.yaml
+    cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_4x4_aws.yaml
+
+  run:
+    timeout: 1800
+    script: python multi_node_train_benchmark.py --num-workers 16 --file-type image --preserve-order
+
+  variations:
+    - __suffix__: aws
+    - __suffix__: gce
+      env: gce
+      frequency: manual
+      cluster:
+        cluster_env: app_config.yaml
+        cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_4x4_gce.yaml
+
+- name: read_parquet_train_4_gpu
+  group: data-tests
+  working_dir: nightly_tests/dataset
+
+  frequency: manual
+  team: data
+  python: "3.8"
+  cluster:
+    byod:
+      type: gpu
+    cluster_env: app_config.yaml
+    cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_2x2_aws.yaml
+
+  run:
+    timeout: 1800
+    script: python multi_node_train_benchmark.py --num-workers 4 --file-type parquet
+
+  variations:
+    - __suffix__: aws
+    - __suffix__: gce
+      env: gce
+      frequency: manual
+      cluster:
+        cluster_env: app_config.yaml
+        cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_2x2_gce.yaml
+
+- name: read_parquet_train_16_gpu
+  group: data-tests
+  working_dir: nightly_tests/dataset
+
+  frequency: manual
+  team: data
+  python: "3.8"
+  cluster:
+    byod:
+      type: gpu
+    cluster_env: app_config.yaml
+    cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_4x4_aws.yaml
+
+  run:
+    timeout: 1800
+    script: python multi_node_train_benchmark.py --num-workers 16 --file-type parquet

  variations:
    - __suffix__: aws
