Skip to content

Commit

Permalink
Streaming executor fixes (#32658)
Browse files Browse the repository at this point in the history
- The current resource usage can have a None object store memory value (we don't account for the input buffer)
- We need at least 2 CPUs to run an actor pool under the streaming executor
- We no longer use LazyBlockList and cache partial results in streaming executor

#32132
  • Loading branch information
jianoaix committed Feb 22, 2023
1 parent e0af1fb commit 875a555
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 36 deletions.
2 changes: 1 addition & 1 deletion python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def _get_or_refresh_resource_limits(self) -> ExecutionResources:
)

def _get_current_usage(self, topology: Topology) -> ExecutionResources:
cur_usage = ExecutionResources()
cur_usage = ExecutionResources(0, 0, 0)
for op, state in topology.items():
cur_usage = cur_usage.add(op.current_resource_usage())
if isinstance(op, InputDataBuffer):
Expand Down
4 changes: 2 additions & 2 deletions python/ray/data/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def test_bulk_lazy_eval_split_mode(shutdown_only, block_split, tmp_path):

@pytest.mark.parametrize("pipelined", [False, True])
def test_basic_actors(shutdown_only, pipelined):
ray.init(num_cpus=2)
ray.init(num_cpus=6)
n = 5
ds = ray.data.range(n)
ds = maybe_pipeline(ds, pipelined)
Expand Down Expand Up @@ -152,7 +152,7 @@ def run():


def test_callable_classes(shutdown_only):
ray.init(num_cpus=1)
ray.init(num_cpus=2)
ds = ray.data.range(10, parallelism=10)

class StatefulFn:
Expand Down
5 changes: 4 additions & 1 deletion python/ray/data/tests/test_dataset_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,16 @@ def get_relative_path(path: str) -> str:
"image-datasets/simple/image3.jpg",
]

def test_e2e_prediction(self, ray_start_regular_shared):
def test_e2e_prediction(self, shutdown_only):
from ray.train.torch import TorchCheckpoint, TorchPredictor
from ray.train.batch_predictor import BatchPredictor

from torchvision import transforms
from torchvision.models import resnet18

ray.shutdown()
ray.init(num_cpus=2)

dataset = ray.data.read_images("example:https://image-datasets/simple")
transform = transforms.ToTensor()

Expand Down
69 changes: 40 additions & 29 deletions python/ray/data/tests/test_dataset_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@
from pytest_lazyfixture import lazy_fixture


def check_num_computed(ds, expected, streaming_expected) -> None:
    """Assert how many blocks of ``ds`` have been computed so far.

    With the streaming executor enabled, ``_num_computed()`` is only
    affected by ``ds.schema()`` (which still partially reads the blocks);
    operations such as ``take()`` run through the streaming executor and
    do not change it. The expected count therefore depends on which
    executor is active.

    Args:
        ds: The dataset whose executed plan is inspected.
        expected: Expected count when the streaming executor is off.
        streaming_expected: Expected count when the streaming executor is on.
    """
    ctx = ray.data.context.DatasetContext.get_current()
    target = streaming_expected if ctx.use_streaming_executor else expected
    assert ds._plan.execute()._num_computed() == target


@pytest.mark.parametrize(
"fs,data_path",
[
Expand Down Expand Up @@ -122,11 +133,11 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path):
ds = ray.data.read_parquet(data_path, filesystem=fs)

# Test metadata-only parquet ops.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == 6
assert ds.size_bytes() > 0
assert ds.schema() is not None
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 1)
input_files = ds.input_files()
assert len(input_files) == 2, input_files
assert "test1.parquet" in str(input_files)
Expand All @@ -139,11 +150,11 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path):
repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
"schema={one: int64, two: string})"
), ds
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 1)

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
values = [[s["one"], s["two"]] for s in ds.take_all()]
check_num_computed(ds, 2, 1)
assert sorted(values) == [
[1, "a"],
[2, "b"],
Expand Down Expand Up @@ -201,7 +212,7 @@ def prefetch_file_metadata(self, pieces):
)

# Expect to lazily compute all metadata correctly.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == 6
assert ds.size_bytes() > 0
assert ds.schema() is not None
Expand All @@ -217,11 +228,11 @@ def prefetch_file_metadata(self, pieces):
repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
"schema={one: int64, two: string})"
), ds
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 2)

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 2)
assert sorted(values) == [
[1, "a"],
[2, "b"],
Expand Down Expand Up @@ -278,7 +289,7 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path):
assert ds._meta_count() is None

# Expect to lazily compute all metadata correctly.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == 6
assert ds.size_bytes() > 0
assert ds.schema() is not None
Expand All @@ -294,11 +305,11 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path):
repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
"schema={one: int64, two: string})"
), ds
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 2)

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 2)
assert sorted(values) == [
[1, "a"],
[2, "b"],
Expand All @@ -319,7 +330,7 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path):

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 0)
assert sorted(values) == [
[1, "a"],
[2, "b"],
Expand Down Expand Up @@ -368,7 +379,7 @@ def test_parquet_read_bulk_meta_provider(ray_start_regular_shared, fs, data_path
assert ds._meta_count() is None

# Expect to lazily compute all metadata correctly.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == 6
assert ds.size_bytes() > 0
assert ds.schema() is not None
Expand All @@ -384,11 +395,11 @@ def test_parquet_read_bulk_meta_provider(ray_start_regular_shared, fs, data_path
repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
"schema={one: int64, two: string})"
), ds
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 2)

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 2)
assert sorted(values) == [
[1, "a"],
[2, "b"],
Expand Down Expand Up @@ -427,7 +438,7 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path):
ds = ray.data.read_parquet(data_path, filesystem=fs)

# Test metadata-only parquet ops.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == 6
assert ds.size_bytes() > 0
assert ds.schema() is not None
Expand All @@ -443,11 +454,11 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path):
"schema={two: string, "
"one: dictionary<values=int32, indices=int32, ordered=0>})"
), ds
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 1)

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 1)
assert sorted(values) == [
[1, "a"],
[1, "b"],
Expand Down Expand Up @@ -479,7 +490,7 @@ def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path
)

values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 0)
assert sorted(values) == [[1, "a"], [1, "a"]]

# 2 partitions, 1 empty partition, 2 block/read tasks, 1 empty block
Expand All @@ -489,7 +500,7 @@ def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path
)

values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 0)
assert sorted(values) == [[1, "a"], [1, "a"]]


Expand All @@ -513,7 +524,7 @@ def test_parquet_read_partitioned_explicit(ray_start_regular_shared, tmp_path):
)

# Test metadata-only parquet ops.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == 6
assert ds.size_bytes() > 0
assert ds.schema() is not None
Expand All @@ -527,11 +538,11 @@ def test_parquet_read_partitioned_explicit(ray_start_regular_shared, tmp_path):
repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
"schema={two: string, one: int32})"
), ds
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 1)

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 1)
assert sorted(values) == [
[1, "a"],
[1, "b"],
Expand Down Expand Up @@ -560,15 +571,15 @@ def _block_udf(block: pa.Table):
ds = ray.data.read_parquet(str(tmp_path), parallelism=1, _block_udf=_block_udf)

ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()])
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 0)
np.testing.assert_array_equal(sorted(ones), np.array(one_data) + 1)

# 2 blocks/read tasks

ds = ray.data.read_parquet(str(tmp_path), parallelism=2, _block_udf=_block_udf)

ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()])
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 0)
np.testing.assert_array_equal(sorted(ones), np.array(one_data) + 1)

# 2 blocks/read tasks, 1 empty block
Expand All @@ -581,7 +592,7 @@ def _block_udf(block: pa.Table):
)

ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()])
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 0)
np.testing.assert_array_equal(sorted(ones), np.array(one_data[:2]) + 1)


Expand Down Expand Up @@ -611,17 +622,17 @@ def test_parquet_read_parallel_meta_fetch(ray_start_regular_shared, fs, data_pat
ds = ray.data.read_parquet(data_path, filesystem=fs, parallelism=parallelism)

# Test metadata-only parquet ops.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == num_dfs * 3
assert ds.size_bytes() > 0
assert ds.schema() is not None
input_files = ds.input_files()
assert len(input_files) == num_dfs, input_files
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 1)

# Forces a data read.
values = [s["one"] for s in ds.take(limit=3 * num_dfs)]
assert ds._plan.execute()._num_computed() == parallelism
check_num_computed(ds, parallelism, 1)
assert sorted(values) == list(range(3 * num_dfs))


Expand Down
8 changes: 6 additions & 2 deletions python/ray/data/tests/test_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,9 @@ def test_optimize_equivalent_remote_args(ray_start_regular_shared):
)


def test_optimize_incompatible_stages(ray_start_regular_shared):
def test_optimize_incompatible_stages(shutdown_only):
ray.shutdown()
ray.init(num_cpus=2)
context = DatasetContext.get_current()
context.optimize_fuse_stages = True
context.optimize_fuse_read_stages = True
Expand Down Expand Up @@ -548,7 +550,9 @@ def test_optimize_incompatible_stages(ray_start_regular_shared):
)


def test_optimize_callable_classes(ray_start_regular_shared, tmp_path):
def test_optimize_callable_classes(shutdown_only, tmp_path):
ray.shutdown()
ray.init(num_cpus=2)
context = DatasetContext.get_current()
context.optimize_fuse_stages = True
context.optimize_fuse_read_stages = True
Expand Down
4 changes: 3 additions & 1 deletion python/ray/data/tests/test_size_estimation.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ def gen(name):


@pytest.mark.parametrize("use_actors", [False, True])
def test_split_map(ray_start_regular_shared, use_actors):
def test_split_map(shutdown_only, use_actors):
ray.shutdown()
ray.init(num_cpus=2)
kwargs = {}
if use_actors:
kwargs = {"compute": "actors"}
Expand Down

0 comments on commit 875a555

Please sign in to comment.