-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Periodically GC metadata for streaming generators #43772
[core] Periodically GC metadata for streaming generators #43772
Conversation
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
src/ray/core_worker/task_manager.cc
Outdated
@@ -238,7 +238,7 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask( | |||
// The language frontend is responsible for calling DeleteObjectRefStream. | |||
if (spec.IsStreamingGenerator()) { | |||
const auto generator_id = spec.ReturnId(0); | |||
RAY_LOG(DEBUG) << "Create an object ref stream of an id " << generator_id; | |||
RAY_LOG(INFO) << "Create an object ref stream of an id " << generator_id; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this debug -> info intended? we should revert?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it has been reverted.
src/ray/core_worker/task_manager.h
Outdated
@@ -410,7 +423,9 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa | |||
ABSL_LOCKS_EXCLUDED(mu_); | |||
|
|||
/// Return True if there's no more object to read. False otherwise. | |||
bool IsFinished(const ObjectID &generator_id) const ABSL_LOCKS_EXCLUDED(mu_); | |||
bool StreamingGeneratorIsFinished(const ObjectID &generator_id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add docstring for num_objects_generated?
src/ray/core_worker/task_manager.cc
Outdated
for (const auto &return_id_info : reply.streaming_generator_return_ids()) { | ||
if (return_id_info.is_plasma_object()) { | ||
// NOTE(swang): It is possible that the dynamically returned refs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this a separate issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Existing issue that I added a fix for in this PR.
src/ray/core_worker/core_worker.cc
Outdated
continue; | ||
} | ||
|
||
bool can_gc = reference_counter_->CheckGeneratorRefsOutOfScope(generator_id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we remove all unconsumed objects here (and add a test)? This seems like a regression (where unconsumed objects are not deleted if lineage is alive).
E.g., when delete hapapens, all unconsumed objects are deleted (same behavior as now). And only the stream metadata is cleaned up after a delay.
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
assert_no_leak() | ||
|
||
# Test that when the generator task stays in the in-scope lineage, we still | ||
# clean up the unconsumed objects' values. The lineage (task and stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice tests!
@@ -641,6 +641,11 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ | |||
RayConfig::instance().metrics_report_interval_ms() / 2, | |||
"CoreWorker.RecordMetrics"); | |||
|
|||
periodical_runner_.RunFnPeriodically( | |||
[this] { TryDeleteObjectRefStreams(); }, | |||
RayConfig::instance().local_gc_min_interval_s() * 1000, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the wrong value? (it is already a second, but you multiply 1000).
Besides, do you think 10 seconds is good enough? Feel like it needs to be a little more frequent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's to convert to ms?
Should be fine because we trigger it once immediately now.
@@ -57,6 +57,38 @@ absl::flat_hash_set<ObjectID> ObjectRefStream::GetItemsUnconsumed() const { | |||
return result; | |||
} | |||
|
|||
std::vector<ObjectID> ObjectRefStream::PopUnconsumedItems() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it same as GetItemsUnconsumed? If so can we remove other APi?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method pops the items, not just get.
Signed-off-by: Stephanie Wang <[email protected]>
…#43772) Fix lineage reconstruction bug for streaming generators by only garbage-collecting stream metadata once the refs' lineage have gone out of scope. Changes: When generator goes out of scope, add it to a list of streaming generator tasks that we scan periodically For each generator task, check if the task and streaming metadata can be removed. It can be removed if the generator task and all generated return refs have gone out of scope. Fixes an existing potential leak where task completes after the generator ref and returned refs have gone out of scope, by deleting the task metadata with the stream metadata. ray-project#43584 is probably better long-term as it refactors stream metadata to be GCed through the normal ref counting path, and lineage can be GCed eagerly. However, this fix is safer. Related issue number Closes ray-project#39151. --------- Signed-off-by: Stephanie Wang <[email protected]>
Why are these changes needed?
Less invasive fix than #43584. Changes:
#43584 is probably better long-term as it refactors stream metadata to be GCed through the normal ref counting path, and lineage can be GCed eagerly. However, this fix is safer.
Related issue number
Closes #39151.
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.