Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][c++ worker]Add namespace support for c++ worker #26327

Merged
merged 8 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add getActor(name, namespace) api
  • Loading branch information
WangTaoTheTonic committed Jul 7, 2022
commit 9e4ca3b7f41d247a673fc0b69a208c2328466426
20 changes: 19 additions & 1 deletion cpp/include/ray/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ ray::internal::ActorCreator<JavaActorClass> Actor(JavaActorClass func);
template <typename T>
boost::optional<ActorHandle<T>> GetActor(const std::string &actor_name);

/// Get a handle to a named actor in the given namespace.
/// Gets a handle to a named actor with the given name of the given namespace. The actor
/// must have been created with name specified.
WangTaoTheTonic marked this conversation as resolved.
Show resolved Hide resolved
///
/// \param[in] actor_name The name of the named actor.
/// \param[in] namespace The namespace of the actor.
/// \return An ActorHandle to the actor if the actor of specified name exists in
/// specifiled namespace or an empty optional object.
template <typename T>
boost::optional<ActorHandle<T>> GetActor(const std::string &actor_name,
const std::string &ray_namespace);

/// Intentionally exit the current actor.
/// It is used to disconnect an actor and exit the worker.
/// \Throws RayException if the current process is a driver or the current worker is not
Expand Down Expand Up @@ -271,11 +283,17 @@ inline ray::internal::ActorCreator<F> Actor(F create_func) {

template <typename T>
boost::optional<ActorHandle<T>> GetActor(const std::string &actor_name) {
return GetActor<T>(actor_name, "");
}

template <typename T>
boost::optional<ActorHandle<T>> GetActor(const std::string &actor_name,
const std::string &ray_namespace) {
if (actor_name.empty()) {
return {};
}

auto actor_id = ray::internal::GetRayRuntime()->GetActorId(actor_name);
auto actor_id = ray::internal::GetRayRuntime()->GetActorId(actor_name, ray_namespace);
if (actor_id.empty()) {
return {};
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/include/ray/api/ray_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class RayRuntime {
const CallOptions &call_options) = 0;
virtual void AddLocalReference(const std::string &id) = 0;
virtual void RemoveLocalReference(const std::string &id) = 0;
virtual std::string GetActorId(const std::string &actor_name) = 0;
virtual std::string GetActorId(const std::string &actor_name,
const std::string &ray_namespace) = 0;
virtual void KillActor(const std::string &str_actor_id, bool no_restart) = 0;
virtual void ExitActor() = 0;
virtual ray::PlacementGroup CreatePlacementGroup(
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/ray/runtime/abstract_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,9 @@ void AbstractRayRuntime::RemoveLocalReference(const std::string &id) {
}
}

std::string AbstractRayRuntime::GetActorId(const std::string &actor_name) {
auto actor_id = task_submitter_->GetActor(actor_name);
std::string AbstractRayRuntime::GetActorId(const std::string &actor_name,
const std::string &ray_namespace) {
auto actor_id = task_submitter_->GetActor(actor_name, ray_namespace);
if (actor_id.IsNil()) {
return "";
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/abstract_ray_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class AbstractRayRuntime : public RayRuntime {

void RemoveLocalReference(const std::string &id);

std::string GetActorId(const std::string &actor_name);
std::string GetActorId(const std::string &actor_name, const std::string &ray_namespace);

void KillActor(const std::string &str_actor_id, bool no_restart);

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/task/local_mode_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ ObjectID LocalModeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation,
return Submit(invocation, {});
}

ActorID LocalModeTaskSubmitter::GetActor(const std::string &actor_name) const {
ActorID LocalModeTaskSubmitter::GetActor(const std::string &actor_name,
const std::string &ray_namespace) const {
absl::MutexLock lock(&named_actors_mutex_);
auto it = named_actors_.find(actor_name);
if (it == named_actors_.end()) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/task/local_mode_task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class LocalModeTaskSubmitter : public TaskSubmitter {

ObjectID SubmitActorTask(InvocationSpec &invocation, const CallOptions &call_options);

ActorID GetActor(const std::string &actor_name) const;
ActorID GetActor(const std::string &actor_name, const std::string &ray_namespace) const;

ray::PlacementGroup CreatePlacementGroup(
const ray::PlacementGroupCreationOptions &create_options);
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/ray/runtime/task/native_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,12 @@ ObjectID NativeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation,
return Submit(invocation, task_options);
}

ActorID NativeTaskSubmitter::GetActor(const std::string &actor_name) const {
ActorID NativeTaskSubmitter::GetActor(const std::string &actor_name,
const std::string &ray_namespace) const {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
auto pair = core_worker.GetNamedActorHandle(actor_name, "");
const std::string ns =
ray_namespace.empty() ? core_worker.GetJobConfig().ray_namespace() : ray_namespace;
auto pair = core_worker.GetNamedActorHandle(actor_name, ns);
if (!pair.second.ok()) {
RAY_LOG(WARNING) << pair.second.message();
return ActorID::Nil();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/task/native_task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class NativeTaskSubmitter : public TaskSubmitter {

ObjectID SubmitActorTask(InvocationSpec &invocation, const CallOptions &call_options);

ActorID GetActor(const std::string &actor_name) const;
ActorID GetActor(const std::string &actor_name, const std::string &ray_namespace) const;

ray::PlacementGroup CreatePlacementGroup(
const ray::PlacementGroupCreationOptions &create_options);
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/task/task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class TaskSubmitter {
virtual ObjectID SubmitActorTask(InvocationSpec &invocation,
const CallOptions &call_options) = 0;

virtual ActorID GetActor(const std::string &actor_name) const = 0;
virtual ActorID GetActor(const std::string &actor_name,
const std::string &ray_namespace) const = 0;

virtual ray::PlacementGroup CreatePlacementGroup(
const ray::PlacementGroupCreationOptions &create_options) = 0;
Expand Down
14 changes: 11 additions & 3 deletions cpp/src/ray/test/cluster/cluster_mode_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -480,14 +480,22 @@ TEST(RayClusterModeTest, TaskWithPlacementGroup) {
}

TEST(RayClusterModeTest, NamespaceTest) {
// Create a named actor in namespace `isolated_ns`.
std::string actor_name = "named_actor";
ray::ActorHandle<Counter> actor = ray::Actor(RAY_FUNC(Counter::FactoryCreate))
.SetName(actor_name, "isolated_ns")
.Remote();
std::string ns_name = "isolated_ns";
ray::ActorHandle<Counter> actor =
ray::Actor(RAY_FUNC(Counter::FactoryCreate)).SetName(actor_name, ns_name).Remote();
auto initialized_obj = actor.Task(&Counter::Initialized).Remote();
EXPECT_TRUE(*initialized_obj.Get());
// It is invisible to job default namespace.
auto actor_optional = ray::GetActor<Counter>(actor_name);
EXPECT_TRUE(!actor_optional);
// It is visible to the namespace it belongs.
actor_optional = ray::GetActor<Counter>(actor_name, ns_name);
EXPECT_TRUE(actor_optional);
// It is invisible to any other namespaces.
actor_optional = ray::GetActor<Counter>(actor_name, "other_ns");
EXPECT_TRUE(!actor_optional);
}

int main(int argc, char **argv) {
Expand Down