Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][c++ worker]Add namespace support for c++ worker #26327

Merged
merged 8 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions cpp/include/ray/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,25 @@ ray::internal::ActorCreator<PyActorClass> Actor(PyActorClass func);
ray::internal::ActorCreator<JavaActorClass> Actor(JavaActorClass func);

/// Get a handle to a named actor in current namespace.
/// Gets a handle to a named actor with the given name. The actor must have been created
/// with name specified.
/// The actor must have been created with name specified.
///
/// \param[in] actor_name The name of the named actor.
/// \return An ActorHandle to the actor if the actor of specified name exists or an
/// empty optional object.
template <typename T>
boost::optional<ActorHandle<T>> GetActor(const std::string &actor_name);

/// Get a handle to a named actor in the given namespace.
/// The actor must have been created with name specified.
///
/// \param[in] actor_name The name of the named actor.
/// \param[in] namespace The namespace of the actor.
/// \return An ActorHandle to the actor if the actor of specified name exists in
/// specifiled namespace or an empty optional object.
template <typename T>
boost::optional<ActorHandle<T>> GetActor(const std::string &actor_name,
const std::string &ray_namespace);

/// Intentionally exit the current actor.
/// It is used to disconnect an actor and exit the worker.
/// \Throws RayException if the current process is a driver or the current worker is not
Expand Down Expand Up @@ -271,11 +281,17 @@ inline ray::internal::ActorCreator<F> Actor(F create_func) {

template <typename T>
boost::optional<ActorHandle<T>> GetActor(const std::string &actor_name) {
return GetActor<T>(actor_name, "");
}

template <typename T>
boost::optional<ActorHandle<T>> GetActor(const std::string &actor_name,
const std::string &ray_namespace) {
if (actor_name.empty()) {
return {};
}

auto actor_id = ray::internal::GetRayRuntime()->GetActorId(actor_name);
auto actor_id = ray::internal::GetRayRuntime()->GetActorId(actor_name, ray_namespace);
if (actor_id.empty()) {
return {};
}
Expand Down
6 changes: 6 additions & 0 deletions cpp/include/ray/api/actor_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class ActorCreator {
return *this;
}

ActorCreator &SetName(std::string name, std::string ray_namespace) {
create_options_.name = std::move(name);
create_options_.ray_namespace = std::move(ray_namespace);
return *this;
}

ActorCreator &SetResources(std::unordered_map<std::string, double> resources) {
create_options_.resources = std::move(resources);
return *this;
Expand Down
3 changes: 3 additions & 0 deletions cpp/include/ray/api/ray_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class RayConfig {

// A specific flag for internal `default_worker`. Please don't use it in user code.
bool is_worker_ = false;

WangTaoTheTonic marked this conversation as resolved.
Show resolved Hide resolved
// A namespace is a logical grouping of jobs and named actors.
std::string ray_namespace = "";
};

} // namespace ray
3 changes: 2 additions & 1 deletion cpp/include/ray/api/ray_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class RayRuntime {
const CallOptions &call_options) = 0;
virtual void AddLocalReference(const std::string &id) = 0;
virtual void RemoveLocalReference(const std::string &id) = 0;
virtual std::string GetActorId(const std::string &actor_name) = 0;
virtual std::string GetActorId(const std::string &actor_name,
const std::string &ray_namespace) = 0;
virtual void KillActor(const std::string &str_actor_id, bool no_restart) = 0;
virtual void ExitActor() = 0;
virtual ray::PlacementGroup CreatePlacementGroup(
Expand Down
1 change: 1 addition & 0 deletions cpp/include/ray/api/task_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ struct CallOptions {

struct ActorCreationOptions {
std::string name;
std::string ray_namespace;
std::unordered_map<std::string, double> resources;
int max_restarts = 0;
int max_concurrency = 1;
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/ray/config_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) {
code_search_path = absolute_path;
}
}
if (worker_type == WorkerType::DRIVER) {
ray_namespace =
config.ray_namespace.empty() ? GenerateUUIDV4() : config.ray_namespace;
}
};

void ConfigInternal::SetBootstrapAddress(std::string_view address) {
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/ray/config_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class ConfigInternal {
rpc::JobConfig_ActorLifetime default_actor_lifetime =
rpc::JobConfig_ActorLifetime_NON_DETACHED;

std::string ray_namespace = "";

static ConfigInternal &Instance() {
static ConfigInternal config;
return config;
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/ray/runtime/abstract_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,9 @@ void AbstractRayRuntime::RemoveLocalReference(const std::string &id) {
}
}

std::string AbstractRayRuntime::GetActorId(const std::string &actor_name) {
auto actor_id = task_submitter_->GetActor(actor_name);
std::string AbstractRayRuntime::GetActorId(const std::string &actor_name,
const std::string &ray_namespace) {
auto actor_id = task_submitter_->GetActor(actor_name, ray_namespace);
if (actor_id.IsNil()) {
return "";
}
Expand Down Expand Up @@ -350,7 +351,9 @@ PlacementGroup AbstractRayRuntime::GetPlacementGroupById(const std::string &id)
}

PlacementGroup AbstractRayRuntime::GetPlacementGroup(const std::string &name) {
auto str_ptr = global_state_accessor_->GetPlacementGroupByName(name, "");
// TODO(WangTaoTheTonic): Add namespace support for placement group.
auto str_ptr = global_state_accessor_->GetPlacementGroupByName(
name, CoreWorkerProcess::GetCoreWorker().GetJobConfig().ray_namespace());
if (str_ptr == nullptr) {
return {};
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/abstract_ray_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class AbstractRayRuntime : public RayRuntime {

void RemoveLocalReference(const std::string &id);

std::string GetActorId(const std::string &actor_name);
std::string GetActorId(const std::string &actor_name, const std::string &ray_namespace);

void KillActor(const std::string &str_actor_id, bool no_restart);

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/task/local_mode_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ ObjectID LocalModeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation,
return Submit(invocation, {});
}

ActorID LocalModeTaskSubmitter::GetActor(const std::string &actor_name) const {
ActorID LocalModeTaskSubmitter::GetActor(const std::string &actor_name,
const std::string &ray_namespace) const {
absl::MutexLock lock(&named_actors_mutex_);
auto it = named_actors_.find(actor_name);
if (it == named_actors_.end()) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/task/local_mode_task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class LocalModeTaskSubmitter : public TaskSubmitter {

ObjectID SubmitActorTask(InvocationSpec &invocation, const CallOptions &call_options);

ActorID GetActor(const std::string &actor_name) const;
ActorID GetActor(const std::string &actor_name, const std::string &ray_namespace) const;

ray::PlacementGroup CreatePlacementGroup(
const ray::PlacementGroupCreationOptions &create_options);
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/ray/runtime/task/native_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation,
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
std::unordered_map<std::string, double> resources;
std::string name = create_options.name;
std::string ray_namespace = "";
std::string ray_namespace = create_options.ray_namespace;
BundleID bundle_id = GetBundleID(create_options);
rpc::SchedulingStrategy scheduling_strategy;
scheduling_strategy.mutable_default_scheduling_strategy();
Expand Down Expand Up @@ -145,9 +145,12 @@ ObjectID NativeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation,
return Submit(invocation, task_options);
}

ActorID NativeTaskSubmitter::GetActor(const std::string &actor_name) const {
ActorID NativeTaskSubmitter::GetActor(const std::string &actor_name,
const std::string &ray_namespace) const {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
auto pair = core_worker.GetNamedActorHandle(actor_name, "");
const std::string ns =
ray_namespace.empty() ? core_worker.GetJobConfig().ray_namespace() : ray_namespace;
auto pair = core_worker.GetNamedActorHandle(actor_name, ns);
if (!pair.second.ok()) {
RAY_LOG(WARNING) << pair.second.message();
return ActorID::Nil();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/task/native_task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class NativeTaskSubmitter : public TaskSubmitter {

ObjectID SubmitActorTask(InvocationSpec &invocation, const CallOptions &call_options);

ActorID GetActor(const std::string &actor_name) const;
ActorID GetActor(const std::string &actor_name, const std::string &ray_namespace) const;

ray::PlacementGroup CreatePlacementGroup(
const ray::PlacementGroupCreationOptions &create_options);
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/task/task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class TaskSubmitter {
virtual ObjectID SubmitActorTask(InvocationSpec &invocation,
const CallOptions &call_options) = 0;

virtual ActorID GetActor(const std::string &actor_name) const = 0;
virtual ActorID GetActor(const std::string &actor_name,
const std::string &ray_namespace) const = 0;

virtual ray::PlacementGroup CreatePlacementGroup(
const ray::PlacementGroupCreationOptions &create_options) = 0;
Expand Down
19 changes: 19 additions & 0 deletions cpp/src/ray/test/cluster/cluster_mode_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,25 @@ TEST(RayClusterModeTest, TaskWithPlacementGroup) {
ray::RemovePlacementGroup(placement_group.GetID());
}

TEST(RayClusterModeTest, NamespaceTest) {
// Create a named actor in namespace `isolated_ns`.
std::string actor_name = "named_actor";
std::string ns_name = "isolated_ns";
ray::ActorHandle<Counter> actor =
ray::Actor(RAY_FUNC(Counter::FactoryCreate)).SetName(actor_name, ns_name).Remote();
auto initialized_obj = actor.Task(&Counter::Initialized).Remote();
EXPECT_TRUE(*initialized_obj.Get());
// It is invisible to job default namespace.
auto actor_optional = ray::GetActor<Counter>(actor_name);
EXPECT_TRUE(!actor_optional);
// It is visible to the namespace it belongs.
actor_optional = ray::GetActor<Counter>(actor_name, ns_name);
EXPECT_TRUE(actor_optional);
// It is invisible to any other namespaces.
actor_optional = ray::GetActor<Counter>(actor_name, "other_ns");
EXPECT_TRUE(!actor_optional);
}

int main(int argc, char **argv) {
absl::ParseCommandLine(argc, argv);
cmd_argc = argc;
Expand Down
1 change: 1 addition & 0 deletions cpp/src/ray/util/process_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
for (const auto &path : ConfigInternal::Instance().code_search_path) {
job_config.add_code_search_path(path);
}
job_config.set_ray_namespace(ConfigInternal::Instance().ray_namespace);
std::string serialized_job_config;
RAY_CHECK(job_config.SerializeToString(&serialized_job_config));
options.serialized_job_config = serialized_job_config;
Expand Down
2 changes: 1 addition & 1 deletion python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ def init(
is true.
log_to_driver: If true, the output from all of the worker
processes on all nodes will be directed to the driver.
namespace: Namespace to use
namespace: A namespace is a logical grouping of jobs and named actors.
runtime_env: The runtime environment to use
for this job (see :ref:`runtime-environments` for details).
storage: [Experimental] Specify a URI for persistent cluster-wide storage.
Expand Down
32 changes: 32 additions & 0 deletions src/ray/util/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,38 @@ inline int64_t current_sys_time_us() {
return mu_since_epoch.count();
}

inline std::string GenerateUUIDV4() {
static std::random_device rd;
static std::mt19937 gen(rd());
static std::uniform_int_distribution<> dis(0, 15);
static std::uniform_int_distribution<> dis2(8, 11);

std::stringstream ss;
int i;
ss << std::hex;
for (i = 0; i < 8; i++) {
ss << dis(gen);
}
ss << "-";
for (i = 0; i < 4; i++) {
ss << dis(gen);
}
ss << "-4";
for (i = 0; i < 3; i++) {
ss << dis(gen);
}
ss << "-";
ss << dis2(gen);
for (i = 0; i < 3; i++) {
ss << dis(gen);
}
ss << "-";
for (i = 0; i < 12; i++) {
ss << dis(gen);
};
return ss.str();
}

/// A helper function to parse command-line arguments in a platform-compatible manner.
///
/// \param cmdline The command-line to split.
Expand Down