diff --git a/cpp/include/ray/api.h b/cpp/include/ray/api.h index e7ac0ffad792f..9261563c1fa90 100644 --- a/cpp/include/ray/api.h +++ b/cpp/include/ray/api.h @@ -111,8 +111,7 @@ ray::internal::ActorCreator Actor(PyActorClass func); ray::internal::ActorCreator Actor(JavaActorClass func); /// Get a handle to a named actor in current namespace. -/// Gets a handle to a named actor with the given name. The actor must have been created -/// with name specified. +/// The actor must have been created with name specified. /// /// \param[in] actor_name The name of the named actor. /// \return An ActorHandle to the actor if the actor of specified name exists or an @@ -120,6 +119,17 @@ ray::internal::ActorCreator Actor(JavaActorClass func); template boost::optional> GetActor(const std::string &actor_name); +/// Get a handle to a named actor in the given namespace. +/// The actor must have been created with name specified. +/// +/// \param[in] actor_name The name of the named actor. +/// \param[in] namespace The namespace of the actor. +/// \return An ActorHandle to the actor if the actor of specified name exists in +/// specifiled namespace or an empty optional object. +template +boost::optional> GetActor(const std::string &actor_name, + const std::string &ray_namespace); + /// Intentionally exit the current actor. /// It is used to disconnect an actor and exit the worker. /// \Throws RayException if the current process is a driver or the current worker is not @@ -271,11 +281,17 @@ inline ray::internal::ActorCreator Actor(F create_func) { template boost::optional> GetActor(const std::string &actor_name) { + return GetActor(actor_name, ""); +} + +template +boost::optional> GetActor(const std::string &actor_name, + const std::string &ray_namespace) { if (actor_name.empty()) { return {}; } - auto actor_id = ray::internal::GetRayRuntime()->GetActorId(actor_name); + auto actor_id = ray::internal::GetRayRuntime()->GetActorId(actor_name, ray_namespace); if (actor_id.empty()) { return {}; } diff --git a/cpp/include/ray/api/actor_creator.h b/cpp/include/ray/api/actor_creator.h index 89742c345de39..feecfd20f40e3 100644 --- a/cpp/include/ray/api/actor_creator.h +++ b/cpp/include/ray/api/actor_creator.h @@ -39,6 +39,12 @@ class ActorCreator { return *this; } + ActorCreator &SetName(std::string name, std::string ray_namespace) { + create_options_.name = std::move(name); + create_options_.ray_namespace = std::move(ray_namespace); + return *this; + } + ActorCreator &SetResources(std::unordered_map resources) { create_options_.resources = std::move(resources); return *this; diff --git a/cpp/include/ray/api/ray_config.h b/cpp/include/ray/api/ray_config.h index ea187a65de3f4..5286b99c303f9 100644 --- a/cpp/include/ray/api/ray_config.h +++ b/cpp/include/ray/api/ray_config.h @@ -59,6 +59,9 @@ class RayConfig { // A specific flag for internal `default_worker`. Please don't use it in user code. bool is_worker_ = false; + + // A namespace is a logical grouping of jobs and named actors. + std::string ray_namespace = ""; }; } // namespace ray diff --git a/cpp/include/ray/api/ray_runtime.h b/cpp/include/ray/api/ray_runtime.h index ad29e792845e2..59f8ea6fbefce 100644 --- a/cpp/include/ray/api/ray_runtime.h +++ b/cpp/include/ray/api/ray_runtime.h @@ -78,7 +78,8 @@ class RayRuntime { const CallOptions &call_options) = 0; virtual void AddLocalReference(const std::string &id) = 0; virtual void RemoveLocalReference(const std::string &id) = 0; - virtual std::string GetActorId(const std::string &actor_name) = 0; + virtual std::string GetActorId(const std::string &actor_name, + const std::string &ray_namespace) = 0; virtual void KillActor(const std::string &str_actor_id, bool no_restart) = 0; virtual void ExitActor() = 0; virtual ray::PlacementGroup CreatePlacementGroup( diff --git a/cpp/include/ray/api/task_options.h b/cpp/include/ray/api/task_options.h index 5bae8abcaad00..8bdbf8327f5c0 100644 --- a/cpp/include/ray/api/task_options.h +++ b/cpp/include/ray/api/task_options.h @@ -99,6 +99,7 @@ struct CallOptions { struct ActorCreationOptions { std::string name; + std::string ray_namespace; std::unordered_map resources; int max_restarts = 0; int max_concurrency = 1; diff --git a/cpp/src/ray/config_internal.cc b/cpp/src/ray/config_internal.cc index de43092c0cfd6..8a8f3e9703e95 100644 --- a/cpp/src/ray/config_internal.cc +++ b/cpp/src/ray/config_internal.cc @@ -186,6 +186,10 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) { code_search_path = absolute_path; } } + if (worker_type == WorkerType::DRIVER) { + ray_namespace = + config.ray_namespace.empty() ? GenerateUUIDV4() : config.ray_namespace; + } }; void ConfigInternal::SetBootstrapAddress(std::string_view address) { diff --git a/cpp/src/ray/config_internal.h b/cpp/src/ray/config_internal.h index 8e24a72533762..d3c15be90b3c1 100644 --- a/cpp/src/ray/config_internal.h +++ b/cpp/src/ray/config_internal.h @@ -64,6 +64,8 @@ class ConfigInternal { rpc::JobConfig_ActorLifetime default_actor_lifetime = rpc::JobConfig_ActorLifetime_NON_DETACHED; + std::string ray_namespace = ""; + static ConfigInternal &Instance() { static ConfigInternal config; return config; diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index 7596c79dbfabf..73af3afefb9ca 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -242,8 +242,9 @@ void AbstractRayRuntime::RemoveLocalReference(const std::string &id) { } } -std::string AbstractRayRuntime::GetActorId(const std::string &actor_name) { - auto actor_id = task_submitter_->GetActor(actor_name); +std::string AbstractRayRuntime::GetActorId(const std::string &actor_name, + const std::string &ray_namespace) { + auto actor_id = task_submitter_->GetActor(actor_name, ray_namespace); if (actor_id.IsNil()) { return ""; } @@ -350,7 +351,9 @@ PlacementGroup AbstractRayRuntime::GetPlacementGroupById(const std::string &id) } PlacementGroup AbstractRayRuntime::GetPlacementGroup(const std::string &name) { - auto str_ptr = global_state_accessor_->GetPlacementGroupByName(name, ""); + // TODO(WangTaoTheTonic): Add namespace support for placement group. + auto str_ptr = global_state_accessor_->GetPlacementGroupByName( + name, CoreWorkerProcess::GetCoreWorker().GetJobConfig().ray_namespace()); if (str_ptr == nullptr) { return {}; } diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.h b/cpp/src/ray/runtime/abstract_ray_runtime.h index 12b061fdc7324..63e0d5fa75735 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.h +++ b/cpp/src/ray/runtime/abstract_ray_runtime.h @@ -75,7 +75,7 @@ class AbstractRayRuntime : public RayRuntime { void RemoveLocalReference(const std::string &id); - std::string GetActorId(const std::string &actor_name); + std::string GetActorId(const std::string &actor_name, const std::string &ray_namespace); void KillActor(const std::string &str_actor_id, bool no_restart); diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index 5e3718d17e1ba..c62765d53b10b 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -145,7 +145,8 @@ ObjectID LocalModeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation, return Submit(invocation, {}); } -ActorID LocalModeTaskSubmitter::GetActor(const std::string &actor_name) const { +ActorID LocalModeTaskSubmitter::GetActor(const std::string &actor_name, + const std::string &ray_namespace) const { absl::MutexLock lock(&named_actors_mutex_); auto it = named_actors_.find(actor_name); if (it == named_actors_.end()) { diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.h b/cpp/src/ray/runtime/task/local_mode_task_submitter.h index d79b76ea67a16..010a9d9c2369e 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.h +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.h @@ -38,7 +38,7 @@ class LocalModeTaskSubmitter : public TaskSubmitter { ObjectID SubmitActorTask(InvocationSpec &invocation, const CallOptions &call_options); - ActorID GetActor(const std::string &actor_name) const; + ActorID GetActor(const std::string &actor_name, const std::string &ray_namespace) const; ray::PlacementGroup CreatePlacementGroup( const ray::PlacementGroupCreationOptions &create_options); diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc index eb5d7632c3176..41521b896e10c 100644 --- a/cpp/src/ray/runtime/task/native_task_submitter.cc +++ b/cpp/src/ray/runtime/task/native_task_submitter.cc @@ -108,7 +108,7 @@ ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation, auto &core_worker = CoreWorkerProcess::GetCoreWorker(); std::unordered_map resources; std::string name = create_options.name; - std::string ray_namespace = ""; + std::string ray_namespace = create_options.ray_namespace; BundleID bundle_id = GetBundleID(create_options); rpc::SchedulingStrategy scheduling_strategy; scheduling_strategy.mutable_default_scheduling_strategy(); @@ -145,9 +145,12 @@ ObjectID NativeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation, return Submit(invocation, task_options); } -ActorID NativeTaskSubmitter::GetActor(const std::string &actor_name) const { +ActorID NativeTaskSubmitter::GetActor(const std::string &actor_name, + const std::string &ray_namespace) const { auto &core_worker = CoreWorkerProcess::GetCoreWorker(); - auto pair = core_worker.GetNamedActorHandle(actor_name, ""); + const std::string ns = + ray_namespace.empty() ? core_worker.GetJobConfig().ray_namespace() : ray_namespace; + auto pair = core_worker.GetNamedActorHandle(actor_name, ns); if (!pair.second.ok()) { RAY_LOG(WARNING) << pair.second.message(); return ActorID::Nil(); diff --git a/cpp/src/ray/runtime/task/native_task_submitter.h b/cpp/src/ray/runtime/task/native_task_submitter.h index 5beb5bbe54f40..6b71e6b329932 100644 --- a/cpp/src/ray/runtime/task/native_task_submitter.h +++ b/cpp/src/ray/runtime/task/native_task_submitter.h @@ -30,7 +30,7 @@ class NativeTaskSubmitter : public TaskSubmitter { ObjectID SubmitActorTask(InvocationSpec &invocation, const CallOptions &call_options); - ActorID GetActor(const std::string &actor_name) const; + ActorID GetActor(const std::string &actor_name, const std::string &ray_namespace) const; ray::PlacementGroup CreatePlacementGroup( const ray::PlacementGroupCreationOptions &create_options); diff --git a/cpp/src/ray/runtime/task/task_submitter.h b/cpp/src/ray/runtime/task/task_submitter.h index 567140886b766..5694aedc5cab1 100644 --- a/cpp/src/ray/runtime/task/task_submitter.h +++ b/cpp/src/ray/runtime/task/task_submitter.h @@ -38,7 +38,8 @@ class TaskSubmitter { virtual ObjectID SubmitActorTask(InvocationSpec &invocation, const CallOptions &call_options) = 0; - virtual ActorID GetActor(const std::string &actor_name) const = 0; + virtual ActorID GetActor(const std::string &actor_name, + const std::string &ray_namespace) const = 0; virtual ray::PlacementGroup CreatePlacementGroup( const ray::PlacementGroupCreationOptions &create_options) = 0; diff --git a/cpp/src/ray/test/cluster/cluster_mode_test.cc b/cpp/src/ray/test/cluster/cluster_mode_test.cc index 7fec0be0cae3b..a696549b3984d 100644 --- a/cpp/src/ray/test/cluster/cluster_mode_test.cc +++ b/cpp/src/ray/test/cluster/cluster_mode_test.cc @@ -479,6 +479,41 @@ TEST(RayClusterModeTest, TaskWithPlacementGroup) { ray::RemovePlacementGroup(placement_group.GetID()); } +TEST(RayClusterModeTest, NamespaceTest) { + // Create a named actor in namespace `isolated_ns`. + std::string actor_name_in_isolated_ns = "named_actor_in_isolated_ns"; + std::string isolated_ns_name = "isolated_ns"; + ray::ActorHandle actor = + ray::Actor(RAY_FUNC(Counter::FactoryCreate)) + .SetName(actor_name_in_isolated_ns, isolated_ns_name) + .Remote(); + auto initialized_obj = actor.Task(&Counter::Initialized).Remote(); + EXPECT_TRUE(*initialized_obj.Get()); + // It is invisible to job default namespace. + auto actor_optional = ray::GetActor(actor_name_in_isolated_ns); + EXPECT_TRUE(!actor_optional); + // It is visible to the namespace it belongs. + actor_optional = ray::GetActor(actor_name_in_isolated_ns, isolated_ns_name); + EXPECT_TRUE(actor_optional); + // It is invisible to any other namespaces. + actor_optional = ray::GetActor(actor_name_in_isolated_ns, "other_ns"); + EXPECT_TRUE(!actor_optional); + + // Create a named actor in job default namespace. + std::string actor_name_in_default_ns = "actor_name_in_default_ns"; + actor = ray::Actor(RAY_FUNC(Counter::FactoryCreate)) + .SetName(actor_name_in_default_ns) + .Remote(); + initialized_obj = actor.Task(&Counter::Initialized).Remote(); + EXPECT_TRUE(*initialized_obj.Get()); + // It is visible to job default namespace. + actor_optional = ray::GetActor(actor_name_in_default_ns); + EXPECT_TRUE(actor_optional); + // It is invisible to any other namespaces. + actor_optional = ray::GetActor(actor_name_in_default_ns, isolated_ns_name); + EXPECT_TRUE(!actor_optional); +} + int main(int argc, char **argv) { absl::ParseCommandLine(argc, argv); cmd_argc = argc; diff --git a/cpp/src/ray/util/process_helper.cc b/cpp/src/ray/util/process_helper.cc index ad3d34b966dcb..d684c67b4ba9d 100644 --- a/cpp/src/ray/util/process_helper.cc +++ b/cpp/src/ray/util/process_helper.cc @@ -151,6 +151,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback) for (const auto &path : ConfigInternal::Instance().code_search_path) { job_config.add_code_search_path(path); } + job_config.set_ray_namespace(ConfigInternal::Instance().ray_namespace); std::string serialized_job_config; RAY_CHECK(job_config.SerializeToString(&serialized_job_config)); options.serialized_job_config = serialized_job_config; diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 97977d280799d..d968d9c1436cf 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1107,7 +1107,7 @@ def init( is true. log_to_driver: If true, the output from all of the worker processes on all nodes will be directed to the driver. - namespace: Namespace to use + namespace: A namespace is a logical grouping of jobs and named actors. runtime_env: The runtime environment to use for this job (see :ref:`runtime-environments` for details). storage: [Experimental] Specify a URI for persistent cluster-wide storage. diff --git a/src/ray/util/util.h b/src/ray/util/util.h index 7cd8650c19756..78c534e62d857 100644 --- a/src/ray/util/util.h +++ b/src/ray/util/util.h @@ -117,6 +117,38 @@ inline int64_t current_sys_time_us() { return mu_since_epoch.count(); } +inline std::string GenerateUUIDV4() { + static std::random_device rd; + static std::mt19937 gen(rd()); + static std::uniform_int_distribution<> dis(0, 15); + static std::uniform_int_distribution<> dis2(8, 11); + + std::stringstream ss; + int i; + ss << std::hex; + for (i = 0; i < 8; i++) { + ss << dis(gen); + } + ss << "-"; + for (i = 0; i < 4; i++) { + ss << dis(gen); + } + ss << "-4"; + for (i = 0; i < 3; i++) { + ss << dis(gen); + } + ss << "-"; + ss << dis2(gen); + for (i = 0; i < 3; i++) { + ss << dis(gen); + } + ss << "-"; + for (i = 0; i < 12; i++) { + ss << dis(gen); + }; + return ss.str(); +} + /// A helper function to parse command-line arguments in a platform-compatible manner. /// /// \param cmdline The command-line to split.