[Datasets] Fix test_dynamic_block_split.py (ray-project#33096)
Fix ray-project#33077

Co-authored-by: Ubuntu <[email protected]>
jianoaix and Ubuntu committed Mar 7, 2023
1 parent d2b855f commit c01e47a
Showing 1 changed file with 56 additions and 46 deletions.
102 changes: 56 additions & 46 deletions python/ray/data/tests/test_dynamic_block_split.py
@@ -15,28 +15,30 @@
 # Data source generates random bytes data
 class RandomBytesDatasource(Datasource):
     def create_reader(self, **read_args):
-        return RandomBytesReader(read_args["num_blocks"], read_args["block_size"])
+        return RandomBytesReader(
+            read_args["num_blocks_per_task"], read_args["block_size"]
+        )


 class RandomBytesReader(Reader):
-    def __init__(self, num_blocks: int, block_size: int):
-        self.num_blocks = num_blocks
+    def __init__(self, num_blocks_per_task: int, block_size: int):
+        self.num_blocks_per_task = num_blocks_per_task
         self.block_size = block_size

     def estimate_inmemory_data_size(self):
         return None

     def get_read_tasks(self, parallelism: int):
         def _blocks_generator():
-            for _ in range(self.num_blocks):
+            for _ in range(self.num_blocks_per_task):
                 yield pd.DataFrame({"one": [np.random.bytes(self.block_size)]})

         return parallelism * [
             ReadTask(
                 lambda: _blocks_generator(),
                 BlockMetadata(
-                    num_rows=self.num_blocks,
-                    size_bytes=self.num_blocks * self.block_size,
+                    num_rows=self.num_blocks_per_task,
+                    size_bytes=self.num_blocks_per_task * self.block_size,
                     schema=None,
                     input_files=None,
                     exec_stats=None,
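With the rename, the read arguments say what they mean: each of the `parallelism` read tasks yields `num_blocks_per_task` single-row blocks. A minimal sketch of the block arithmetic this implies (illustrative values; assumes the datasource above is importable and dynamic block splitting is enabled):

import ray

ds = ray.data.read_datasource(
    RandomBytesDatasource(),
    parallelism=2,            # 2 read tasks
    num_blocks_per_task=10,   # each task yields 10 single-row blocks
    block_size=1024,          # each row holds 1024 random bytes
)
ds.fully_executed()
assert ds.count() == 2 * 10   # parallelism * num_blocks_per_task rows in total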
@@ -82,81 +84,87 @@ def test_enable_in_ray_client(ray_start_cluster_enabled):
     ],
 )
 def test_dataset(
-    ray_start_regular_shared,
+    shutdown_only,
     enable_dynamic_block_splitting,
     target_max_block_size,
     compute,
 ):
-    # Test 10 blocks from 10 tasks, each block is 1024 bytes.
-    num_blocks = 10
+    ray.shutdown()
+    # We need at least 2 CPUs to run an actor-pool streaming executor.
+    ray.init(num_cpus=2)
+    # Test 10 tasks, each task returning 10 blocks; each block has 1 row and
+    # each row has 1024 bytes.
+    num_blocks_per_task = 10
     block_size = 1024
     num_tasks = 10

     ds = ray.data.read_datasource(
         RandomBytesDatasource(),
         parallelism=num_tasks,
-        num_blocks=num_blocks,
+        num_blocks_per_task=num_blocks_per_task,
         block_size=block_size,
     )
     # Note the following calls to ds will not fully execute it.
     assert ds.schema() is not None
-    assert ds.count() == num_blocks * num_tasks
+    assert ds.count() == num_blocks_per_task * num_tasks
     assert ds.num_blocks() == num_tasks
-    assert ds.size_bytes() >= 0.7 * block_size * num_blocks * num_tasks
+    assert ds.size_bytes() >= 0.7 * block_size * num_blocks_per_task * num_tasks

     map_ds = ds.map_batches(lambda x: x, compute=compute)
     map_ds.fully_executed()
     assert map_ds.num_blocks() == num_tasks
     map_ds = ds.map_batches(
-        lambda x: x, batch_size=num_blocks * num_tasks, compute=compute
+        lambda x: x, batch_size=num_blocks_per_task * num_tasks, compute=compute
     )
     map_ds.fully_executed()
     assert map_ds.num_blocks() == 1
     map_ds = ds.map(lambda x: x, compute=compute)
     map_ds.fully_executed()
-    assert map_ds.num_blocks() == num_blocks * num_tasks
+    assert map_ds.num_blocks() == num_blocks_per_task * num_tasks

     ds_list = ds.split(5)
     assert len(ds_list) == 5
     for new_ds in ds_list:
-        assert new_ds.num_blocks() == num_blocks * num_tasks / 5
+        assert new_ds.num_blocks() == num_blocks_per_task * num_tasks / 5

     train, test = ds.train_test_split(test_size=0.25)
-    assert train.num_blocks() == num_blocks * num_tasks * 0.75
-    assert test.num_blocks() == num_blocks * num_tasks * 0.25
+    assert train.num_blocks() == num_blocks_per_task * num_tasks * 0.75
+    assert test.num_blocks() == num_blocks_per_task * num_tasks * 0.25

     new_ds = ds.union(ds, ds)
     assert new_ds.num_blocks() == num_tasks * 3
     new_ds.fully_executed()
-    assert new_ds.num_blocks() == num_blocks * num_tasks * 3
+    assert new_ds.num_blocks() == num_blocks_per_task * num_tasks * 3

     new_ds = ds.random_shuffle()
     assert new_ds.num_blocks() == num_tasks
     new_ds = ds.randomize_block_order()
     assert new_ds.num_blocks() == num_tasks
-    assert ds.groupby("one").count().count() == num_blocks * num_tasks
+    assert ds.groupby("one").count().count() == num_blocks_per_task * num_tasks

     new_ds = ds.zip(ds)
     new_ds.fully_executed()
-    assert new_ds.num_blocks() == num_blocks * num_tasks
+    assert new_ds.num_blocks() == num_blocks_per_task * num_tasks

     assert len(ds.take(5)) == 5
-    assert len(ds.take_all()) == num_blocks * num_tasks
+    assert len(ds.take_all()) == num_blocks_per_task * num_tasks
     for batch in ds.iter_batches(batch_size=10):
         assert len(batch) == 10
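The switch from the shared-cluster fixture to `shutdown_only` plus an explicit `ray.init(num_cpus=2)` exists because the actor-pool variant of `compute` needs a CPU for the pool actor on top of the one driving execution. The parametrized `compute` values sit above this hunk; a hedged sketch of what they presumably cover (assumed parametrization, `ds` as built in the test):

from ray.data import ActorPoolStrategy

# Task-based compute (the default) ...
map_ds = ds.map_batches(lambda x: x, compute="tasks")
# ... versus an autoscaling pool of 1-2 actors.
map_ds = ds.map_batches(lambda x: x, compute=ActorPoolStrategy(1, 2))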


 def test_dataset_pipeline(
     ray_start_regular_shared, enable_dynamic_block_splitting, target_max_block_size
 ):
-    # Test 10 blocks from 10 tasks, each block is 1024 bytes.
-    num_blocks = 10
+    # Test 10 tasks, each task returning 10 blocks; each block has 1 row and
+    # each row has 1024 bytes.
+    num_blocks_per_task = 10
     block_size = 1024
     num_tasks = 10

     ds = ray.data.read_datasource(
         RandomBytesDatasource(),
         parallelism=num_tasks,
-        num_blocks=num_blocks,
+        num_blocks_per_task=num_blocks_per_task,
         block_size=block_size,
     )
     dsp = ds.window(blocks_per_window=2)
@@ -166,7 +174,7 @@ def test_dataset_pipeline(
     result_batches = list(ds.iter_batches(batch_size=5))
     for batch in result_batches:
         assert len(batch) == 5
-    assert len(result_batches) == num_blocks * num_tasks / 5
+    assert len(result_batches) == num_blocks_per_task * num_tasks / 5

     dsp = ds.window(blocks_per_window=2)
     assert dsp._length == num_tasks / 2
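One subtlety these assertions encode: `window(blocks_per_window=2)` groups the lazy, task-level block partitions (10 of them), not the 100 blocks that exist after dynamic splitting, so the pipeline has 5 windows. Restating the arithmetic (names from the test above; `_length` is the same private attribute the test itself reads):

dsp = ds.window(blocks_per_window=2)
assert dsp._length == num_tasks / 2  # 10 task partitions / 2 per window = 5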
@@ -178,40 +186,42 @@
 def test_filter(
     ray_start_regular_shared, enable_dynamic_block_splitting, target_max_block_size
 ):
-    # Test 10 blocks from 1 task, each block is 1024 bytes.
-    num_blocks = 10
+    # Test 1 task returning 10 blocks; each block has 1 row and each row has
+    # 1024 bytes.
+    num_blocks_per_task = 10
     block_size = 1024

     ds = ray.data.read_datasource(
         RandomBytesDatasource(),
         parallelism=1,
-        num_blocks=num_blocks,
+        num_blocks_per_task=num_blocks_per_task,
         block_size=block_size,
     )

     ds = ds.filter(lambda _: True)
     ds.fully_executed()
-    assert ds.count() == num_blocks
-    assert ds.num_blocks() == num_blocks
+    assert ds.count() == num_blocks_per_task
+    assert ds.num_blocks() == num_blocks_per_task

     ds = ds.filter(lambda _: False)
     ds.fully_executed()
     assert ds.count() == 0
-    assert ds.num_blocks() == num_blocks
+    assert ds.num_blocks() == num_blocks_per_task
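The `enable_dynamic_block_splitting` and `target_max_block_size` fixtures drive all of these counts. Their bodies are outside this diff; a hedged sketch of the kind of context settings such fixtures presumably apply (attribute names from the `DatasetContext` of this Ray era):

from ray.data.context import DatasetContext

ctx = DatasetContext.get_current()
ctx.block_splitting_enabled = True  # turn on dynamic block splitting
ctx.target_max_block_size = 1024    # split task outputs at ~1 KiB boundaries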


 def test_lazy_block_list(
     shutdown_only, enable_dynamic_block_splitting, target_max_block_size
 ):
-    # Test 10 blocks from 10 tasks, each block is 1024 bytes.
-    num_blocks = 10
+    # Test 10 tasks, each task returning 10 blocks; each block has 1 row and
+    # each row has 1024 bytes.
+    num_blocks_per_task = 10
     block_size = 1024
     num_tasks = 10

     ds = ray.data.read_datasource(
         RandomBytesDatasource(),
         parallelism=num_tasks,
-        num_blocks=num_blocks,
+        num_blocks_per_task=num_blocks_per_task,
         block_size=block_size,
     )
     ds.schema()
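`ds.schema()` is a deliberately partial execution: only the first read task must run to discover the schema, which is what the `cached_metadata` assertions just below check. A short restatement (test-scope names):

# After ds.schema(), only task 0 has executed.
assert len(cached_metadata) == num_tasks
assert len(cached_metadata[0]) == num_blocks_per_task  # 10 single-row blocks
assert all(m is None for m in cached_metadata[1:])     # 9 unexecuted tasks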
@@ -231,18 +241,18 @@ def test_lazy_block_list(
     assert len(cached_metadata) == num_tasks
     for i, block_metadata in enumerate(cached_metadata):
         if i == 0:
-            assert len(block_metadata) == num_blocks
+            assert len(block_metadata) == num_blocks_per_task
             for m in block_metadata:
                 assert m.num_rows == 1
         else:
             assert block_metadata is None
-    assert len(metadata) == num_tasks - 1 + num_blocks
+    assert len(metadata) == num_tasks - 1 + num_blocks_per_task
     for i, block_metadata in enumerate(metadata):
-        if i < num_blocks:
+        if i < num_blocks_per_task:
             assert block_metadata.num_rows == 1
             assert block_metadata.schema is not None
         else:
-            assert block_metadata.num_rows == num_blocks
+            assert block_metadata.num_rows == num_blocks_per_task
             assert block_metadata.schema is None

     # Check APIs of LazyBlockList
@@ -255,12 +265,12 @@ def test_lazy_block_list(
     assert len(block_lists[0]._block_partition_refs) == 2
     assert len(block_lists[0]._cached_metadata) == 2

-    block_lists = block_list.split_by_bytes(block_size * num_blocks * 2)
+    block_lists = block_list.split_by_bytes(block_size * num_blocks_per_task * 2)
     assert len(block_lists) == num_tasks / 2
     assert len(block_lists[0]._block_partition_refs) == 2
     assert len(block_lists[0]._cached_metadata) == 2

-    new_block_list = block_list.truncate_by_rows(num_blocks * 3)
+    new_block_list = block_list.truncate_by_rows(num_blocks_per_task * 3)
     assert len(new_block_list._block_partition_refs) == 3
     assert len(new_block_list._cached_metadata) == 3
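`split_by_bytes` works off the per-task byte counts reported in the read tasks' metadata, so asking for twice one task's bytes per piece groups two task partitions per list. Restating the arithmetic (names from the test above):

bytes_per_task = num_blocks_per_task * block_size            # 10 * 1024 = 10240
block_lists = block_list.split_by_bytes(2 * bytes_per_task)  # 2 tasks per list
assert len(block_lists) == num_tasks / 2                     # 5 lists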

@@ -275,7 +285,7 @@ def test_lazy_block_list(
     assert len(new_block_list._cached_metadata) == num_tasks

     output_blocks = block_list.get_blocks_with_metadata()
-    assert len(output_blocks) == num_tasks * num_blocks
+    assert len(output_blocks) == num_tasks * num_blocks_per_task
     for _, metadata in output_blocks:
         assert metadata.num_rows == 1
     for _, metadata in block_list.iter_blocks_with_metadata():
@@ -291,18 +301,18 @@ def test_lazy_block_list(
     assert all(map(lambda ref: ref is None, block_list._block_partition_meta_refs))
     assert len(cached_metadata) == num_tasks
     for block_metadata in cached_metadata:
-        assert len(block_metadata) == num_blocks
+        assert len(block_metadata) == num_blocks_per_task
         for m in block_metadata:
             assert m.num_rows == 1
-    assert len(metadata) == num_tasks * num_blocks
+    assert len(metadata) == num_tasks * num_blocks_per_task
     for block_metadata in metadata:
         assert block_metadata.num_rows == 1
         assert block_metadata.schema is not None


 def test_read_large_data(ray_start_cluster, enable_dynamic_block_splitting):
     # Test 20G input with single task
-    num_blocks = 20
+    num_blocks_per_task = 20
     block_size = 1024 * 1024 * 1024

     cluster = ray_start_cluster
@@ -316,12 +326,12 @@ def foo(batch):
     ds = ray.data.read_datasource(
         RandomBytesDatasource(),
         parallelism=1,
-        num_blocks=num_blocks,
+        num_blocks_per_task=num_blocks_per_task,
         block_size=block_size,
     )

     ds = ds.map_batches(foo, batch_size=None)
-    assert ds.count() == num_blocks
+    assert ds.count() == num_blocks_per_task
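This is the scenario the mechanism exists for: one read task emitting 20 GiB in total. Because `_blocks_generator` yields one block at a time, the task never materializes all 20 GiB at once; each ~1 GiB block can be shipped out as it is produced. The peak-memory arithmetic, with illustrative names mirroring the test:

num_blocks_per_task = 20
block_size = 1024 * 1024 * 1024                 # 1 GiB per block
total_bytes = num_blocks_per_task * block_size  # 20 GiB across the whole task
peak_bytes = block_size                         # ~1 GiB in flight at a time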


 if __name__ == "__main__":
