Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[datasets] Fix scheduling strategy propagation and stats collection in push-based shuffle #25108

Merged

Conversation

stephanie-wang
Copy link
Contributor

Why are these changes needed?

This fixes two bugs in Datasets push-based shuffle:

  1. Scheduling strategy specified by the caller was not getting propagated correctly to the map stage in push-based shuffle. This is because the map and reduce stages shared the same ray.remote options dict, and we deleted the caller-specified scheduling strategy from the reduce stage so that we could specify a NodeAffinitySchedulingStrategy instead.
  2. We were only reporting partial stats for the merge stage.

Related issue number

Issue 1 is necessary for performance at large-scale (#24480).

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@@ -138,6 +138,7 @@ def execute(
# The placement strategy for reduce tasks is overwritten to colocate
# them with their inputs from the merge stage, so remove any
# pre-specified scheduling strategy here.
reduce_ray_remote_args = reduce_ray_remote_args.copy()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch.

# Check all merge tasks are included in stats.
internal_stats = ds._plan.stats()
num_merge_tasks = len(internal_stats.stages["random_shuffle_merge"])
assert parallelism // 3 <= num_merge_tasks <= parallelism // 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is 2 the merge factor? maybe rename it to a constant for easier understanding.

@stephanie-wang stephanie-wang merged commit c876538 into ray-project:master May 25, 2022
@stephanie-wang stephanie-wang deleted the push-based-shuffle-bugs branch May 25, 2022 01:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants