Skip to content

Commit

Permalink
[core][c++ worker]Add namespace support for c++ worker (ray-project#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
WangTaoTheTonic committed Jul 12, 2022
1 parent be6e4c6 commit 1de0d35
Show file tree
Hide file tree
Showing 18 changed files with 125 additions and 16 deletions.
22 changes: 19 additions & 3 deletions cpp/include/ray/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,25 @@ ray::internal::ActorCreator<PyActorClass> Actor(PyActorClass func);
ray::internal::ActorCreator<JavaActorClass> Actor(JavaActorClass func);

/// Get a handle to a named actor in current namespace.
/// Gets a handle to a named actor with the given name. The actor must have been created
/// with name specified.
/// The actor must have been created with name specified.
///
/// \param[in] actor_name The name of the named actor.
/// \return An ActorHandle to the actor if the actor of specified name exists or an
/// empty optional object.
template <typename T>
boost::optional<ActorHandle<T>> GetActor(const std::string &actor_name);

/// Get a handle to a named actor in the given namespace.
/// The actor must have been created with name specified.
///
/// \param[in] actor_name The name of the named actor.
/// \param[in] namespace The namespace of the actor.
/// \return An ActorHandle to the actor if the actor of specified name exists in
/// specifiled namespace or an empty optional object.
template <typename T>
boost::optional<ActorHandle<T>> GetActor(const std::string &actor_name,
const std::string &ray_namespace);

/// Intentionally exit the current actor.
/// It is used to disconnect an actor and exit the worker.
/// \Throws RayException if the current process is a driver or the current worker is not
Expand Down Expand Up @@ -271,11 +281,17 @@ inline ray::internal::ActorCreator<F> Actor(F create_func) {

template <typename T>
boost::optional<ActorHandle<T>> GetActor(const std::string &actor_name) {
return GetActor<T>(actor_name, "");
}

template <typename T>
boost::optional<ActorHandle<T>> GetActor(const std::string &actor_name,
const std::string &ray_namespace) {
if (actor_name.empty()) {
return {};
}

auto actor_id = ray::internal::GetRayRuntime()->GetActorId(actor_name);
auto actor_id = ray::internal::GetRayRuntime()->GetActorId(actor_name, ray_namespace);
if (actor_id.empty()) {
return {};
}
Expand Down
6 changes: 6 additions & 0 deletions cpp/include/ray/api/actor_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class ActorCreator {
return *this;
}

ActorCreator &SetName(std::string name, std::string ray_namespace) {
create_options_.name = std::move(name);
create_options_.ray_namespace = std::move(ray_namespace);
return *this;
}

ActorCreator &SetResources(std::unordered_map<std::string, double> resources) {
create_options_.resources = std::move(resources);
return *this;
Expand Down
3 changes: 3 additions & 0 deletions cpp/include/ray/api/ray_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class RayConfig {

// A specific flag for internal `default_worker`. Please don't use it in user code.
bool is_worker_ = false;

// A namespace is a logical grouping of jobs and named actors.
std::string ray_namespace = "";
};

} // namespace ray
3 changes: 2 additions & 1 deletion cpp/include/ray/api/ray_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class RayRuntime {
const CallOptions &call_options) = 0;
virtual void AddLocalReference(const std::string &id) = 0;
virtual void RemoveLocalReference(const std::string &id) = 0;
virtual std::string GetActorId(const std::string &actor_name) = 0;
virtual std::string GetActorId(const std::string &actor_name,
const std::string &ray_namespace) = 0;
virtual void KillActor(const std::string &str_actor_id, bool no_restart) = 0;
virtual void ExitActor() = 0;
virtual ray::PlacementGroup CreatePlacementGroup(
Expand Down
1 change: 1 addition & 0 deletions cpp/include/ray/api/task_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ struct CallOptions {

struct ActorCreationOptions {
std::string name;
std::string ray_namespace;
std::unordered_map<std::string, double> resources;
int max_restarts = 0;
int max_concurrency = 1;
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/ray/config_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) {
code_search_path = absolute_path;
}
}
if (worker_type == WorkerType::DRIVER) {
ray_namespace =
config.ray_namespace.empty() ? GenerateUUIDV4() : config.ray_namespace;
}
};

void ConfigInternal::SetBootstrapAddress(std::string_view address) {
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/ray/config_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class ConfigInternal {
rpc::JobConfig_ActorLifetime default_actor_lifetime =
rpc::JobConfig_ActorLifetime_NON_DETACHED;

std::string ray_namespace = "";

static ConfigInternal &Instance() {
static ConfigInternal config;
return config;
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/ray/runtime/abstract_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,9 @@ void AbstractRayRuntime::RemoveLocalReference(const std::string &id) {
}
}

std::string AbstractRayRuntime::GetActorId(const std::string &actor_name) {
auto actor_id = task_submitter_->GetActor(actor_name);
std::string AbstractRayRuntime::GetActorId(const std::string &actor_name,
const std::string &ray_namespace) {
auto actor_id = task_submitter_->GetActor(actor_name, ray_namespace);
if (actor_id.IsNil()) {
return "";
}
Expand Down Expand Up @@ -350,7 +351,9 @@ PlacementGroup AbstractRayRuntime::GetPlacementGroupById(const std::string &id)
}

PlacementGroup AbstractRayRuntime::GetPlacementGroup(const std::string &name) {
auto str_ptr = global_state_accessor_->GetPlacementGroupByName(name, "");
// TODO(WangTaoTheTonic): Add namespace support for placement group.
auto str_ptr = global_state_accessor_->GetPlacementGroupByName(
name, CoreWorkerProcess::GetCoreWorker().GetJobConfig().ray_namespace());
if (str_ptr == nullptr) {
return {};
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/abstract_ray_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class AbstractRayRuntime : public RayRuntime {

void RemoveLocalReference(const std::string &id);

std::string GetActorId(const std::string &actor_name);
std::string GetActorId(const std::string &actor_name, const std::string &ray_namespace);

void KillActor(const std::string &str_actor_id, bool no_restart);

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/task/local_mode_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ ObjectID LocalModeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation,
return Submit(invocation, {});
}

ActorID LocalModeTaskSubmitter::GetActor(const std::string &actor_name) const {
ActorID LocalModeTaskSubmitter::GetActor(const std::string &actor_name,
const std::string &ray_namespace) const {
absl::MutexLock lock(&named_actors_mutex_);
auto it = named_actors_.find(actor_name);
if (it == named_actors_.end()) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/task/local_mode_task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class LocalModeTaskSubmitter : public TaskSubmitter {

ObjectID SubmitActorTask(InvocationSpec &invocation, const CallOptions &call_options);

ActorID GetActor(const std::string &actor_name) const;
ActorID GetActor(const std::string &actor_name, const std::string &ray_namespace) const;

ray::PlacementGroup CreatePlacementGroup(
const ray::PlacementGroupCreationOptions &create_options);
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/ray/runtime/task/native_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation,
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
std::unordered_map<std::string, double> resources;
std::string name = create_options.name;
std::string ray_namespace = "";
std::string ray_namespace = create_options.ray_namespace;
BundleID bundle_id = GetBundleID(create_options);
rpc::SchedulingStrategy scheduling_strategy;
scheduling_strategy.mutable_default_scheduling_strategy();
Expand Down Expand Up @@ -145,9 +145,12 @@ ObjectID NativeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation,
return Submit(invocation, task_options);
}

ActorID NativeTaskSubmitter::GetActor(const std::string &actor_name) const {
ActorID NativeTaskSubmitter::GetActor(const std::string &actor_name,
const std::string &ray_namespace) const {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
auto pair = core_worker.GetNamedActorHandle(actor_name, "");
const std::string ns =
ray_namespace.empty() ? core_worker.GetJobConfig().ray_namespace() : ray_namespace;
auto pair = core_worker.GetNamedActorHandle(actor_name, ns);
if (!pair.second.ok()) {
RAY_LOG(WARNING) << pair.second.message();
return ActorID::Nil();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/task/native_task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class NativeTaskSubmitter : public TaskSubmitter {

ObjectID SubmitActorTask(InvocationSpec &invocation, const CallOptions &call_options);

ActorID GetActor(const std::string &actor_name) const;
ActorID GetActor(const std::string &actor_name, const std::string &ray_namespace) const;

ray::PlacementGroup CreatePlacementGroup(
const ray::PlacementGroupCreationOptions &create_options);
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/task/task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class TaskSubmitter {
virtual ObjectID SubmitActorTask(InvocationSpec &invocation,
const CallOptions &call_options) = 0;

virtual ActorID GetActor(const std::string &actor_name) const = 0;
virtual ActorID GetActor(const std::string &actor_name,
const std::string &ray_namespace) const = 0;

virtual ray::PlacementGroup CreatePlacementGroup(
const ray::PlacementGroupCreationOptions &create_options) = 0;
Expand Down
35 changes: 35 additions & 0 deletions cpp/src/ray/test/cluster/cluster_mode_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,41 @@ TEST(RayClusterModeTest, TaskWithPlacementGroup) {
ray::RemovePlacementGroup(placement_group.GetID());
}

TEST(RayClusterModeTest, NamespaceTest) {
// Create a named actor in namespace `isolated_ns`.
std::string actor_name_in_isolated_ns = "named_actor_in_isolated_ns";
std::string isolated_ns_name = "isolated_ns";
ray::ActorHandle<Counter> actor =
ray::Actor(RAY_FUNC(Counter::FactoryCreate))
.SetName(actor_name_in_isolated_ns, isolated_ns_name)
.Remote();
auto initialized_obj = actor.Task(&Counter::Initialized).Remote();
EXPECT_TRUE(*initialized_obj.Get());
// It is invisible to job default namespace.
auto actor_optional = ray::GetActor<Counter>(actor_name_in_isolated_ns);
EXPECT_TRUE(!actor_optional);
// It is visible to the namespace it belongs.
actor_optional = ray::GetActor<Counter>(actor_name_in_isolated_ns, isolated_ns_name);
EXPECT_TRUE(actor_optional);
// It is invisible to any other namespaces.
actor_optional = ray::GetActor<Counter>(actor_name_in_isolated_ns, "other_ns");
EXPECT_TRUE(!actor_optional);

// Create a named actor in job default namespace.
std::string actor_name_in_default_ns = "actor_name_in_default_ns";
actor = ray::Actor(RAY_FUNC(Counter::FactoryCreate))
.SetName(actor_name_in_default_ns)
.Remote();
initialized_obj = actor.Task(&Counter::Initialized).Remote();
EXPECT_TRUE(*initialized_obj.Get());
// It is visible to job default namespace.
actor_optional = ray::GetActor<Counter>(actor_name_in_default_ns);
EXPECT_TRUE(actor_optional);
// It is invisible to any other namespaces.
actor_optional = ray::GetActor<Counter>(actor_name_in_default_ns, isolated_ns_name);
EXPECT_TRUE(!actor_optional);
}

int main(int argc, char **argv) {
absl::ParseCommandLine(argc, argv);
cmd_argc = argc;
Expand Down
1 change: 1 addition & 0 deletions cpp/src/ray/util/process_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
for (const auto &path : ConfigInternal::Instance().code_search_path) {
job_config.add_code_search_path(path);
}
job_config.set_ray_namespace(ConfigInternal::Instance().ray_namespace);
std::string serialized_job_config;
RAY_CHECK(job_config.SerializeToString(&serialized_job_config));
options.serialized_job_config = serialized_job_config;
Expand Down
2 changes: 1 addition & 1 deletion python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ def init(
is true.
log_to_driver: If true, the output from all of the worker
processes on all nodes will be directed to the driver.
namespace: Namespace to use
namespace: A namespace is a logical grouping of jobs and named actors.
runtime_env: The runtime environment to use
for this job (see :ref:`runtime-environments` for details).
storage: [Experimental] Specify a URI for persistent cluster-wide storage.
Expand Down
32 changes: 32 additions & 0 deletions src/ray/util/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,38 @@ inline int64_t current_sys_time_us() {
return mu_since_epoch.count();
}

inline std::string GenerateUUIDV4() {
static std::random_device rd;
static std::mt19937 gen(rd());
static std::uniform_int_distribution<> dis(0, 15);
static std::uniform_int_distribution<> dis2(8, 11);

std::stringstream ss;
int i;
ss << std::hex;
for (i = 0; i < 8; i++) {
ss << dis(gen);
}
ss << "-";
for (i = 0; i < 4; i++) {
ss << dis(gen);
}
ss << "-4";
for (i = 0; i < 3; i++) {
ss << dis(gen);
}
ss << "-";
ss << dis2(gen);
for (i = 0; i < 3; i++) {
ss << dis(gen);
}
ss << "-";
for (i = 0; i < 12; i++) {
ss << dis(gen);
};
return ss.str();
}

/// A helper function to parse command-line arguments in a platform-compatible manner.
///
/// \param cmdline The command-line to split.
Expand Down

0 comments on commit 1de0d35

Please sign in to comment.