[core] Support generators to allow tasks to return a dynamic number of objects #28291
Signed-off-by: Stephanie Wang <[email protected]>
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -860,15 +860,10 @@ cdef execute_task( | |
worker, errors, | ||
returns) | ||
if dynamic_returns != NULL: | ||
# We generated dynamic objects during the first execution and | ||
# we are now re-executing the task during object | ||
# reconstruction. Store the error for the dynamically generated | ||
# objects too. | ||
# Store errors for any dynamically generated objects too. | ||
dynamic_errors = [] | ||
for _ in range(dynamic_returns[0].size()): | ||
dynamic_errors.append(failure_object) | ||
# We pass is_dynamic=False because we have a fixed number of | ||
# return objects to populate. | ||
core_worker.store_task_outputs( | ||
worker, dynamic_errors, | ||
dynamic_returns) | ||
|
@@ -2184,6 +2179,10 @@ cdef class CoreWorker: | |
else: | ||
# This is the first execution of the task, so we don't know how | ||
Review comment: This doesn't necessarily mean it's the first execution of the task? It can also mean the generator is empty? Are we able to catch this case: the first execution returns an empty generator but re-execution returns a non-empty generator.
Reply: Hmm good catch, let me check. We'll probably have to resolve this case as a follow-up.
Reply: Actually it works since we don't reconstruct empty ObjectRefGenerators. Added a test. |
||
# many return objects it should have yet. | ||
# NOTE(swang): returns could also be empty if the task returned | ||
# an empty generator and was re-executed. However, this should | ||
# not happen because we never reconstruct empty | ||
# ObjectRefGenerators (since these are not stored in plasma). | ||
num_returns = -1 | ||
else: | ||
# The task specified how many return values it should have. | ||
|
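The two hunks above can be modeled without Ray. What follows is a minimal plain-Python sketch, not the Cython in the PR: `fan_out_errors` and `resolve_num_returns` are hypothetical names, return IDs are represented as plain strings, and the branch that reuses an already-known count on re-execution is an assumption, since the diff only shows the two `else` branches.

```python
from typing import Any, List, Sequence

# Sentinel the diff uses when the number of return objects is not known yet.
UNKNOWN_NUM_RETURNS = -1


def fan_out_errors(failure_object: Any, dynamic_return_ids: Sequence[str]) -> List[Any]:
    """Build one error entry per dynamically generated return ID."""
    return [failure_object for _ in dynamic_return_ids]


def resolve_num_returns(is_dynamic: bool, known_return_ids: Sequence[str]) -> int:
    """Decide how many return objects to populate (hypothetical model)."""
    if is_dynamic:
        if len(known_return_ids) > 0:
            # Assumed branch: the IDs were recorded by an earlier execution,
            # so a re-execution already knows how many objects to fill in.
            return len(known_return_ids)
        # No IDs recorded. Per the NOTE in the diff, this is treated as the
        # first execution, because empty generators are never reconstructed.
        return UNKNOWN_NUM_RETURNS
    # The task declared a fixed number of return values up front.
    return len(known_return_ids)
```

In this model an empty ID list is ambiguous on its own (first run, or an empty generator), which is the case the reviewer raised; the sketch resolves it the same way the NOTE does, by relying on empty generators never being re-executed.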
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -316,6 +316,61 @@ def fetch(x): | |
# ray.get(ref) | ||
|
||
|
||
Review comment: Can the generator itself be passed around as a task argument? Can generator_obj_ref be passed around like other object refs?
Reply: Yes, and yes. I'll add test cases for these. |
||
def test_dynamic_empty_generator_reconstruction_nondeterministic(ray_start_cluster): | ||
config = { | ||
"num_heartbeats_timeout": 10, | ||
"raylet_heartbeat_period_milliseconds": 100, | ||
"max_direct_call_object_size": 100, | ||
"task_retry_delay_ms": 100, | ||
"object_timeout_milliseconds": 200, | ||
"fetch_warn_timeout_milliseconds": 1000, | ||
} | ||
cluster = ray_start_cluster | ||
# Head node with no resources. | ||
cluster.add_node( | ||
num_cpus=0, | ||
_system_config=config, | ||
enable_object_reconstruction=True, | ||
resources={"head": 1}, | ||
) | ||
ray.init(address=cluster.address) | ||
# Node to place the initial object. | ||
node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10 ** 8) | ||
cluster.wait_for_nodes() | ||
|
||
@ray.remote(num_cpus=0, resources={"head": 1}) | ||
class ExecutionCounter: | ||
def __init__(self): | ||
self.count = 0 | ||
|
||
def inc(self): | ||
self.count += 1 | ||
return self.count | ||
|
||
def get_count(self): | ||
return self.count | ||
|
||
@ray.remote(num_returns="dynamic") | ||
def maybe_empty_generator(exec_counter): | ||
if ray.get(exec_counter.inc.remote()) > 1: | ||
for i in range(3): | ||
yield np.ones(1_000_000, dtype=np.int8) * i | ||
|
||
@ray.remote | ||
def check(empty_generator): | ||
return len(empty_generator) == 0 | ||
|
||
exec_counter = ExecutionCounter.remote() | ||
gen = maybe_empty_generator.remote(exec_counter) | ||
assert ray.get(check.remote(gen)) | ||
cluster.remove_node(node_to_kill, allow_graceful=False) | ||
node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10 ** 8) | ||
assert ray.get(check.remote(gen)) | ||
|
||
# We should never reconstruct an empty generator. | ||
assert ray.get(exec_counter.get_count.remote()) == 1 | |
|
||
|
||
if __name__ == "__main__": | ||
import os | ||
|
||
|
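The non-determinism that the new test guards against can be seen in plain Python, without a cluster. This is only an illustrative sketch under simplifying assumptions: the `ExecutionCounter` class here is an ordinary object standing in for the actor, small lists stand in for the NumPy arrays, and calling the generator function twice stands in for a hypothetical re-execution that the test asserts never happens.

```python
from typing import Iterator, List


class ExecutionCounter:
    """Plain-Python stand-in for the counter actor used in the test."""

    def __init__(self) -> None:
        self.count = 0

    def inc(self) -> int:
        self.count += 1
        return self.count


def maybe_empty_generator(counter: ExecutionCounter) -> Iterator[List[int]]:
    # Yields nothing on the first execution and three items afterwards.
    if counter.inc() > 1:
        for i in range(3):
            yield [i] * 4


counter = ExecutionCounter()
first_run = list(maybe_empty_generator(counter))   # first execution: empty
second_run = list(maybe_empty_generator(counter))  # hypothetical re-execution
```

If the task were re-executed, the two runs would disagree on the number of returned objects, which is why the test checks that the execution count stays at 1 after the node is killed.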
Review comment: I'm a bit confused. Does `dynamic_returns == NULL` mean that we are doing reconstruction? From line 815, it seems not.
Reply: It means that the caller set `num_returns="dynamic"`.