From 081a8dfa76fede531e4b9ad06d9cde99c9148a29 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 19 Jan 2023 11:40:09 -0800 Subject: [PATCH] [Datasets] Make data nightly tests still work with lazy execution (#31460) This is a follow-up to #31286 (comment); here we audit all data nightly tests to make sure they are still working with lazy execution enabled by default. Signed-off-by: Cheng Su --- release/nightly_tests/dataset/inference.py | 6 +++--- .../dataset/map_batches_benchmark.py | 16 +++++++++++++--- release/nightly_tests/dataset/sort.py | 1 + 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/release/nightly_tests/dataset/inference.py b/release/nightly_tests/dataset/inference.py index 21090b1664832..314502a8f9652 100644 --- a/release/nightly_tests/dataset/inference.py +++ b/release/nightly_tests/dataset/inference.py @@ -86,17 +86,17 @@ def infer(batch): ray_remote_args={"num_cpus": 0.5}, ) # Do a blocking map so that we can measure the download time. -ds = ds.map(lambda x: x) +ds = ds.map(lambda x: x).fully_executed() end_download_time = time.time() print("Preprocessing...") -ds = ds.map(preprocess) +ds = ds.map(preprocess).fully_executed() end_preprocess_time = time.time() print("Inferring...") # NOTE: set a small batch size to avoid OOM on GRAM when doing inference. 
ds = ds.map_batches( infer, num_gpus=0.25, batch_size=128, batch_format="pandas", compute="actors" -) +).fully_executed() end_time = time.time() diff --git a/release/nightly_tests/dataset/map_batches_benchmark.py b/release/nightly_tests/dataset/map_batches_benchmark.py index e777efaf0cccf..8bd588a34d033 100644 --- a/release/nightly_tests/dataset/map_batches_benchmark.py +++ b/release/nightly_tests/dataset/map_batches_benchmark.py @@ -19,9 +19,13 @@ def map_batches( batch_format: Literal["default", "pandas", "pyarrow", "numpy"], compute: Optional[Union[str, ComputeStrategy]] = None, num_calls: Optional[int] = 1, + is_eager_executed: Optional[bool] = False, ) -> Dataset: ds = input_ds + if is_eager_executed: + ds.fully_executed() + for _ in range(num_calls): ds = ds.map_batches( lambda x: x, @@ -29,6 +33,8 @@ def map_batches( batch_size=batch_size, compute=compute, ) + if is_eager_executed: + ds.fully_executed() return ds @@ -37,6 +43,7 @@ def run_map_batches_benchmark(benchmark: Benchmark): "s3://air-example-data/ursa-labs-taxi-data/by_year/2018/01" ) lazy_input_ds = input_ds.lazy() + input_ds.fully_executed() batch_formats = ["pandas", "numpy"] batch_sizes = [1024, 2048, 4096, None] @@ -56,7 +63,7 @@ def run_map_batches_benchmark(benchmark: Benchmark): continue num_calls = 2 - test_name = f"map-batches-{batch_format}-{batch_size}-{num_calls}-default" + test_name = f"map-batches-{batch_format}-{batch_size}-{num_calls}-eager" benchmark.run( test_name, map_batches, @@ -64,6 +71,7 @@ def run_map_batches_benchmark(benchmark: Benchmark): batch_format=batch_format, batch_size=batch_size, num_calls=num_calls, + is_eager_executed=True, ) test_name = f"map-batches-{batch_format}-{batch_size}-{num_calls}-lazy" benchmark.run( @@ -86,7 +94,7 @@ def run_map_batches_benchmark(benchmark: Benchmark): test_name = ( f"map-batches-{batch_format}-{batch_size}-{num_calls}-" - f"{compute_strategy}-default" + f"{compute_strategy}-eager" ) benchmark.run( test_name, @@ -96,6 +104,7 @@ def 
run_map_batches_benchmark(benchmark: Benchmark): batch_size=batch_size, compute=compute, num_calls=num_calls, + is_eager_executed=True, ) test_name = ( f"map-batches-{batch_format}-{batch_size}-{num_calls}-" @@ -131,7 +140,8 @@ def run_map_batches_benchmark(benchmark: Benchmark): # Test reading multiple files. input_ds = ray.data.read_parquet( "s3://air-example-data/ursa-labs-taxi-data/by_year/2018" - ) + ).fully_executed() + for batch_format in batch_formats: for compute in ["tasks", "actors"]: test_name = f"map-batches-{batch_format}-{compute}-multi-files" diff --git a/release/nightly_tests/dataset/sort.py b/release/nightly_tests/dataset/sort.py index 98d76767a4f9a..55d726f9c93cd 100644 --- a/release/nightly_tests/dataset/sort.py +++ b/release/nightly_tests/dataset/sort.py @@ -118,6 +118,7 @@ def make_block(count: int, num_columns: int) -> Block: ds = ds.random_shuffle() else: ds = ds.sort(key="c_0") + ds.fully_executed() except Exception as e: exc = e pass