[dataset] Pipeline task submission during reduce stage in push-based shuffle #25795
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Why are these changes needed?
Reduce stage in push-based shuffle fails to complete at 100k output partitions or more. This is likely because of driver or raylet load from having too many tasks in flight at once.
We can fix this from ray core too, but for now, this PR adds pipelining for the reduce stage, to limit the total number of reduce tasks in flight at the same time. This is currently set to 2 * available parallelism in the cluster. We have to pick which reduce tasks to submit carefully since these are pinned to specific nodes. The PR does this by assigning tasks round-robin according to the corresponding merge task (which get spread throughout the cluster).
In addition, this PR refactors the map, merge, and reduce stages to use a common pipelined iterator pattern, since they all have a similar pattern of submitting a round of tasks at a time, then waiting for a previous round to finish before submitting more.
Related issue number
Closes #25412.
Checks
scripts/format.sh
to lint the changes in this PR.Adds a multinode test, but it would be nice to add a test that hooks into ray.remote and ray.get to check that we are actually submitting the right tasks at the right time.