
[core] Support generators for tasks with multiple return values #25247

Merged 9 commits into ray-project:master on Jun 1, 2022

Conversation

@stephanie-wang (Contributor) commented on May 27, 2022

Why are these changes needed?

Adds support for Python generators, in addition to normal function returns, for tasks with multiple return values. This lets developers cut down on a task's total memory usage, since previous return values can be freed before the next one is allocated on the heap.

The semantics for num_returns are about the same as for usual tasks: the task will throw an error if the number of values produced by the generator does not match the number of return values specified by the user. The one difference is that if num_returns=1, the task will throw the usual Python exception that a generator cannot be pickled.

As an example, this feature will allow us to reduce memory usage in Datasets shuffle operations (see #25200 for a prototype).
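
To make the described usage concrete, here is a minimal sketch assuming the standard `@ray.remote(num_returns=...)` decorator; it is not code taken from this PR:

```python
import ray

ray.init()

# With num_returns=3, the task body may now be a generator instead of
# returning a 3-tuple. Each yielded value becomes one return object, so the
# worker can free earlier values before producing later ones.
@ray.remote(num_returns=3)
def generator_task():
    for i in range(3):
        yield i

# The caller still receives a plain list of ObjectRefs, exactly as with a
# normal multi-return task.
ref0, ref1, ref2 = generator_task.remote()
assert ray.get([ref0, ref1, ref2]) == [0, 1, 2]
```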

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@stephanie-wang added the `core-interface-change-approval-required` label (This changes the Ray core behavior / API and requires broader approvals.) on May 27, 2022
@stephanie-wang (Contributor, Author) commented:

There isn't really an API change since the caller and reader of the task will still be written the same way. However, this is adding support for a new set of programs.

@ericl (Contributor) left a comment:

Looks great! Could we also add a section in the docs on task returns covering this?

Also, should we document / test it for actor method calls too?

The following review thread was left on this hunk in the CoreWorker Cython implementation:

```
@@ -2051,6 +2062,12 @@ cdef class CoreWorker:
                contained_id, &task_output_inlined_bytes,
                &returns[0][i])

            i += 1
            if i < n_returns:
```
Review comment (Contributor):
Great error messages

@ericl added the `@author-action-required` label (The PR author is responsible for the next step. Remove tag to send back to the reviewer.) on May 27, 2022
The next review thread was left on this snippet from the diff:

```python
for i in range(3):
    yield i

# NOTE: Similar to normal functions, these objects will not be available
```
Review comment (Contributor):
This is a bit of a footgun and won't be what people will be expecting given the normal behavior of generators. Not a problem for this PR, but to do yield properly, on the receiving side it should return an iterator over ObjectRefs of the result and produce each element as soon as it has been generated.

@stephanie-wang (Contributor, Author) replied:
Yeah, good point! For now, I think we should use the same semantics as normal tasks with multiple returns, especially since the ObjectRefs returned to the caller are a list, not a generator. But if we decide to support the generator interface in the future, I do think we should consider providing the iterator semantics instead.

@stephanie-wang merged commit 1f94887 into ray-project:master on Jun 1, 2022
@stephanie-wang deleted the static-generators branch on June 1, 2022, 20:30
fishbone added a commit that referenced this pull request Jun 1, 2022
stephanie-wang pushed a commit that referenced this pull request Jun 1, 2022
stephanie-wang added a commit to stephanie-wang/ray that referenced this pull request Jun 1, 2022
stephanie-wang added a commit that referenced this pull request on Jun 2, 2022:
…urn values (#25247)" (#25380)" (#25383)

Duplicate of #25247.

Adds a fix for Dask-on-Ray. Previously, for tasks with multiple return values, we implicitly allowed returning a dict keyed by the return index. Dask-on-Ray relied on this, but it was never documented behavior, and task returns are now required to be iterable instead.
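
For illustration, here is a minimal, hedged sketch of the difference; the task bodies below are made up for the example and are not from Dask-on-Ray itself:

```python
import ray

ray.init()

# Previously allowed implicitly (and relied on by Dask-on-Ray): a dict keyed
# by return index. After this change, such a return value is rejected.
@ray.remote(num_returns=2)
def dict_style():
    return {0: "a", 1: "b"}

# Now required: the return value must be an iterable (tuple, list, or
# generator) with exactly num_returns elements.
@ray.remote(num_returns=2)
def iterable_style():
    return "a", "b"

x_ref, y_ref = iterable_style.remote()
assert ray.get([x_ref, y_ref]) == ["a", "b"]
```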
ericl added a commit that referenced this pull request on Jun 17, 2022:
This uses the generators introduced in #25247 to reduce memory usage during the merge stage in push-based shuffle. These tasks merge groups of map outputs, so it fits a generator pattern where we want to return merged outputs one at a time. Verified that this allows for merging more/larger objects at a time than the current list-based version.
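
As a rough, hypothetical sketch of that pattern (the names and merge logic below are illustrative only, not the actual push-based shuffle implementation):

```python
import ray

NUM_MERGED_OUTPUTS = 4  # illustrative; the real value depends on the shuffle plan

def merge_partition(map_outputs, out_idx):
    # Placeholder merge logic: combine the out_idx-th piece of each map output.
    return [part[out_idx] for part in map_outputs]

# Written as a generator, the merge task yields each merged output as soon as
# it is produced, so earlier outputs can be freed before later ones are built,
# instead of holding the whole list of merged outputs in memory at once.
@ray.remote(num_returns=NUM_MERGED_OUTPUTS)
def merge_task(*map_outputs):
    for out_idx in range(NUM_MERGED_OUTPUTS):
        yield merge_partition(map_outputs, out_idx)
```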

I also tried this for the map stage in random_shuffle, but it didn't seem to make a difference in memory usage for Arrow blocks. I think this is probably because Arrow is already doing some zero-copy optimizations when selecting rows?

Also adds a new line to Dataset stats for memory usage. Unfortunately it's hard to get an accurate reading of physical memory usage in Python, and this value will probably be an overestimate in many cases; I didn't see a difference before and after this PR for the merge stage, for example. Arguably this field should be opt-in. Sample stats for 100MB partitions:
```
        Substage 0 read->random_shuffle_map: 10/10 blocks executed
        * Remote wall time: 1.44s min, 3.32s max, 2.57s mean, 25.74s total
        * Remote cpu time: 1.42s min, 2.53s max, 2.03s mean, 20.25s total
        * Worker memory usage (MB): 462 min, 864 max, 552 mean
        * Output num rows: 12500000 min, 12500000 max, 12500000 mean, 125000000 total
        * Output size bytes: 101562500 min, 101562500 max, 101562500 mean, 1015625000 total
        * Tasks per node: 10 min, 10 max, 10 mean; 1 nodes used

        Substage 1 random_shuffle_reduce: 10/10 blocks executed
        * Remote wall time: 1.47s min, 2.94s max, 2.17s mean, 21.69s total
        * Remote cpu time: 1.45s min, 1.88s max, 1.71s mean, 17.09s total
        * Worker memory usage (MB): 462 min, 1047 max, 831 mean
        * Output num rows: 12500000 min, 12500000 max, 12500000 mean, 125000000 total
        * Output size bytes: 101562500 min, 101562500 max, 101562500 mean, 1015625000 total
        * Tasks per node: 10 min, 10 max, 10 mean; 1 nodes used
```


## Checks

- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Co-authored-by: Eric Liang <[email protected]>