From c38a3cff2398863bf42914d9da50b949b2165de3 Mon Sep 17 00:00:00 2001 From: Jian Xiao <99709935+jianoaix@users.noreply.github.com> Date: Thu, 23 Mar 2023 13:14:43 -0700 Subject: [PATCH] =?UTF-8?q?Revert=20"[Datasets]=20Revert=20"Enable=20strea?= =?UTF-8?q?ming=20executor=20by=20default=20(#324=E2=80=A6=20(#33601)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The failure in rllib should have been fixed by https://github.com/ray-project/ray/pull/33562 Verified with `python -m pytest rllib/core/learner/torch/tests/test_torch_learner.py::TestLearner::test_end_to_end_update`. --- python/ray/air/tests/test_dataset_config.py | 12 ++++++------ python/ray/data/context.py | 2 +- python/ray/data/dataset.py | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/python/ray/air/tests/test_dataset_config.py b/python/ray/air/tests/test_dataset_config.py index 334cfbea9cba7..c00637a331de5 100644 --- a/python/ray/air/tests/test_dataset_config.py +++ b/python/ray/air/tests/test_dataset_config.py @@ -255,7 +255,7 @@ def checker(shard, results): assert "Stage 1 ReadRange->BatchMapper: 1/1 blocks executed " in stats, stats def rand(x): - x["value"] = [random.random() for _ in range(len(x))] + x["value"] = x["value"].multiply(x["value"]) return x prep = BatchMapper(rand, batch_format="pandas") @@ -355,7 +355,7 @@ def checker(shard, results): assert len(results[0]) == 5, results assert results[0] != results[1], results stats = shard.stats() - assert "RandomizeBlockOrder: 5/5 blocks executed in 0s" in stats, stats + assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats ds = ray.data.range_table(5) test = TestStream( @@ -378,9 +378,9 @@ def checker(shard, results): def checker(shard, results): assert len(results[0]) == 5, results - # Randomize block order for bulk ingest only executes once at the - # beginning, not once per epoch. - assert results[0] == results[1], results + # In streaming executor, the randomization in each epoch can be different, so + # we eliminate the ordering in comparison. + assert set(results[0]) == set(results[1]), results stats = shard.stats() assert "RandomizeBlockOrder: 5/5 blocks executed" in stats, stats @@ -397,7 +397,7 @@ def checker(shard, results): assert len(results[0]) == 5, results assert results[0] != results[1], results stats = shard.stats() - assert "RandomizeBlockOrder: 5/5 blocks executed in 0s" in stats, stats + assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats ds = ray.data.range_table(5) test = TestStream( diff --git a/python/ray/data/context.py b/python/ray/data/context.py index 6630cc02bb291..b6ac7b2db261c 100644 --- a/python/ray/data/context.py +++ b/python/ray/data/context.py @@ -76,7 +76,7 @@ # Whether to use the streaming executor. This only has an effect if the new execution # backend is enabled. DEFAULT_USE_STREAMING_EXECUTOR = bool( - int(os.environ.get("RAY_DATASET_USE_STREAMING_EXECUTOR", "0")) + int(os.environ.get("RAY_DATASET_USE_STREAMING_EXECUTOR", "1")) ) # Whether to eagerly free memory (new backend only). diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 4830ebdef4f42..dbd96616b138f 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2024,7 +2024,7 @@ def std( Examples: >>> import ray >>> ray.data.range(100).std() - 29.011491975882016 + 29.01149197588202 >>> ray.data.from_items([ ... (i, i**2) ... for i in range(100)]).std(lambda x: x[1])