[core] Support generators to allow tasks to return a dynamic number of objects #28291

Merged
merged 37 commits into from
Sep 21, 2022

Contributor

@stephanie-wang stephanie-wang commented Sep 6, 2022

Why are these changes needed?

This adds support for tasks that need to return a dynamic number of objects. When a remote generator function is invoked and num_returns for the task is 1, the worker will dynamically allocate ray.put IDs for these objects and store an ObjectRefGenerator as its return value. This allows the worker to choose how many objects to return and to keep heap memory low, since it does not need to keep all objects in memory simultaneously.

Unlike normal ray.put(), we assign the task caller as the owner of the object. This is to improve fault tolerance, as the owner can recover dynamically generated objects through the normal lineage reconstruction codepath.
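As a rough illustration of the behavior described above (not the actual implementation), the sketch below models a worker that stores each yielded value as soon as it is produced and then returns a single object listing the dynamically allocated IDs. `ToyObjectStore`, `run_generator_task`, and the owner string are hypothetical names made up for this sketch; none of them are Ray APIs.

```python
import itertools
from typing import Any, Callable, Dict, Iterator, List, Tuple


class ToyObjectStore:
    """Hypothetical in-process stand-in for an object store (not Ray's API)."""

    def __init__(self) -> None:
        self.objects: Dict[int, Any] = {}
        self.owner_of: Dict[int, str] = {}
        self._ids = itertools.count(1)

    def put(self, value: Any, owner: str) -> int:
        object_id = next(self._ids)
        self.objects[object_id] = value
        self.owner_of[object_id] = owner
        return object_id


def run_generator_task(
    store: ToyObjectStore,
    task_fn: Callable[[], Iterator[Any]],
    caller: str,
) -> int:
    """Store each yielded value immediately; return the ID of the ID list."""
    dynamic_ids: List[int] = []
    for value in task_fn():
        # Only the current value is held by the worker; earlier values already
        # live in the store. The caller, not the worker, is recorded as owner.
        dynamic_ids.append(store.put(value, owner=caller))
    # The single return value plays the role of the generator object.
    return store.put(tuple(dynamic_ids), owner=caller)


def chunks() -> Iterator[List[int]]:
    for i in range(3):
        yield [i] * 4


store = ToyObjectStore()
generator_id = run_generator_task(store, chunks, caller="task-caller")
dynamic_ids: Tuple[int, ...] = store.objects[generator_id]
print(len(dynamic_ids), store.objects[dynamic_ids[2]])
```

The worker decides the number of returned objects simply by how many times the generator yields, and every stored object is attributed to the caller.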

The main complication has to do with notifying the task caller that it owns these objects. We do this in two places, which is necessary because the protocols are asynchronous, so either message can arrive first.

  1. When the task reply is received.
  2. When the primary raylet subscribes to the eviction notice from the owner.

To register the dynamic return, the owner adds the ObjectRef to the ref counter and marks that it is contained in the generator object.
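Because either message can arrive first, the registration step has to be safe to run from both paths. A minimal sketch of that idea, assuming a made-up `ToyReferenceCounter` (this is not Ray's reference counter, and the handler names are hypothetical):

```python
from typing import Callable, Dict, List, Set, Tuple


class ToyReferenceCounter:
    """Hypothetical owner-side bookkeeping (not Ray's ReferenceCounter)."""

    def __init__(self) -> None:
        self.known: Set[str] = set()
        self.contained_in: Dict[str, Set[str]] = {}

    def add_dynamic_return(self, object_id: str, generator_id: str) -> bool:
        """Register a dynamic return once; repeated calls are no-ops."""
        if object_id in self.known:
            return False
        self.known.add(object_id)
        self.contained_in.setdefault(generator_id, set()).add(object_id)
        return True


def on_task_reply(
    rc: ToyReferenceCounter, generator_id: str, object_ids: List[str]
) -> None:
    # Path 1: the task reply lists every dynamic return.
    for object_id in object_ids:
        rc.add_dynamic_return(object_id, generator_id)


def on_eviction_subscription(
    rc: ToyReferenceCounter, generator_id: str, object_id: str
) -> None:
    # Path 2: a raylet subscribes for one object the owner may not know yet.
    rc.add_dynamic_return(object_id, generator_id)


Event = Callable[[ToyReferenceCounter], None]


def replay(events: List[Event]) -> Tuple[Set[str], Dict[str, Set[str]]]:
    rc = ToyReferenceCounter()
    for event in events:
        event(rc)
    return rc.known, rc.contained_in


def reply(rc: ToyReferenceCounter) -> None:
    on_task_reply(rc, "gen", ["obj-1", "obj-2"])


def subscribe(rc: ToyReferenceCounter) -> None:
    on_eviction_subscription(rc, "gen", "obj-1")


reply_first = replay([reply, subscribe])
subscribe_first = replay([subscribe, reply])
```

In this toy model both orderings end in the same state, which is the property the two registration points are meant to provide.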

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

TODO:

  • docs

Signed-off-by: Stephanie Wang <[email protected]>
- nondeterministic recovery test
- ref counting bug?

Signed-off-by: Stephanie Wang <[email protected]>
@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Sep 6, 2022
@stephanie-wang
Contributor Author

Still have some TODOs, but the main changes are ready for review.

Signed-off-by: Stephanie Wang <[email protected]>
Contributor

@clarkzinzow clarkzinzow left a comment

Sweet, general approach LGTM but I'll defer to others for a more thorough review.

In addition to the object eviction subscription, is there a potential race between the pinned object location update and the push task reply for the dynamic return objects? IIRC both of those RPCs are async with no synchronization barrier in-between. Maybe this is a non-issue because we always add the pinned object location when processing the push task reply?

// NOTE(swang): We need to add the location of the object before marking
// it as local in the in-memory store so that the data locality policy
// will choose the right raylet for any queued dependent tasks.
reference_counter_->UpdateObjectPinnedAtRaylet(object_id, worker_raylet_id);

src/ray/protobuf/node_manager.proto (outdated, resolved)
src/ray/protobuf/pubsub.proto (outdated, resolved)
// eviction events before we know about the object. This can happen when we
// receive the subscription request before the reply from the task that
// created the object. Add the dynamically created object to our ref
// counter so that we know that it exists.
Contributor

Nice, I was wondering if this race was going to be covered.

for (const auto &return_id : return_ids) {
RAY_LOG(DEBUG) << "Task " << task_spec.TaskId() << " will return object "
<< return_id;
}
Contributor

Nit: This return ID iteration + debug logging could be moved to the loop directly above this one.

@@ -343,6 +343,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \return Status.
Status SealExisting(const ObjectID &object_id,
bool pin_object,
const ObjectID &generator_id = ObjectID::Nil(),
Contributor

generator_id should be added to the docstring for this method and SealReturnObject, PinExistingReturnObject.

@stephanie-wang
Contributor Author

> Sweet, general approach LGTM but I'll defer to others for a more thorough review.
>
> In addition to the object eviction subscription, is there a potential race between the pinned object location update and the push task reply for the dynamic return objects? IIRC both of those RPCs are async with no synchronization barrier in-between. Maybe this is a non-issue because we always add the pinned object location when processing the push task reply?
>
> // NOTE(swang): We need to add the location of the object before marking
> // it as local in the in-memory store so that the data locality policy
> // will choose the right raylet for any queued dependent tasks.
> reference_counter_->UpdateObjectPinnedAtRaylet(object_id, worker_raylet_id);

Hmm I think this part is okay because of what you said: it all happens locally at the task caller when processing the push task reply.

I think there may be a race condition for object directory location updates, though, let me look into this.
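For readers following the pinned-location question, the ordering constraint in the quoted NOTE(swang) comment can be modeled with a small toy. This is only a sketch under the assumption that marking an object local immediately schedules queued dependents using whatever location is known at that moment; `ToyOwner` and its methods are hypothetical and do not mirror Ray's classes.

```python
from typing import Dict, List, Set, Tuple


class ToyOwner:
    """Hypothetical task caller state (not Ray's CoreWorker)."""

    def __init__(self) -> None:
        self.pinned_at: Dict[str, str] = {}
        self.local: Set[str] = set()
        self.waiting: Dict[str, List[str]] = {}
        self.scheduled: List[Tuple[str, str]] = []

    def queue_task(self, task: str, dependency: str) -> None:
        self.waiting.setdefault(dependency, []).append(task)

    def update_pinned_at(self, object_id: str, node: str) -> None:
        self.pinned_at[object_id] = node

    def mark_local(self, object_id: str) -> None:
        self.local.add(object_id)
        # Dependents are scheduled right away, so the locality decision
        # uses whatever location has been recorded so far.
        for task in self.waiting.pop(object_id, []):
            node = self.pinned_at.get(object_id, "no-locality-hint")
            self.scheduled.append((task, node))


def handle_reply(location_first: bool) -> List[Tuple[str, str]]:
    owner = ToyOwner()
    owner.queue_task("consume", dependency="obj-1")
    if location_first:
        owner.update_pinned_at("obj-1", "raylet-B")
        owner.mark_local("obj-1")
    else:
        owner.mark_local("obj-1")
        owner.update_pinned_at("obj-1", "raylet-B")
    return owner.scheduled


location_then_local = handle_reply(location_first=True)
local_then_location = handle_reply(location_first=False)
```

Since both steps run locally in one handler in this model, there is no cross-RPC race; only the order inside the handler matters.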

stephanie-wang and others added 5 commits September 6, 2022 15:39
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
@stephanie-wang
Contributor Author

> > Sweet, general approach LGTM but I'll defer to others for a more thorough review.
> >
> > In addition to the object eviction subscription, is there a potential race between the pinned object location update and the push task reply for the dynamic return objects? IIRC both of those RPCs are async with no synchronization barrier in-between. Maybe this is a non-issue because we always add the pinned object location when processing the push task reply?
> >
> > // NOTE(swang): We need to add the location of the object before marking
> > // it as local in the in-memory store so that the data locality policy
> > // will choose the right raylet for any queued dependent tasks.
> > reference_counter_->UpdateObjectPinnedAtRaylet(object_id, worker_raylet_id);
>
> Hmm I think this part is okay because of what you said: it all happens locally at the task caller when processing the push task reply.
>
> I think there may be a race condition for object directory location updates, though, let me look into this.

Okay, I believe I've resolved this issue by also attaching the generator ID to spill location updates. I don't think it's necessary for in-memory locations (see added comments).

@stephanie-wang stephanie-wang removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Sep 7, 2022
Contributor

@ericl ericl left a comment

Just tried this, very cool! A couple API comments:

  • When you specify num_returns, it seems that you can also yield the exact same number of values, but it is not a generator return. This seems confusing; should we disallow mixing num_returns and generators?
  • Should we define __len__ on the generator return object? It seems not unreasonable to include this, and even if we decide in the future to support streaming generators, we could just raise an error trying to get the length in that case.
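The `__len__` suggestion above could look roughly like the following sketch. `ToyRefGenerator` and `ToyStreamingRefGenerator` are hypothetical classes written for illustration, with plain strings standing in for ObjectRefs; they are not the PR's ObjectRefGenerator.

```python
from typing import Iterator, List, Sequence


class ToyRefGenerator:
    """Hypothetical generator return object holding a known list of refs."""

    def __init__(self, refs: Sequence[str]) -> None:
        self._refs: List[str] = list(refs)

    def __iter__(self) -> Iterator[str]:
        return iter(self._refs)

    def __len__(self) -> int:
        # The task has finished, so the number of refs is already known.
        return len(self._refs)


class ToyStreamingRefGenerator(ToyRefGenerator):
    """Hypothetical future variant where the length is not known up front."""

    def __len__(self) -> int:
        raise TypeError("length is unknown for a streaming generator")


gen = ToyRefGenerator(["ref-0", "ref-1", "ref-2"])
streaming = ToyStreamingRefGenerator(["ref-0"])

try:
    len(streaming)
    streaming_len_error = ""
except TypeError as exc:
    streaming_len_error = str(exc)
```

Raising `TypeError` in the streaming case matches how Python reports objects without a length, so iteration helpers such as `list()` still work on it.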

python/ray/_raylet.pyx (outdated, resolved)
python/ray/tests/test_generators.py (outdated, resolved)
python/ray/_raylet.pyx (outdated, resolved)
@ericl ericl added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Sep 8, 2022
Contributor

@ericl ericl left a comment

We should also add docs to this I think.

x
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
x
Signed-off-by: Stephanie Wang <[email protected]>
x
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
@jjyao
Collaborator

jjyao commented Sep 19, 2022

Ping me when it's ready for more reviews :)

@stephanie-wang
Contributor Author

> Ping me when it's ready for more reviews :)

I think all the comments were addressed already, trying to fix CI now.

@stephanie-wang stephanie-wang added tests-ok The tagger certifies test failures are unrelated and assumes personal liability. core-interface-change-approval-required This changes the Ray core behavior / API and requires broader approvals. and removed @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. labels Sep 19, 2022
Contributor

@pcmoritz pcmoritz left a comment

Thanks, LGTM! Before merging, let's please mark the num_returns="dynamic" API as experimental.

Before we declare it stable, in particular I'm curious why the num_returns="dynamic" option doesn't return a generator of ObjectRefs (from the API perspective that seems more natural for me). Are there implementation limitations for this?

Collaborator

@jjyao jjyao left a comment

My last few comments :)

python/ray/_private/worker.py (outdated, resolved)
python/ray/_private/worker.py (outdated, resolved)
python/ray/_raylet.pyx (outdated, resolved)
python/ray/_raylet.pyx (outdated, resolved)
# number of objects as before.
num_returns = returns[0].size()
else:
# This is the first execution of the task, so we don't know how
Collaborator

This doesn't necessarily mean it's the first execution of the task? It can also mean the generator is empty?

Are we able to catch this case: the first execution returns an empty generator but re-execution returns a non-empty generator?

Contributor Author

Hmm good catch, let me check. We'll probably have to resolve this case as a follow-up.

Contributor Author

Actually it works since we don't reconstruct empty ObjectRefGenerators. Added a test.
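To make the first-execution versus re-execution distinction in this thread concrete, here is a toy model. It assumes that a non-empty list of previously allocated IDs means a re-execution with a fixed count, and that an empty list means the count is unknown; `assign_dynamic_return_ids` and the ID format are hypothetical, and the too-few-objects check is an assumption of this sketch rather than something shown in the diff.

```python
from typing import Any, Iterable, List


def assign_dynamic_return_ids(
    outputs: Iterable[Any], previous_ids: List[str], task_id: str
) -> List[str]:
    """Return one ID per output, reusing IDs from an earlier execution."""
    # -1 mirrors "unknown number of returns" on the first execution.
    num_returns = len(previous_ids) if previous_ids else -1
    ids = list(previous_ids)
    produced = 0
    for i, _value in enumerate(outputs):
        if num_returns >= 0 and i >= num_returns:
            raise ValueError(
                "Task returned more than num_returns={} objects.".format(
                    num_returns))
        if i >= len(ids):
            # First execution: allocate a fresh ID for this output.
            ids.append("{}-dyn-{}".format(task_id, i))
        produced += 1
    if num_returns >= 0 and produced != num_returns:
        # Assumption of this sketch: fewer outputs on re-execution is an error.
        raise ValueError(
            "Task returned {} objects, expected {}.".format(
                produced, num_returns))
    return ids


first = assign_dynamic_return_ids(iter("abc"), [], "t1")
again = assign_dynamic_return_ids(iter("abc"), first, "t1")
empty = assign_dynamic_return_ids(iter(""), [], "t1")

try:
    assign_dynamic_return_ids(iter("abcd"), first, "t1")
    too_many = ""
except ValueError as exc:
    too_many = str(exc)
```

In this model an empty earlier result is indistinguishable from a first execution, which is the ambiguity raised above; per the reply, it does not come up in practice because empty ObjectRefGenerators are not reconstructed.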

raise ValueError(
"Task returned more than num_returns={} objects.".format(
-n_returns))
+num_returns))
while i >= returns[0].size():
Collaborator

What about this? Is it possible that this loop will be executed more than once?

/// \param[in] owner_address Address of the owner of the object who will be contacted by
/// the raylet if the object is pinned. If not provided, defaults to this worker.
/// \return Status.
Status SealExisting(const ObjectID &object_id,
bool pin_object,
const ObjectID &generator_id = ObjectID::Nil(),
Collaborator

How do we decide when to use which style? What's the guideline we should follow in the future?

@stephanie-wang
Contributor Author

stephanie-wang commented Sep 21, 2022

> Thanks, LGTM! Before merging, let's please mark the num_returns="dynamic" API as experimental.
>
> Before we declare it stable, in particular I'm curious why the num_returns="dynamic" option doesn't return a generator of ObjectRefs (from the API perspective that seems more natural for me). Are there implementation limitations for this?

The main reason right now was to avoid complicating ray.wait. Here are the pros/cons for returning a generator directly:

pros:

  • more consistent with the usual semantics of num_returns=x (directly return a generator of refs instead of an ObjectRef containing a generator of refs). this would also make it simpler to swap between code that's using a static vs dynamic num_returns, since the caller code doesn't need to insert an extra ray.get
  • more explicit that the function is returning a generator instead of a normal value

cons:

  • accessing the generator for the first time will implicitly block until we know how many refs to return. might be a gotcha for users.
  • need to extend ray.wait to support ObjectRefGenerators in addition to ObjectRefs, whereas before you could just wait on the returned ObjectRef. i don't think other APIs are affected
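The caller-side difference behind the first pro can be sketched with a toy store. Here `toy_get` stands in for ray.get, and `submit_static` / `submit_dynamic` are hypothetical helpers that only mimic the shape of what the caller receives; none of this is Ray code.

```python
from typing import Any, Dict, List

# Hypothetical in-process store mapping ref strings to values.
toy_store: Dict[str, Any] = {}


def toy_get(ref: str) -> Any:
    """Stand-in for ray.get in this sketch."""
    return toy_store[ref]


def submit_static(values: List[Any]) -> List[str]:
    """Static num_returns: the caller receives the refs directly."""
    refs = []
    for i, value in enumerate(values):
        ref = "static-{}".format(i)
        toy_store[ref] = value
        refs.append(ref)
    return refs


def submit_dynamic(values: List[Any]) -> str:
    """Dynamic returns as in this PR: one ref whose value lists the refs."""
    inner = []
    for i, value in enumerate(values):
        ref = "dynamic-{}".format(i)
        toy_store[ref] = value
        inner.append(ref)
    toy_store["generator-0"] = tuple(inner)
    return "generator-0"


# Static: iterate over the refs right away.
static_values = [toy_get(ref) for ref in submit_static([10, 20, 30])]

# Dynamic: one extra get on the outer ref before iterating.
outer_ref = submit_dynamic([10, 20, 30])
dynamic_values = [toy_get(ref) for ref in toy_get(outer_ref)]
```

The extra `toy_get(outer_ref)` is the cost noted in the pros; in exchange, the caller holds a single ordinary ref that existing wait-style APIs can handle unchanged.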

Signed-off-by: Stephanie Wang <[email protected]>
x
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
@stephanie-wang stephanie-wang merged commit 45d7cd2 into ray-project:master Sep 21, 2022
@stephanie-wang
Contributor Author

All new tests passing.

Labels
core-interface-change-approval-required This changes the Ray core behavior / API and requires broader approvals. tests-ok The tagger certifies test failures are unrelated and assumes personal liability.

6 participants