From edc51bdce6a5ec40e11f7d98423e1e39f7c37eda Mon Sep 17 00:00:00 2001 From: jianoaix Date: Thu, 8 Dec 2022 23:20:23 +0000 Subject: [PATCH 01/19] Fix read_tfrecords_benchmark nightly test Signed-off-by: jianoaix --- release/nightly_tests/dataset/read_tfrecords_benchmark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release/nightly_tests/dataset/read_tfrecords_benchmark.py b/release/nightly_tests/dataset/read_tfrecords_benchmark.py index 45a5cea2d24bd..bf189b450d8f4 100644 --- a/release/nightly_tests/dataset/read_tfrecords_benchmark.py +++ b/release/nightly_tests/dataset/read_tfrecords_benchmark.py @@ -25,7 +25,7 @@ def generate_tfrecords_from_images( # Convert images from NumPy to bytes def images_to_bytes(batch): - images_as_bytes = [image.tobytes() for image in batch] + images_as_bytes = [image.tobytes() for image in batch.values()] return pa.table({"image": images_as_bytes}) ds = ds.map_batches(images_to_bytes, batch_format="numpy") From bfd724d8ed3afa5c40a8dd17d85f6aee783ab0b1 Mon Sep 17 00:00:00 2001 From: jianoaix Date: Mon, 13 Feb 2023 21:14:18 +0000 Subject: [PATCH 02/19] Enable streaming executor by default --- python/ray/data/context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/context.py b/python/ray/data/context.py index cb953f04696a1..96cd4f86d00e1 100644 --- a/python/ray/data/context.py +++ b/python/ray/data/context.py @@ -76,7 +76,7 @@ # Whether to use the streaming executor. This only has an effect if the new execution # backend is enabled. DEFAULT_USE_STREAMING_EXECUTOR = bool( - int(os.environ.get("RAY_DATASET_USE_STREAMING_EXECUTOR", "0")) + int(os.environ.get("RAY_DATASET_USE_STREAMING_EXECUTOR", "1")) ) # Whether to eagerly free memory (new backend only). From e1d0df53cf1ab31dc78d058acd4142281465c4db Mon Sep 17 00:00:00 2001 From: jianoaix Date: Thu, 16 Feb 2023 18:28:21 +0000 Subject: [PATCH 03/19] current resource usage --- python/ray/data/_internal/execution/streaming_executor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index db3783c897da2..8647b7f6aa129 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -236,7 +236,8 @@ def _get_current_usage(self, topology: Topology) -> ExecutionResources: if isinstance(op, InputDataBuffer): continue # Don't count input refs towards dynamic memory usage. 
for bundle in state.outqueue: - cur_usage.object_store_memory += bundle.size_bytes() + if cur_usage.object_store_memory: + cur_usage.object_store_memory += bundle.size_bytes() return cur_usage def _report_current_usage( From 94a88eee5024b4ea00447833839325df34af8965 Mon Sep 17 00:00:00 2001 From: jianoaix Date: Thu, 16 Feb 2023 21:38:25 +0000 Subject: [PATCH 04/19] parquet test: num computed --- python/ray/data/tests/test_dataset_parquet.py | 65 ++++++++++--------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/python/ray/data/tests/test_dataset_parquet.py b/python/ray/data/tests/test_dataset_parquet.py index 472b69fb0335d..7e98c7fd0e326 100644 --- a/python/ray/data/tests/test_dataset_parquet.py +++ b/python/ray/data/tests/test_dataset_parquet.py @@ -31,6 +31,11 @@ from pytest_lazyfixture import lazy_fixture +def check_num_computed(ds, expected) -> None: + if not ray.data.context.DatasetContext.get_current().use_streaming_executor: + ds._plan.execute()._num_computed() == expected + + @pytest.mark.parametrize( "fs,data_path", [ @@ -122,11 +127,11 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path): ds = ray.data.read_parquet(data_path, filesystem=fs) # Test metadata-only parquet ops. - assert ds._plan.execute()._num_computed() == 0 + check_num_computed(ds, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 - assert ds.schema() is not None - assert ds._plan.execute()._num_computed() == 1 + # assert ds.schema() is not None + check_num_computed(ds, 1) input_files = ds.input_files() assert len(input_files) == 2, input_files assert "test1.parquet" in str(input_files) @@ -139,11 +144,11 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path): repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={one: int64, two: string})" ), ds - assert ds._plan.execute()._num_computed() == 1 + check_num_computed(ds, 1) # Forces a data read. - values = [[s["one"], s["two"]] for s in ds.take()] - assert ds._plan.execute()._num_computed() == 2 + values = [[s["one"], s["two"]] for s in ds.take_all()] + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -201,7 +206,7 @@ def prefetch_file_metadata(self, pieces): ) # Expect to lazily compute all metadata correctly. - assert ds._plan.execute()._num_computed() == 0 + check_num_computed(ds, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -217,11 +222,11 @@ def prefetch_file_metadata(self, pieces): repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={one: int64, two: string})" ), ds - assert ds._plan.execute()._num_computed() == 2 + check_num_computed(ds, 2) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - assert ds._plan.execute()._num_computed() == 2 + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -278,7 +283,7 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): assert ds._meta_count() is None # Expect to lazily compute all metadata correctly. - assert ds._plan.execute()._num_computed() == 0 + check_num_computed(ds, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -294,11 +299,11 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={one: int64, two: string})" ), ds - assert ds._plan.execute()._num_computed() == 2 + check_num_computed(ds, 2) # Forces a data read. 
values = [[s["one"], s["two"]] for s in ds.take()] - assert ds._plan.execute()._num_computed() == 2 + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -319,7 +324,7 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - assert ds._plan.execute()._num_computed() == 2 + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -368,7 +373,7 @@ def test_parquet_read_bulk_meta_provider(ray_start_regular_shared, fs, data_path assert ds._meta_count() is None # Expect to lazily compute all metadata correctly. - assert ds._plan.execute()._num_computed() == 0 + check_num_computed(ds, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -384,11 +389,11 @@ def test_parquet_read_bulk_meta_provider(ray_start_regular_shared, fs, data_path repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={one: int64, two: string})" ), ds - assert ds._plan.execute()._num_computed() == 2 + check_num_computed(ds, 2) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - assert ds._plan.execute()._num_computed() == 2 + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -427,7 +432,7 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path): ds = ray.data.read_parquet(data_path, filesystem=fs) # Test metadata-only parquet ops. - assert ds._plan.execute()._num_computed() == 0 + check_num_computed(ds, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -443,11 +448,11 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path): "schema={two: string, " "one: dictionary})" ), ds - assert ds._plan.execute()._num_computed() == 1 + check_num_computed(ds, 1) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - assert ds._plan.execute()._num_computed() == 2 + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [1, "b"], @@ -479,7 +484,7 @@ def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path ) values = [[s["one"], s["two"]] for s in ds.take()] - assert ds._plan.execute()._num_computed() == 1 + check_num_computed(ds, 1) assert sorted(values) == [[1, "a"], [1, "a"]] # 2 partitions, 1 empty partition, 2 block/read tasks, 1 empty block @@ -489,7 +494,7 @@ def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path ) values = [[s["one"], s["two"]] for s in ds.take()] - assert ds._plan.execute()._num_computed() == 2 + check_num_computed(ds, 2) assert sorted(values) == [[1, "a"], [1, "a"]] @@ -513,7 +518,7 @@ def test_parquet_read_partitioned_explicit(ray_start_regular_shared, tmp_path): ) # Test metadata-only parquet ops. - assert ds._plan.execute()._num_computed() == 0 + check_num_computed(ds, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -527,11 +532,11 @@ def test_parquet_read_partitioned_explicit(ray_start_regular_shared, tmp_path): repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={two: string, one: int32})" ), ds - assert ds._plan.execute()._num_computed() == 1 + check_num_computed(ds, 1) # Forces a data read. 
values = [[s["one"], s["two"]] for s in ds.take()] - assert ds._plan.execute()._num_computed() == 2 + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [1, "b"], @@ -560,7 +565,7 @@ def _block_udf(block: pa.Table): ds = ray.data.read_parquet(str(tmp_path), parallelism=1, _block_udf=_block_udf) ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()]) - assert ds._plan.execute()._num_computed() == 1 + check_num_computed(ds, 1) np.testing.assert_array_equal(sorted(ones), np.array(one_data) + 1) # 2 blocks/read tasks @@ -568,7 +573,7 @@ def _block_udf(block: pa.Table): ds = ray.data.read_parquet(str(tmp_path), parallelism=2, _block_udf=_block_udf) ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()]) - assert ds._plan.execute()._num_computed() == 2 + check_num_computed(ds, 2) np.testing.assert_array_equal(sorted(ones), np.array(one_data) + 1) # 2 blocks/read tasks, 1 empty block @@ -581,7 +586,7 @@ def _block_udf(block: pa.Table): ) ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()]) - assert ds._plan.execute()._num_computed() == 2 + check_num_computed(ds, 2) np.testing.assert_array_equal(sorted(ones), np.array(one_data[:2]) + 1) @@ -611,17 +616,17 @@ def test_parquet_read_parallel_meta_fetch(ray_start_regular_shared, fs, data_pat ds = ray.data.read_parquet(data_path, filesystem=fs, parallelism=parallelism) # Test metadata-only parquet ops. - assert ds._plan.execute()._num_computed() == 0 + check_num_computed(ds, 0) assert ds.count() == num_dfs * 3 assert ds.size_bytes() > 0 assert ds.schema() is not None input_files = ds.input_files() assert len(input_files) == num_dfs, input_files - assert ds._plan.execute()._num_computed() == 1 + check_num_computed(ds, 1) # Forces a data read. values = [s["one"] for s in ds.take(limit=3 * num_dfs)] - assert ds._plan.execute()._num_computed() == parallelism + check_num_computed(ds, parallelism) assert sorted(values) == list(range(3 * num_dfs)) From c670a3b584968dac8d48d1a2f361c18abc57d732 Mon Sep 17 00:00:00 2001 From: jianoaix Date: Thu, 16 Feb 2023 23:38:15 +0000 Subject: [PATCH 05/19] resource limits --- python/ray/data/tests/test_dataset.py | 4 ++-- python/ray/data/tests/test_dataset_image.py | 5 ++++- python/ray/data/tests/test_optimize.py | 8 ++++++-- python/ray/data/tests/test_size_estimation.py | 4 +++- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 933dc3952e1da..7322c9bc7df30 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -90,7 +90,7 @@ def test_bulk_lazy_eval_split_mode(shutdown_only, block_split, tmp_path): @pytest.mark.parametrize("pipelined", [False, True]) def test_basic_actors(shutdown_only, pipelined): - ray.init(num_cpus=2) + ray.init(num_cpus=6) n = 5 ds = ray.data.range(n) ds = maybe_pipeline(ds, pipelined) @@ -152,7 +152,7 @@ def run(): def test_callable_classes(shutdown_only): - ray.init(num_cpus=1) + ray.init(num_cpus=2) ds = ray.data.range(10, parallelism=10) class StatefulFn: diff --git a/python/ray/data/tests/test_dataset_image.py b/python/ray/data/tests/test_dataset_image.py index c544744e41a20..1d17ece2d860e 100644 --- a/python/ray/data/tests/test_dataset_image.py +++ b/python/ray/data/tests/test_dataset_image.py @@ -114,13 +114,16 @@ def get_relative_path(path: str) -> str: "image-datasets/simple/image3.jpg", ] - def test_e2e_prediction(self, ray_start_regular_shared): + def test_e2e_prediction(self, shutdown_only): from ray.train.torch import 
TorchCheckpoint, TorchPredictor from ray.train.batch_predictor import BatchPredictor from torchvision import transforms from torchvision.models import resnet18 + ray.shutdown() + ray.init(num_cpus=2) + dataset = ray.data.read_images("example://image-datasets/simple") transform = transforms.ToTensor() diff --git a/python/ray/data/tests/test_optimize.py b/python/ray/data/tests/test_optimize.py index 15793f4ed3b08..9267d0b783d60 100644 --- a/python/ray/data/tests/test_optimize.py +++ b/python/ray/data/tests/test_optimize.py @@ -508,7 +508,9 @@ def test_optimize_equivalent_remote_args(ray_start_regular_shared): ) -def test_optimize_incompatible_stages(ray_start_regular_shared): +def test_optimize_incompatible_stages(shutdown_only): + ray.shutdown() + ray.init(num_cpus=2) context = DatasetContext.get_current() context.optimize_fuse_stages = True context.optimize_fuse_read_stages = True @@ -548,7 +550,9 @@ def test_optimize_incompatible_stages(ray_start_regular_shared): ) -def test_optimize_callable_classes(ray_start_regular_shared, tmp_path): +def test_optimize_callable_classes(shutdown_only, tmp_path): + ray.shutdown() + ray.init(num_cpus=2) context = DatasetContext.get_current() context.optimize_fuse_stages = True context.optimize_fuse_read_stages = True diff --git a/python/ray/data/tests/test_size_estimation.py b/python/ray/data/tests/test_size_estimation.py index 018a853942601..63426126ebb06 100644 --- a/python/ray/data/tests/test_size_estimation.py +++ b/python/ray/data/tests/test_size_estimation.py @@ -193,7 +193,9 @@ def gen(name): @pytest.mark.parametrize("use_actors", [False, True]) -def test_split_map(ray_start_regular_shared, use_actors): +def test_split_map(shutdown_only, use_actors): + ray.shutdown() + ray.init(num_cpus=2) kwargs = {} if use_actors: kwargs = {"compute": "actors"} From c5a6aa9d7a9b57ef9702d93c14d4e25155736069 Mon Sep 17 00:00:00 2001 From: jianoaix Date: Wed, 22 Feb 2023 17:57:56 +0000 Subject: [PATCH 06/19] merge --- python/ray/data/_internal/execution/streaming_executor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 3d9f5779bd893..df3ebed6fd14a 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -236,8 +236,7 @@ def _get_current_usage(self, topology: Topology) -> ExecutionResources: if isinstance(op, InputDataBuffer): continue # Don't count input refs towards dynamic memory usage. 
for bundle in state.outqueue: - if cur_usage.object_store_memory: - cur_usage.object_store_memory += bundle.size_bytes() + cur_usage.object_store_memory += bundle.size_bytes() return cur_usage def _report_current_usage( From 6c82ad253d45ccfed10f95b2eac2467f7b3da723 Mon Sep 17 00:00:00 2001 From: jianoaix Date: Wed, 22 Feb 2023 18:51:28 +0000 Subject: [PATCH 07/19] fix treating numpy as block format --- python/ray/data/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index bd61c92f645e2..065d3098d1a0c 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2825,7 +2825,7 @@ def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterator[Union[T, TableRow]] for batch in self.iter_batches( batch_size=None, prefetch_blocks=prefetch_blocks, batch_format=batch_format ): - batch = BlockAccessor.for_block(batch) + batch = BlockAccessor.for_block(BlockAccessor.batch_to_block(batch)) for row in batch.iter_rows(): yield row From 4cbcbaeec4a102abfac13b1128f92c461a033dfd Mon Sep 17 00:00:00 2001 From: jianoaix Date: Wed, 22 Feb 2023 19:17:15 +0000 Subject: [PATCH 08/19] fix issues related to default dataset_format and partial execution in LazyBlocklist --- python/ray/data/tests/test_dataset.py | 46 +++++++++++++++++++++------ 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 7322c9bc7df30..33624bd25d03c 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -1369,17 +1369,26 @@ def test_count_lazy(ray_start_regular_shared): def test_lazy_loading_exponential_rampup(ray_start_regular_shared): ds = ray.data.range(100, parallelism=20) - assert ds._plan.execute()._num_computed() == 0 + + def check_num_computed(expected): + if ray.data.context.DatasetContext.get_current().use_streaming_executor: + # In streaing executor, ds.take() will not invoke partial execution + # in LazyBlocklist. + assert ds._plan.execute()._num_computed() == 0 + else: + assert ds._plan.execute()._num_computed() == expected + + check_num_computed(0) assert ds.take(10) == list(range(10)) - assert ds._plan.execute()._num_computed() == 2 + check_num_computed(2) assert ds.take(20) == list(range(20)) - assert ds._plan.execute()._num_computed() == 4 + check_num_computed(4) assert ds.take(30) == list(range(30)) - assert ds._plan.execute()._num_computed() == 8 + check_num_computed(8) assert ds.take(50) == list(range(50)) - assert ds._plan.execute()._num_computed() == 16 + check_num_computed(16) assert ds.take(100) == list(range(100)) - assert ds._plan.execute()._num_computed() == 20 + check_num_computed(20) def test_dataset_repr(ray_start_regular_shared): @@ -1696,7 +1705,14 @@ def to_pylist(table): # Default ArrowRows. for row, t_row in zip(ds.iter_rows(), to_pylist(t)): assert isinstance(row, TableRow) - assert isinstance(row, ArrowRow) + # In streaming, we set batch_format to "default" because calling + # ds.dataset_format() will still invoke bulk execution and we want + # to avoid that. As a result, it's receiving PandasRow (the defaut + # batch format). + if ray.data.context.DatasetContext.get_current().use_streaming_executor: + assert isinstance(row, PandasRow) + else: + assert isinstance(row, ArrowRow) assert row == t_row # PandasRows after conversion. @@ -1710,7 +1726,14 @@ def to_pylist(table): # Prefetch. 
for row, t_row in zip(ds.iter_rows(prefetch_blocks=1), to_pylist(t)): assert isinstance(row, TableRow) - assert isinstance(row, ArrowRow) + # In streaming, we set batch_format to "default" because calling + # ds.dataset_format() will still invoke bulk execution and we want + # to avoid that. As a result, it's receiving PandasRow (the defaut + # batch format). + if ray.data.context.DatasetContext.get_current().use_streaming_executor: + assert isinstance(row, PandasRow) + else: + assert isinstance(row, ArrowRow) assert row == t_row @@ -2181,7 +2204,12 @@ def test_lazy_loading_iter_batches_exponential_rampup(ray_start_regular_shared): ds = ray.data.range(32, parallelism=8) expected_num_blocks = [1, 2, 4, 4, 8, 8, 8, 8] for _, expected in zip(ds.iter_batches(batch_size=None), expected_num_blocks): - assert ds._plan.execute()._num_computed() == expected + if ray.data.context.DatasetContext.get_current().use_streaming_executor: + # In streaming execution of ds.iter_batches(), there is no partial + # execution so _num_computed() in LazyBlocklist is 0. + assert ds._plan.execute()._num_computed() == 0 + else: + assert ds._plan.execute()._num_computed() == expected def test_add_column(ray_start_regular_shared): From 82bc7a7feeeecd2fe3811dc891aeb9f2b8f7d594 Mon Sep 17 00:00:00 2001 From: jianoaix Date: Wed, 22 Feb 2023 23:39:45 +0000 Subject: [PATCH 09/19] fix incorrect ownership resolution from bundle iter to blocklist --- python/ray/data/_internal/execution/legacy_compat.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py index e5d59d0f3106e..0579f38c2cea0 100644 --- a/python/ray/data/_internal/execution/legacy_compat.py +++ b/python/ray/data/_internal/execution/legacy_compat.py @@ -266,11 +266,13 @@ def bulk_fn( def _bundles_to_block_list(bundles: Iterator[RefBundle]) -> BlockList: blocks, metadata = [], [] + owns_blocks = True for ref_bundle in bundles: + if not ref_bundle.owns_blocks: + owns_blocks = False for block, meta in ref_bundle.blocks: blocks.append(block) metadata.append(meta) - owns_blocks = all(b.owns_blocks for b in bundles) return BlockList(blocks, metadata, owned_by_consumer=owns_blocks) From 548f5cf3cc95d8b0b7b0c09ce6b879f8df3d4254 Mon Sep 17 00:00:00 2001 From: jianoaix Date: Fri, 24 Feb 2023 22:21:38 +0000 Subject: [PATCH 10/19] rewind the plan when clear snapshot --- python/ray/data/_internal/plan.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index bf53ec0668bef..92ca8bf2f9e5a 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -610,6 +610,10 @@ def clear_block_refs(self) -> None: This will render the plan un-executable unless the root is a LazyBlockList.""" self._in_blocks.clear() + self._clear_snapshot() + + def _clear_snapshot(self) -> None: + """Clear the snapshot kept in the plan to the beginning state.""" self._snapshot_blocks = None self._snapshot_stats = None # We're erasing the snapshot, so put all stages into the "after snapshot" @@ -691,7 +695,7 @@ def _get_source_blocks_and_stages( stats = self._snapshot_stats # Unlink the snapshot blocks from the plan so we can eagerly reclaim the # snapshot block memory after the first stage is done executing. - self._snapshot_blocks = None + self._clear_snapshot() else: # Snapshot exists but has been cleared, so we need to recompute from the # source (input blocks). 
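The test edits in this patch, like the check_num_computed helper added to the parquet tests earlier in the series, all guard the _num_computed() assertions on which executor is active. A minimal consolidated sketch of that guard, using only names that already appear in these diffs (DatasetContext.use_streaming_executor and the plan's _num_computed()):

    import ray
    from ray.data.context import DatasetContext

    def check_num_computed(ds, expected) -> None:
        # The streaming executor does not partially execute a LazyBlockList,
        # so the computed-block count is only meaningful for the bulk executor.
        if not DatasetContext.get_current().use_streaming_executor:
            assert ds._plan.execute()._num_computed() == expected
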
From aae0e542d99ac28495a058ff862597911046c58c Mon Sep 17 00:00:00 2001 From: jianoaix Date: Fri, 24 Feb 2023 23:02:37 +0000 Subject: [PATCH 11/19] propagate the remote args --- python/ray/data/_internal/execution/legacy_compat.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py index 0579f38c2cea0..7e5b63250fc59 100644 --- a/python/ray/data/_internal/execution/legacy_compat.py +++ b/python/ray/data/_internal/execution/legacy_compat.py @@ -125,6 +125,7 @@ def _blocks_to_input_buffer(blocks: BlockList, owns_blocks: bool) -> PhysicalOpe if hasattr(blocks, "_tasks"): read_tasks = blocks._tasks + remote_args = blocks._remote_args assert all(isinstance(t, ReadTask) for t in read_tasks), read_tasks inputs = InputDataBuffer( [ @@ -157,7 +158,7 @@ def do_read(blocks: Iterator[Block], ctx: TaskContext) -> Iterator[Block]: for read_task in blocks: yield from read_task() - return MapOperator.create(do_read, inputs, name="DoRead") + return MapOperator.create(do_read, inputs, name="DoRead", ray_remote_args=remote_args) else: output = _block_list_to_bundles(blocks, owns_blocks=owns_blocks) for i in output: From 2f51cc593127940731a3739642e05ce10ac1070b Mon Sep 17 00:00:00 2001 From: jianoaix Date: Fri, 24 Feb 2023 23:24:35 +0000 Subject: [PATCH 12/19] pandasrow v.s. pyarrowrow --- python/ray/data/tests/test_dataset.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 33624bd25d03c..b18a3929400dd 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -1542,7 +1542,14 @@ def test_convert_types(ray_start_regular_shared): arrow_ds = ray.data.range_table(1) assert arrow_ds.map(lambda x: "plain_{}".format(x["value"])).take() == ["plain_0"] - assert arrow_ds.map(lambda x: {"a": (x["value"],)}).take() == [{"a": [0]}] + # In streaming, we set batch_format to "default" (because calling + # ds.dataset_format() will still invoke bulk execution and we want + # to avoid that). As a result, it's receiving PandasRow (the defaut + # batch format), which unwraps [0] to plain 0. 
+ if ray.data.context.DatasetContext.get_current().use_streaming_executor: + assert arrow_ds.map(lambda x: {"a": (x["value"],)}).take() == [{"a": 0}] + else: + assert arrow_ds.map(lambda x: {"a": (x["value"],)}).take() == [{"a": [0]}] def test_from_items(ray_start_regular_shared): From 69328cbdffdd6aba46247abbddbd62a5f8aee969 Mon Sep 17 00:00:00 2001 From: jianoaix Date: Fri, 24 Feb 2023 23:41:09 +0000 Subject: [PATCH 13/19] fix stats --- python/ray/data/tests/test_stats.py | 42 ++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index e3920dea53273..eea49d805719f 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -35,9 +35,14 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): context.optimize_fuse_stages = True if context.new_execution_backend: - logger = DatasetLogger("ray.data._internal.execution.bulk_executor").get_logger( - log_to_stdout=enable_auto_log_stats, - ) + if context.use_streaming_executor: + logger = DatasetLogger("ray.data._internal.execution.streaming_executor").get_logger( + log_to_stdout=enable_auto_log_stats, + ) + else: + logger = DatasetLogger("ray.data._internal.execution.bulk_executor").get_logger( + log_to_stdout=enable_auto_log_stats, + ) else: logger = DatasetLogger("ray.data._internal.plan").get_logger( log_to_stdout=enable_auto_log_stats, @@ -111,9 +116,23 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): stats = canonicalize(ds.fully_executed().stats()) if context.new_execution_backend: - assert ( - stats - == """Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T + if context.use_streaming_executor: + assert( + stats + == """Stage N read->MapBatches(dummy_map_batches)->map: N/N blocks executed in T +* Remote wall time: T min, T max, T mean, T total +* Remote cpu time: T min, T max, T mean, T total +* Peak heap memory usage (MiB): N min, N max, N mean +* Output num rows: N min, N max, N mean, N total +* Output size bytes: N min, N max, N mean, N total +* Tasks per node: N min, N max, N mean; N nodes used +* Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, 'obj_store_mem_peak': N} +""" + ) + else: + assert ( + stats + == """Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -364,9 +383,14 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ context.optimize_fuse_stages = True if context.new_execution_backend: - logger = DatasetLogger("ray.data._internal.execution.bulk_executor").get_logger( - log_to_stdout=enable_auto_log_stats, - ) + if context.use_streaming_executor: + logger = DatasetLogger("ray.data._internal.execution.streaming_executor").get_logger( + log_to_stdout=enable_auto_log_stats, + ) + else: + logger = DatasetLogger("ray.data._internal.execution.bulk_executor").get_logger( + log_to_stdout=enable_auto_log_stats, + ) else: logger = DatasetLogger("ray.data._internal.plan").get_logger( log_to_stdout=enable_auto_log_stats, From 59ec39b19819e3bbfb2283c033161e4cea7c9b0f Mon Sep 17 00:00:00 2001 From: jianoaix Date: Sat, 25 Feb 2023 00:05:49 +0000 Subject: [PATCH 14/19] fix arrowrow v.s. 
pandasrow regarding column ordering --- .../ray/data/tests/test_dataset_tfrecords.py | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/python/ray/data/tests/test_dataset_tfrecords.py b/python/ray/data/tests/test_dataset_tfrecords.py index e61493947b6e3..d683bf630dffb 100644 --- a/python/ray/data/tests/test_dataset_tfrecords.py +++ b/python/ray/data/tests/test_dataset_tfrecords.py @@ -244,13 +244,29 @@ def test_readback_tfrecords(ray_start_regular_shared, tmp_path): # for type inference involving partially missing columns. parallelism=1, ) - # Write the TFRecords. ds.write_tfrecords(tmp_path) - # Read the TFRecords. readback_ds = ray.data.read_tfrecords(tmp_path) - assert ds.take() == readback_ds.take() + if not ray.data.context.DatasetContext.get_current().use_streaming_executor: + assert ds.take() == readback_ds.take() + else: + # In streaming, we set batch_format to "default" (because calling + # ds.dataset_format() will still invoke bulk execution and we want + # to avoid that). As a result, it's receiving PandasRow (the defaut + # batch format), which doesn't have the same ordering of columns as + # the ArrowRow. + from ray.data.block import BlockAccessor + def get_rows(ds): + rows = [] + for batch in ds.iter_batches( + batch_size=None, batch_format="pyarrow" + ): + batch = BlockAccessor.for_block(BlockAccessor.batch_to_block(batch)) + for row in batch.iter_rows(): + rows.append(row) + return rows + assert get_rows(ds) == get_rows(readback_ds) def test_write_invalid_tfrecords(ray_start_regular_shared, tmp_path): From 322b353a06272c6a361442de047e547f90f519c4 Mon Sep 17 00:00:00 2001 From: jianoaix Date: Sat, 25 Feb 2023 00:06:23 +0000 Subject: [PATCH 15/19] lint --- .../data/_internal/execution/legacy_compat.py | 4 +- python/ray/data/tests/test_dataset1.py | 124 ++++++++++++++++++ .../ray/data/tests/test_dataset_tfrecords.py | 8 +- python/ray/data/tests/test_stats.py | 20 ++- 4 files changed, 145 insertions(+), 11 deletions(-) create mode 100644 python/ray/data/tests/test_dataset1.py diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py index 7e5b63250fc59..e6f146dd376bf 100644 --- a/python/ray/data/_internal/execution/legacy_compat.py +++ b/python/ray/data/_internal/execution/legacy_compat.py @@ -158,7 +158,9 @@ def do_read(blocks: Iterator[Block], ctx: TaskContext) -> Iterator[Block]: for read_task in blocks: yield from read_task() - return MapOperator.create(do_read, inputs, name="DoRead", ray_remote_args=remote_args) + return MapOperator.create( + do_read, inputs, name="DoRead", ray_remote_args=remote_args + ) else: output = _block_list_to_bundles(blocks, owns_blocks=owns_blocks) for i in output: diff --git a/python/ray/data/tests/test_dataset1.py b/python/ray/data/tests/test_dataset1.py new file mode 100644 index 0000000000000..3389797b6413c --- /dev/null +++ b/python/ray/data/tests/test_dataset1.py @@ -0,0 +1,124 @@ +import itertools +import math +import os +import random +import signal +import time +from unittest.mock import patch + +import numpy as np +import pandas as pd +import pyarrow as pa +import pyarrow.parquet as pq +import pytest + +import ray +from ray._private.test_utils import wait_for_condition +from ray.air.util.tensor_extensions.arrow import ArrowVariableShapedTensorType +from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray +from ray.data._internal.dataset_logger import DatasetLogger +from ray.data._internal.stats import _StatsActor 
+from ray.data._internal.arrow_block import ArrowRow +from ray.data._internal.block_builder import BlockBuilder +from ray.data._internal.lazy_block_list import LazyBlockList +from ray.data._internal.pandas_block import PandasRow +from ray.data.aggregate import AggregateFn, Count, Max, Mean, Min, Std, Sum +from ray.data.block import BlockAccessor, BlockMetadata +from ray.data.context import DatasetContext +from ray.data.dataset import Dataset, _sliding_window +from ray.data.datasource.datasource import Datasource, ReadTask +from ray.data.datasource.csv_datasource import CSVDatasource +from ray.data.extensions.tensor_extension import ( + ArrowTensorArray, + ArrowTensorType, + ArrowVariableShapedTensorArray, + TensorArray, + TensorDtype, +) +from ray.data.row import TableRow +from ray.data.tests.conftest import * # noqa +from ray.tests.conftest import * # noqa +from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy + + +def maybe_pipeline(ds, enabled): + if enabled: + return ds.window(blocks_per_window=1) + else: + return ds + + +class SlowCSVDatasource(CSVDatasource): + def _read_stream(self, f: "pa.NativeFile", path: str, **reader_args): + for block in CSVDatasource._read_stream(self, f, path, **reader_args): + time.sleep(3) + yield block + + +@ray.remote +class Counter: + def __init__(self): + self.value = 0 + + def increment(self): + self.value += 1 + return self.value + + +class FlakyCSVDatasource(CSVDatasource): + def __init__(self): + self.counter = Counter.remote() + + def _read_stream(self, f: "pa.NativeFile", path: str, **reader_args): + count = self.counter.increment.remote() + if ray.get(count) == 1: + raise ValueError("oops") + else: + for block in CSVDatasource._read_stream(self, f, path, **reader_args): + yield block + + def _write_block(self, f: "pa.NativeFile", block: BlockAccessor, **writer_args): + count = self.counter.increment.remote() + if ray.get(count) == 1: + raise ValueError("oops") + else: + CSVDatasource._write_block(self, f, block, **writer_args) + + +def test_dataset_retry_exceptions(ray_start_regular, local_path): + df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) + path1 = os.path.join(local_path, "test1.csv") + df1.to_csv(path1, index=False, storage_options={}) + # ds1 = ray.data.read_datasource(FlakyCSVDatasource(), parallelism=1, paths=path1) + # ds1.write_datasource(FlakyCSVDatasource(), path=local_path, dataset_uuid="data") + # assert df1.equals( + # pd.read_csv(os.path.join(local_path, "data_000000.csv"), storage_options={}) + # ) + + counter = Counter.remote() + + def flaky_mapper(x): + count = counter.increment.remote() + if ray.get(count) == 1: + raise ValueError("oops") + else: + return ray.get(count) + + # assert sorted(ds1.map(flaky_mapper).take()) == [2, 3, 4] + + with pytest.raises(ValueError): + ds = ray.data.read_datasource( + FlakyCSVDatasource(), + parallelism=1, + paths=path1, + ray_remote_args={"retry_exceptions": False}, + ) + print("XXXXX ds: plan:", ds._plan) + for _ in ds.iter_batches(): + pass + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/test_dataset_tfrecords.py b/python/ray/data/tests/test_dataset_tfrecords.py index d683bf630dffb..a6b2a702cfccd 100644 --- a/python/ray/data/tests/test_dataset_tfrecords.py +++ b/python/ray/data/tests/test_dataset_tfrecords.py @@ -251,21 +251,21 @@ def test_readback_tfrecords(ray_start_regular_shared, tmp_path): if not ray.data.context.DatasetContext.get_current().use_streaming_executor: 
assert ds.take() == readback_ds.take() else: - # In streaming, we set batch_format to "default" (because calling + # In streaming, we set batch_format to "default" (because calling # ds.dataset_format() will still invoke bulk execution and we want # to avoid that). As a result, it's receiving PandasRow (the defaut # batch format), which doesn't have the same ordering of columns as # the ArrowRow. from ray.data.block import BlockAccessor + def get_rows(ds): rows = [] - for batch in ds.iter_batches( - batch_size=None, batch_format="pyarrow" - ): + for batch in ds.iter_batches(batch_size=None, batch_format="pyarrow"): batch = BlockAccessor.for_block(BlockAccessor.batch_to_block(batch)) for row in batch.iter_rows(): rows.append(row) return rows + assert get_rows(ds) == get_rows(readback_ds) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index eea49d805719f..8196a4f1c14b4 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -36,11 +36,15 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): if context.new_execution_backend: if context.use_streaming_executor: - logger = DatasetLogger("ray.data._internal.execution.streaming_executor").get_logger( + logger = DatasetLogger( + "ray.data._internal.execution.streaming_executor" + ).get_logger( log_to_stdout=enable_auto_log_stats, ) else: - logger = DatasetLogger("ray.data._internal.execution.bulk_executor").get_logger( + logger = DatasetLogger( + "ray.data._internal.execution.bulk_executor" + ).get_logger( log_to_stdout=enable_auto_log_stats, ) else: @@ -117,7 +121,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): if context.new_execution_backend: if context.use_streaming_executor: - assert( + assert ( stats == """Stage N read->MapBatches(dummy_map_batches)->map: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total @@ -160,7 +164,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): * In user code: T * Total time: T """ - ) + ) else: assert ( stats @@ -384,11 +388,15 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ if context.new_execution_backend: if context.use_streaming_executor: - logger = DatasetLogger("ray.data._internal.execution.streaming_executor").get_logger( + logger = DatasetLogger( + "ray.data._internal.execution.streaming_executor" + ).get_logger( log_to_stdout=enable_auto_log_stats, ) else: - logger = DatasetLogger("ray.data._internal.execution.bulk_executor").get_logger( + logger = DatasetLogger( + "ray.data._internal.execution.bulk_executor" + ).get_logger( log_to_stdout=enable_auto_log_stats, ) else: From e169caf18cf88cd9d499a967666d3ff56a7a0b55 Mon Sep 17 00:00:00 2001 From: jianoaix Date: Sat, 25 Feb 2023 00:07:17 +0000 Subject: [PATCH 16/19] unset streaming --- python/ray/data/context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/context.py b/python/ray/data/context.py index 96cd4f86d00e1..cb953f04696a1 100644 --- a/python/ray/data/context.py +++ b/python/ray/data/context.py @@ -76,7 +76,7 @@ # Whether to use the streaming executor. This only has an effect if the new execution # backend is enabled. DEFAULT_USE_STREAMING_EXECUTOR = bool( - int(os.environ.get("RAY_DATASET_USE_STREAMING_EXECUTOR", "1")) + int(os.environ.get("RAY_DATASET_USE_STREAMING_EXECUTOR", "0")) ) # Whether to eagerly free memory (new backend only). 
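With the default reverted to "0" in this patch, the streaming executor is opt-in again. A usage sketch, assuming the environment variable and context field shown in the context.py diffs are the intended switches (the env var is read at module import time, so it must be set before ray.data is imported):

    import os

    # Opt in via the environment variable, before importing ray.data.
    os.environ["RAY_DATASET_USE_STREAMING_EXECUTOR"] = "1"

    import ray
    from ray.data.context import DatasetContext

    # Or flip the flag on the current context at runtime (assumed writable).
    DatasetContext.get_current().use_streaming_executor = True

    ds = ray.data.range(100).map_batches(lambda batch: batch)
    print(ds.take(5))
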
From 85aee0bf30b65555508d5242c0c8f22060dbfdc0 Mon Sep 17 00:00:00 2001 From: jianoaix Date: Sat, 25 Feb 2023 00:13:13 +0000 Subject: [PATCH 17/19] fix --- python/ray/data/tests/test_dataset1.py | 124 ------------------------- 1 file changed, 124 deletions(-) delete mode 100644 python/ray/data/tests/test_dataset1.py diff --git a/python/ray/data/tests/test_dataset1.py b/python/ray/data/tests/test_dataset1.py deleted file mode 100644 index 3389797b6413c..0000000000000 --- a/python/ray/data/tests/test_dataset1.py +++ /dev/null @@ -1,124 +0,0 @@ -import itertools -import math -import os -import random -import signal -import time -from unittest.mock import patch - -import numpy as np -import pandas as pd -import pyarrow as pa -import pyarrow.parquet as pq -import pytest - -import ray -from ray._private.test_utils import wait_for_condition -from ray.air.util.tensor_extensions.arrow import ArrowVariableShapedTensorType -from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray -from ray.data._internal.dataset_logger import DatasetLogger -from ray.data._internal.stats import _StatsActor -from ray.data._internal.arrow_block import ArrowRow -from ray.data._internal.block_builder import BlockBuilder -from ray.data._internal.lazy_block_list import LazyBlockList -from ray.data._internal.pandas_block import PandasRow -from ray.data.aggregate import AggregateFn, Count, Max, Mean, Min, Std, Sum -from ray.data.block import BlockAccessor, BlockMetadata -from ray.data.context import DatasetContext -from ray.data.dataset import Dataset, _sliding_window -from ray.data.datasource.datasource import Datasource, ReadTask -from ray.data.datasource.csv_datasource import CSVDatasource -from ray.data.extensions.tensor_extension import ( - ArrowTensorArray, - ArrowTensorType, - ArrowVariableShapedTensorArray, - TensorArray, - TensorDtype, -) -from ray.data.row import TableRow -from ray.data.tests.conftest import * # noqa -from ray.tests.conftest import * # noqa -from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy - - -def maybe_pipeline(ds, enabled): - if enabled: - return ds.window(blocks_per_window=1) - else: - return ds - - -class SlowCSVDatasource(CSVDatasource): - def _read_stream(self, f: "pa.NativeFile", path: str, **reader_args): - for block in CSVDatasource._read_stream(self, f, path, **reader_args): - time.sleep(3) - yield block - - -@ray.remote -class Counter: - def __init__(self): - self.value = 0 - - def increment(self): - self.value += 1 - return self.value - - -class FlakyCSVDatasource(CSVDatasource): - def __init__(self): - self.counter = Counter.remote() - - def _read_stream(self, f: "pa.NativeFile", path: str, **reader_args): - count = self.counter.increment.remote() - if ray.get(count) == 1: - raise ValueError("oops") - else: - for block in CSVDatasource._read_stream(self, f, path, **reader_args): - yield block - - def _write_block(self, f: "pa.NativeFile", block: BlockAccessor, **writer_args): - count = self.counter.increment.remote() - if ray.get(count) == 1: - raise ValueError("oops") - else: - CSVDatasource._write_block(self, f, block, **writer_args) - - -def test_dataset_retry_exceptions(ray_start_regular, local_path): - df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) - path1 = os.path.join(local_path, "test1.csv") - df1.to_csv(path1, index=False, storage_options={}) - # ds1 = ray.data.read_datasource(FlakyCSVDatasource(), parallelism=1, paths=path1) - # ds1.write_datasource(FlakyCSVDatasource(), path=local_path, dataset_uuid="data") - 
# assert df1.equals( - # pd.read_csv(os.path.join(local_path, "data_000000.csv"), storage_options={}) - # ) - - counter = Counter.remote() - - def flaky_mapper(x): - count = counter.increment.remote() - if ray.get(count) == 1: - raise ValueError("oops") - else: - return ray.get(count) - - # assert sorted(ds1.map(flaky_mapper).take()) == [2, 3, 4] - - with pytest.raises(ValueError): - ds = ray.data.read_datasource( - FlakyCSVDatasource(), - parallelism=1, - paths=path1, - ray_remote_args={"retry_exceptions": False}, - ) - print("XXXXX ds: plan:", ds._plan) - for _ in ds.iter_batches(): - pass - - -if __name__ == "__main__": - import sys - - sys.exit(pytest.main(["-v", __file__])) From e68d1ff5a37675f0e214e0991af8d46555a9cd6b Mon Sep 17 00:00:00 2001 From: jianoaix Date: Sat, 25 Feb 2023 18:09:43 +0000 Subject: [PATCH 18/19] mark streaming CI as stable --- .buildkite/pipeline.ml.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.buildkite/pipeline.ml.yml b/.buildkite/pipeline.ml.yml index cecd15428ee57..4cdd62ea2b606 100644 --- a/.buildkite/pipeline.ml.yml +++ b/.buildkite/pipeline.ml.yml @@ -297,7 +297,7 @@ - sudo service mongodb stop - sudo apt-get purge -y mongodb* -- label: "[unstable] Dataset tests (streaming executor)" +- label: "Dataset tests (streaming executor)" conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_DATA_AFFECTED"] instance_size: medium commands: From 476aac706d82f86c355f77fa411eeabc47654a0c Mon Sep 17 00:00:00 2001 From: jianoaix Date: Mon, 27 Feb 2023 16:53:03 +0000 Subject: [PATCH 19/19] fix lint --- python/ray/data/tests/test_stats.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 8196a4f1c14b4..d9625b065162e 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -130,7 +130,8 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): * Output num rows: N min, N max, N mean, N total * Output size bytes: N min, N max, N mean, N total * Tasks per node: N min, N max, N mean; N nodes used -* Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, 'obj_store_mem_peak': N} +* Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \ +'obj_store_mem_peak': N} """ ) else:
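The canonicalized stats string asserted above can be reproduced locally with the same calls the test file uses (fully_executed() and stats()); a short sketch, with the dataset shape chosen arbitrarily:

    import ray

    # Build a small dataset with a map_batches stage and print its per-stage stats.
    ds = ray.data.range(1000).map_batches(lambda batch: batch)
    print(ds.fully_executed().stats())
    # Under the streaming executor, the output includes the "Extra metrics" line
    # (obj_store_mem_alloc / obj_store_mem_freed / obj_store_mem_peak) asserted above.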