Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add objects GC in dataset iterator #34030

Merged
merged 16 commits into from
Apr 6, 2023

Conversation

jianoaix
Copy link
Contributor

@jianoaix jianoaix commented Apr 3, 2023

Why are these changes needed?

Fix: #33846

The DatasetIterator doesn't eagerly GC objects, which resulted in OOM of consumer nodes. The new consumer nodes that got brought up were not in sync with other healthy consumer nodes. The DatasetPipeline requires all consumers to read windows in sync, so this caused the pipeline to hang and then fail with timeout.

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Copy link
Contributor Author

@jianoaix jianoaix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -24,6 +35,8 @@ def test_iter_batches_no_spilling_upon_no_transformation(shutdown_only):

check_no_spill(ctx, ds.repeat())
check_no_spill(ctx, ds.window(blocks_per_window=20))
check_to_torch_no_spill(ctx, ds.repeat())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will fail (i.e. have spilling) if without this PR.

@jianoaix jianoaix changed the title [WIP] Add objects GC in dataset iterator Add objects GC in dataset iterator Apr 4, 2023
@ericl
Copy link
Contributor

ericl commented Apr 4, 2023

Nice find!

@jianoaix jianoaix assigned ericl and c21 Apr 4, 2023
@@ -30,7 +30,7 @@ def _to_block_iterator(
ds = self._base_dataset
block_iterator, stats, executor = ds._plan.execute_to_iterator()
ds._current_executor = executor
return block_iterator, stats
return block_iterator, stats, False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we update the type hint at line 28 as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep!

Comment on lines +38 to +45
if epoch_pipeline._first_dataset is not None:
blocks_owned_by_consumer = (
epoch_pipeline._first_dataset._plan.execute()._owned_by_consumer
)
else:
blocks_owned_by_consumer = (
epoch_pipeline._peek()._plan.execute()._owned_by_consumer
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add a comment in code for why we need to do this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@ericl ericl added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Apr 5, 2023
Copy link
Contributor Author

@jianoaix jianoaix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pipelined_ingestion_1500_gb has been consistently passing: https://buildkite.com/ray-project/release-tests-pr/builds?branch=jianoaix%3Aiteratorgcblocks
Will wait the CI to pass and then merge.

@@ -30,7 +30,7 @@ def _to_block_iterator(
ds = self._base_dataset
block_iterator, stats, executor = ds._plan.execute_to_iterator()
ds._current_executor = executor
return block_iterator, stats
return block_iterator, stats, False
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep!

Comment on lines +38 to +45
if epoch_pipeline._first_dataset is not None:
blocks_owned_by_consumer = (
epoch_pipeline._first_dataset._plan.execute()._owned_by_consumer
)
else:
blocks_owned_by_consumer = (
epoch_pipeline._peek()._plan.execute()._owned_by_consumer
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@jianoaix
Copy link
Contributor Author

jianoaix commented Apr 6, 2023

There is failure in python/ray/data/tests/test_dataset_consumption.py:: test_dataset_lineage_serialization_unsupported, but it's not relevant here.

@jianoaix jianoaix merged commit 1999c9d into ray-project:master Apr 6, 2023
jianoaix added a commit to jianoaix/ray that referenced this pull request Apr 6, 2023
* Revert "[Datasets] Revert "Enable streaming executor by default (ray-project#32493)" (ray-project#33485)"

This reverts commit 5c79954.

* Add objects GC in dataset iterator

* test it

* more tests

* fix comment

* add a little more memory as it's close to the limit and may make test flaky

* feedback
jianoaix added a commit that referenced this pull request Apr 6, 2023
* Revert "[Datasets] Revert "Enable streaming executor by default (#32493)" (#33485)"

This reverts commit 5c79954.

* Add objects GC in dataset iterator

* test it

* more tests

* fix comment

* add a little more memory as it's close to the limit and may make test flaky

* feedback
ArturNiederfahrenhorst pushed a commit to ArturNiederfahrenhorst/ray that referenced this pull request Apr 10, 2023
* Revert "[Datasets] Revert "Enable streaming executor by default (ray-project#32493)" (ray-project#33485)"

This reverts commit 5c79954.

* Add objects GC in dataset iterator

* test it

* more tests

* fix comment

* add a little more memory as it's close to the limit and may make test flaky

* feedback
elliottower pushed a commit to elliottower/ray that referenced this pull request Apr 22, 2023
* Revert "[Datasets] Revert "Enable streaming executor by default (ray-project#32493)" (ray-project#33485)"

This reverts commit 5c79954.

* Add objects GC in dataset iterator

* test it

* more tests

* fix comment

* add a little more memory as it's close to the limit and may make test flaky

* feedback

Signed-off-by: elliottower <[email protected]>
ProjectsByJackHe pushed a commit to ProjectsByJackHe/ray that referenced this pull request May 4, 2023
* Revert "[Datasets] Revert "Enable streaming executor by default (ray-project#32493)" (ray-project#33485)"

This reverts commit 5c79954.

* Add objects GC in dataset iterator

* test it

* more tests

* fix comment

* add a little more memory as it's close to the limit and may make test flaky

* feedback

Signed-off-by: Jack He <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
@author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[data] release test failure : pipelined_ingestion_1500_gb
3 participants