Skip to content

Commit

Permalink
[Core] Guarantee the ordering of put ActorTaskSpecTable and ActorTable (
Browse files Browse the repository at this point in the history
ray-project#35683)

In order to guarantee that we put ActorTaskSpecTable before ActorTable, we should put ActorTable inside the ActorTaskSpecTable put callback. Otherwise, Redis may receive ActorTable put before ActorTaskSpecTable put. If we crash in the middle, we may end up with actor data inside ActorTable but not ActorTaskSpecTable.

Signed-off-by: Jiajun Yao <[email protected]>
  • Loading branch information
jjyao committed May 24, 2023
1 parent 10f2d7d commit f035000
Showing 1 changed file with 34 additions and 31 deletions.
65 changes: 34 additions & 31 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -591,38 +591,41 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ

// The backend storage is supposed to be reliable, so the status must be ok.
RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Put(
actor_id, request.task_spec(), [](const Status &status) {}));
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor->GetActorID(),
*actor->GetMutableActorTableData(),
[this, actor](const Status &status) {
// The backend storage is supposed to be reliable, so the status must be ok.
RAY_CHECK_OK(status);
// If a creator dies before this callback is called, the actor could have been
// already destroyed. It is okay not to invoke a callback because we don't need
// to reply to the creator as it is already dead.
auto registered_actor_it = registered_actors_.find(actor->GetActorID());
if (registered_actor_it == registered_actors_.end()) {
// NOTE(sang): This logic assumes that the ordering of backend call is
// guaranteed. It is currently true because we use a single TCP socket to call
// the default Redis backend. If ordering is not guaranteed, we should overwrite
// the actor state to DEAD to avoid race condition.
return;
}
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor->GetActorID(), actor->GetActorTableData(), nullptr));
// Invoke all callbacks for all registration requests of this actor (duplicated
// requests are included) and remove all of them from
// actor_to_register_callbacks_.
// Reply to the owner to indicate that the actor has been registered.
auto iter = actor_to_register_callbacks_.find(actor->GetActorID());
RAY_CHECK(iter != actor_to_register_callbacks_.end() && !iter->second.empty());
auto callbacks = std::move(iter->second);
actor_to_register_callbacks_.erase(iter);
for (auto &callback : callbacks) {
callback(actor);
}
actor_id, request.task_spec(), [this, actor](const Status &status) {
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor->GetActorID(),
*actor->GetMutableActorTableData(),
[this, actor](const Status &status) {
// The backend storage is supposed to be reliable, so the status must be ok.
RAY_CHECK_OK(status);
// If a creator dies before this callback is called, the actor could have
// been already destroyed. It is okay not to invoke a callback because we
// don't need to reply to the creator as it is already dead.
auto registered_actor_it = registered_actors_.find(actor->GetActorID());
if (registered_actor_it == registered_actors_.end()) {
// NOTE(sang): This logic assumes that the ordering of backend call is
// guaranteed. It is currently true because we use a single TCP socket to
// call the default Redis backend. If ordering is not guaranteed, we
// should overwrite the actor state to DEAD to avoid race condition.
return;
}
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor->GetActorID(), actor->GetActorTableData(), nullptr));
// Invoke all callbacks for all registration requests of this actor
// (duplicated requests are included) and remove all of them from
// actor_to_register_callbacks_.
// Reply to the owner to indicate that the actor has been registered.
auto iter = actor_to_register_callbacks_.find(actor->GetActorID());
RAY_CHECK(iter != actor_to_register_callbacks_.end() &&
!iter->second.empty());
auto callbacks = std::move(iter->second);
actor_to_register_callbacks_.erase(iter);
for (auto &callback : callbacks) {
callback(actor);
}
}));
}));

return Status::OK();
}

Expand Down

0 comments on commit f035000

Please sign in to comment.