Skip to content

Commit

Permalink
Revert "[Datasets] Revert "Enable streaming executor by default (#324… (
Browse files Browse the repository at this point in the history
#33601)

The failure in RLlib should have been fixed by #33562.

Verified with `python -m pytest rllib/core/learner/torch/tests/test_torch_learner.py::TestLearner::test_end_to_end_update`.
  • Loading branch information
jianoaix committed Mar 23, 2023
1 parent 42bb035 commit c38a3cf
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
12 changes: 6 additions & 6 deletions python/ray/air/tests/test_dataset_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def checker(shard, results):
assert "Stage 1 ReadRange->BatchMapper: 1/1 blocks executed " in stats, stats

def rand(x):
x["value"] = [random.random() for _ in range(len(x))]
x["value"] = x["value"].multiply(x["value"])
return x

prep = BatchMapper(rand, batch_format="pandas")
Expand Down Expand Up @@ -355,7 +355,7 @@ def checker(shard, results):
assert len(results[0]) == 5, results
assert results[0] != results[1], results
stats = shard.stats()
assert "RandomizeBlockOrder: 5/5 blocks executed in 0s" in stats, stats
assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats

ds = ray.data.range_table(5)
test = TestStream(
Expand All @@ -378,9 +378,9 @@ def checker(shard, results):

def checker(shard, results):
assert len(results[0]) == 5, results
# Randomize block order for bulk ingest only executes once at the
# beginning, not once per epoch.
assert results[0] == results[1], results
# With the streaming executor, the block order may be randomized differently in
# each epoch, so compare the results as sets to ignore ordering.
assert set(results[0]) == set(results[1]), results
stats = shard.stats()
assert "RandomizeBlockOrder: 5/5 blocks executed" in stats, stats

Expand All @@ -397,7 +397,7 @@ def checker(shard, results):
assert len(results[0]) == 5, results
assert results[0] != results[1], results
stats = shard.stats()
assert "RandomizeBlockOrder: 5/5 blocks executed in 0s" in stats, stats
assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats

ds = ray.data.range_table(5)
test = TestStream(
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
# Whether to use the streaming executor. This only has an effect if the new execution
# backend is enabled.
DEFAULT_USE_STREAMING_EXECUTOR = bool(
int(os.environ.get("RAY_DATASET_USE_STREAMING_EXECUTOR", "0"))
int(os.environ.get("RAY_DATASET_USE_STREAMING_EXECUTOR", "1"))
)

# Whether to eagerly free memory (new backend only).
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2024,7 +2024,7 @@ def std(
Examples:
>>> import ray
>>> ray.data.range(100).std()
29.011491975882016
29.01149197588202
>>> ray.data.from_items([
... (i, i**2)
... for i in range(100)]).std(lambda x: x[1])
Expand Down

0 comments on commit c38a3cf

Please sign in to comment.