Skip to content

Commit

Permalink
Fix map_batches benchmark (#41812)
Browse files Browse the repository at this point in the history
This PR is to fix the map_batches benchmark in nightly test.

Signed-off-by: Cheng Su <[email protected]>
  • Loading branch information
c21 committed Dec 13, 2023
1 parent 9562078 commit 4c40f2b
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions release/nightly_tests/dataset/map_batches_benchmark.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import Optional, Union
from typing import Optional, Union, Tuple
import numpy as np

import ray
from ray.data._internal.compute import ActorPoolStrategy, ComputeStrategy
from ray.data.dataset import Dataset, MaterializedDataset

from benchmark import Benchmark
Expand All @@ -14,19 +13,31 @@ def map_batches(
input_ds: Dataset,
batch_size: int,
batch_format: Literal["default", "pandas", "pyarrow", "numpy"],
compute: Optional[Union[str, ComputeStrategy]] = None,
concurrency: Optional[Union[int, Tuple[int, int]]] = None,
num_calls: Optional[int] = 1,
is_eager_executed: Optional[bool] = False,
) -> Dataset:
assert isinstance(input_ds, MaterializedDataset)
ds = input_ds

def udf(x):
return x

class UDFClass:
def __call__(self, x):
return x

if concurrency is None:
fn = udf
else:
fn = UDFClass

for _ in range(num_calls):
ds = ds.map_batches(
lambda x: x,
fn,
batch_format=batch_format,
batch_size=batch_size,
compute=compute,
concurrency=concurrency,
)
if is_eager_executed:
ds = ds.materialize()
Expand Down Expand Up @@ -68,9 +79,9 @@ def run_map_batches_benchmark(benchmark: Benchmark):

# Test multiple calls of map_batches.
for num_calls in num_calls_list:
for compute in [None, ActorPoolStrategy(size=1)]:
for concurrency in [None, 1]:
batch_size = 4096
if compute is None:
if concurrency is None:
compute_strategy = "tasks"
else:
compute_strategy = "actors"
Expand All @@ -85,7 +96,7 @@ def run_map_batches_benchmark(benchmark: Benchmark):
input_ds=input_ds,
batch_format=batch_format,
batch_size=batch_size,
compute=compute,
concurrency=concurrency,
num_calls=num_calls,
is_eager_executed=True,
)
Expand All @@ -99,7 +110,7 @@ def run_map_batches_benchmark(benchmark: Benchmark):
input_ds=input_ds,
batch_format=batch_format,
batch_size=batch_size,
compute=compute,
concurrency=concurrency,
num_calls=num_calls,
)

Expand All @@ -126,8 +137,8 @@ def run_map_batches_benchmark(benchmark: Benchmark):
).materialize()

for batch_format in batch_formats:
for compute in [None, ActorPoolStrategy(min_size=1, max_size=float("inf"))]:
if compute is None:
for concurrency in [None, (1, 1000)]:
if concurrency is None:
compute_strategy = "tasks"
else:
compute_strategy = "actors"
Expand All @@ -139,7 +150,7 @@ def run_map_batches_benchmark(benchmark: Benchmark):
input_ds=input_ds,
batch_format=batch_format,
batch_size=4096,
compute=compute,
concurrency=concurrency,
num_calls=1,
)

Expand Down

0 comments on commit 4c40f2b

Please sign in to comment.