[Data] Add GCE variant for shuffle_data_loader (ray-project#34632)
This PR configures BuildKite to run the shuffle_data_loader Data release test on GCE. The test was previously excluded in ray-project#34105.
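Concretely, the change threads a new --cloud command-line flag through the benchmark script so it can read its input Parquet files from either S3 or GCS, and registers the GCE run as a "gce" variation of the existing test in release_tests.yaml.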

---------

Signed-off-by: Balaji Veeramani <[email protected]>
bveeramani committed Apr 24, 2023
1 parent e39efea commit f6c6954
Showing 3 changed files with 32 additions and 20 deletions.
27 changes: 20 additions & 7 deletions release/nightly_tests/dataset/dataset_shuffle_data_loader.py
@@ -5,13 +5,20 @@
 
 import ray
 
+from pyarrow import fs
 import numpy as np
 import torch
 
-PATH = [
-    f"s3://shuffling-data-loader-benchmarks/data/input_data_{i}.parquet.snappy"
-    for i in range(0, 25)
-]
+PATHS = {
+    "aws": [
+        f"s3://shuffling-data-loader-benchmarks/data/input_data_{i}.parquet.snappy"
+        for i in range(0, 25)
+    ],
+    "gcp": [
+        f"gcs://shuffling-data-loader-benchmarks/data/input_data_{i}.parquet.snappy"
+        for i in range(0, 25)
+    ],
+}
 
 
 def create_parser():
@@ -26,6 +33,7 @@ def create_parser():
     )
     parser.add_argument("--num-workers", type=int, default=4)
     parser.add_argument("--repeat-times", type=int, default=16)
+    parser.add_argument("--cloud", type=str, choices=["aws", "gcp"])
     return parser
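For context, this is the flag that the run commands in release_tests.yaml (further down in this diff) pass in; a local invocation would look like:

python dataset_shuffle_data_loader.py --cloud gcp

Note that the flag has no default, so omitting it leaves args.cloud as None and the PATHS lookup in the main block fails with a KeyError; callers are expected to pass it explicitly.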


@@ -83,9 +91,14 @@ def create_torch_iterator(split, batch_size, rank=None):
     return torch_iterator
 
 
-def create_dataset(filenames, repeat_times):
+def create_dataset(filenames, repeat_times, cloud):
+    if cloud == "gcp":
+        filesystem = fs.GcsFileSystem()
+    else:
+        filesystem = None
+
     pipeline = (
-        ray.data.read_parquet(list(filenames))
+        ray.data.read_parquet(list(filenames), filesystem=filesystem)
         .repeat(times=repeat_times)
         .random_shuffle_each_window()
     )
@@ -100,7 +113,7 @@ def create_dataset(filenames, repeat_times):
 
     start = time.time()
 
-    pipeline = create_dataset(PATH, args.repeat_times)
+    pipeline = create_dataset(PATHS[args.cloud], args.repeat_times, args.cloud)
     splits = pipeline.split(args.num_workers)
 
     @ray.remote(num_gpus=1)
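For readers skimming the diff, here is a minimal standalone sketch of the new read path. Assumptions not in the PR: pyarrow with GCS support is installed, GCP credentials are available in the environment, and the single-path list is illustrative rather than the benchmark's full 25-file input set.

import ray
from pyarrow import fs

# Mirror the selection logic above: an explicit GcsFileSystem for GCP,
# and filesystem=None for AWS so Ray Data infers S3 from the s3:// scheme.
cloud = "gcp"
filesystem = fs.GcsFileSystem() if cloud == "gcp" else None

paths = ["gcs://shuffling-data-loader-benchmarks/data/input_data_0.parquet.snappy"]
ds = ray.data.read_parquet(paths, filesystem=filesystem)
print(ds.schema())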
1 change: 0 additions & 1 deletion release/nightly_tests/dataset/shuffle_app_config.yaml
@@ -3,7 +3,6 @@ base_image: {{ env["RAY_IMAGE_ML_NIGHTLY_GPU"] | default("anyscale/ray-ml:nightl
 python:
   pip_packages:
     - boto3
-    - pyarrow<7.0.0
   conda_packages: []
 
 post_build_cmds:
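The dropped pyarrow<7.0.0 pin is likely what makes fs.GcsFileSystem importable in the test image: to my knowledge the class only ships with pyarrow 7.0.0 and later. A quick environment sanity check (hypothetical, not part of the PR):

from pyarrow import fs

# Raises AttributeError on pyarrow releases that predate GCS support.
fs.GcsFileSystem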
24 changes: 12 additions & 12 deletions release/release_tests.yaml
@@ -3371,14 +3371,14 @@
   cluster:
     cluster_env: app_config.yaml
     cluster_compute: multi_node_checkpointing_compute_config.yaml
 
   run:
     timeout: 3600
     script: pytest checkpointing_tests/test_learner_group_checkpointing.py
 
   wait_for_nodes:
     num_nodes: 3
 
   alert: default

@@ -4982,17 +4982,17 @@
 
   run:
     timeout: 1800
-    script: python dataset_shuffle_data_loader.py
+    script: python dataset_shuffle_data_loader.py --cloud aws
 
-  # TODO: Port s3://shuffling-data-loader-benchmarks/ to GCS.
-  # variations:
-  #   - __suffix__: aws
-  #   - __suffix__: gce
-  #     env: gce
-  #     frequency: manual
-  #     cluster:
-  #       cluster_env: shuffle_app_config.yaml
-  #       cluster_compute: shuffle_compute_gce.yaml
+  variations:
+    - __suffix__: aws
+    - __suffix__: gce
+      env: gce
+      frequency: manual
+      cluster:
+        cluster_compute: shuffle_compute_gce.yaml
+      run:
+        script: python dataset_shuffle_data_loader.py --cloud gcp
 
 - name: parquet_metadata_resolution
   group: data-tests
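On the schema: each entry under variations becomes its own test, named by appending __suffix__ (here dataset_shuffle_data_loader.aws and dataset_shuffle_data_loader.gce), and a variation's keys override the base test's. The gce variant therefore swaps in the GCE compute config, its own run command, and frequency: manual, so it runs only when triggered explicitly rather than on the nightly schedule.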
