Skip to content

Commit

Permalink
[namespaces] Isolation for named placement groups (ray-project#16000)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Wu authored Jun 1, 2021
1 parent bfa8ebc commit de0f856
Show file tree
Hide file tree
Showing 19 changed files with 225 additions and 67 deletions.
4 changes: 3 additions & 1 deletion python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
unique_ptr[c_string] GetPlacementGroupInfo(
const CPlacementGroupID &placement_group_id)
unique_ptr[c_string] GetPlacementGroupByName(
const c_string &placement_group_name)
const c_string &placement_group_name,
const c_string &ray_namespace,
)
c_vector[c_string] GetAllPlacementGroupInfo()
5 changes: 3 additions & 2 deletions python/ray/includes/global_state_accessor.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,13 @@ cdef class GlobalStateAccessor:
return c_string(result.get().data(), result.get().size())
return None

def get_placement_group_by_name(self, placement_group_name):
def get_placement_group_by_name(self, placement_group_name, ray_namespace):
cdef unique_ptr[c_string] result
cdef c_string cplacement_group_name = placement_group_name
cdef c_string cray_namespace = ray_namespace
with nogil:
result = self.inner.get().GetPlacementGroupByName(
cplacement_group_name)
cplacement_group_name, cray_namespace)
if result:
return c_string(result.get().data(), result.get().size())
return None
4 changes: 2 additions & 2 deletions python/ray/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,12 @@ def profile_table(self):

return dict(result)

def get_placement_group_by_name(self, placement_group_name):
def get_placement_group_by_name(self, placement_group_name, ray_namespace):
self._check_connected()

placement_group_info = (
self.global_state_accessor.get_placement_group_by_name(
placement_group_name))
placement_group_name, ray_namespace))
if placement_group_info is None:
return None
else:
Expand Down
42 changes: 42 additions & 0 deletions python/ray/tests/test_namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,48 @@ def ping(self):
Actor.options(name="Pinger", lifetime="detached").remote()


def test_placement_groups(shutdown_only):
# Exercises named placement groups across namespaces: the name "hello" is
# used by a driver in namespace "different", by this driver in namespace
# "namespace", and again by a second driver in namespace "namespace" after
# this driver's group has been removed.
info = ray.init(namespace="namespace")

address = info["redis_address"]

# First param of template is the redis address. Second is the namespace.
driver_template = """
import ray
ray.init(address="{}", namespace="{}")
pg = ray.util.placement_group(bundles=[dict(CPU=1)], name="hello",
lifetime="detached")
ray.get(pg.ready())
"""

# Start a detached placement group in a different namespace.
run_string_as_driver(driver_template.format(address, "different"))

# Create a placement group with the same name. This should succeed because
# the other placement group is in a different namespace.
probe = ray.util.placement_group(bundles=[{"CPU": 1}], name="hello")
ray.get(probe.ready())
ray.util.remove_placement_group(probe)

# Poll until the lookup by name fails, i.e. the removal has taken effect.
removed = False
for _ in range(50): # Timeout after 5s
try:
ray.util.get_placement_group("hello")
except ValueError:
removed = True
# This means the placement group was removed.
break
else:
time.sleep(0.1)

assert removed, "This is an anti-flakey test measure"

# Now make the placement group in this namespace, from a different job.
run_string_as_driver(driver_template.format(address, "namespace"))


def test_default_namespace(shutdown_only):
info = ray.init(namespace="namespace")

Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_placement_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -1509,14 +1509,14 @@ def test_named_placement_group(ray_start_cluster):
for _ in range(2):
cluster.add_node(num_cpus=3)
cluster.wait_for_nodes()
info = ray.init(address=cluster.address)
info = ray.init(address=cluster.address, namespace="")
global_placement_group_name = "named_placement_group"

# Create a detached placement group with name.
driver_code = f"""
import ray
ray.init(address="{info["redis_address"]}")
ray.init(address="{info["redis_address"]}", namespace="")
pg = ray.util.placement_group(
[{{"CPU": 1}} for _ in range(2)],
Expand Down
2 changes: 1 addition & 1 deletion python/ray/util/placement_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def get_placement_group(placement_group_name: str) -> PlacementGroup:
worker = ray.worker.global_worker
worker.check_connected()
placement_group_info = ray.state.state.get_placement_group_by_name(
placement_group_name)
placement_group_name, worker.namespace)
if placement_group_info is None:
raise ValueError(
f"Failed to look up actor with name: {placement_group_name}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetPlacementGroupInfoByName(
auto full_name = GetFullName(global, placement_group_name);
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto placement_group = gcs_accessor->GetPlacementGroupByName(full_name);
// Java doesn't support namespaces.
auto placement_group = gcs_accessor->GetPlacementGroupByName(full_name, "");
if (placement_group) {
return NativeStringToJavaByteArray(env, *placement_group);
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ class PlacementGroupInfoAccessor {
/// \param placement_group_name The name of a placement group to obtain from GCS.
/// \param ray_namespace The namespace in which to look up the placement group.
/// \return Status.
virtual Status AsyncGetByName(
const std::string &placement_group_name,
const std::string &placement_group_name, const std::string &ray_namespace,
const OptionalItemCallback<rpc::PlacementGroupTableData> &callback) = 0;

/// Get all placement group info from GCS asynchronously.
Expand Down
5 changes: 3 additions & 2 deletions src/ray/gcs/gcs_client/global_state_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,12 @@ std::unique_ptr<std::string> GlobalStateAccessor::GetPlacementGroupInfo(
}

std::unique_ptr<std::string> GlobalStateAccessor::GetPlacementGroupByName(
const std::string &placement_group_name) {
const std::string &placement_group_name, const std::string &ray_namespace) {
std::unique_ptr<std::string> placement_group_table_data;
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->PlacementGroups().AsyncGetByName(
placement_group_name,
placement_group_name, ray_namespace,

TransformForOptionalItemCallback<rpc::PlacementGroupTableData>(
placement_group_table_data, promise)));
promise.get_future().get();
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/global_state_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class GlobalStateAccessor {
/// PlacementGroupTableData and return the serialized string. Where used, it needs to be
/// deserialized with protobuf function.
std::unique_ptr<std::string> GetPlacementGroupByName(
const std::string &placement_group_name);
const std::string &placement_group_name, const std::string &ray_namespace);

private:
/// MultiItem transformation helper in template style.
Expand Down
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_client/service_based_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1485,11 +1485,12 @@ Status ServiceBasedPlacementGroupInfoAccessor::AsyncGet(
}

Status ServiceBasedPlacementGroupInfoAccessor::AsyncGetByName(
const std::string &name,
const std::string &name, const std::string &ray_namespace,
const OptionalItemCallback<rpc::PlacementGroupTableData> &callback) {
RAY_LOG(DEBUG) << "Getting named placement group info, name = " << name;
rpc::GetNamedPlacementGroupRequest request;
request.set_name(name);
request.set_ray_namespace(ray_namespace);
client_impl_->GetGcsRpcClient().GetNamedPlacementGroup(
request, [name, callback](const Status &status,
const rpc::GetNamedPlacementGroupReply &reply) {
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/service_based_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ class ServiceBasedPlacementGroupInfoAccessor : public PlacementGroupInfoAccessor
const OptionalItemCallback<rpc::PlacementGroupTableData> &callback) override;

Status AsyncGetByName(
const std::string &name,
const std::string &name, const std::string &ray_namespace,
const OptionalItemCallback<rpc::PlacementGroupTableData> &callback) override;

Status AsyncGetAll(
Expand Down
61 changes: 42 additions & 19 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ std::string GcsPlacementGroup::GetName() const {
return placement_group_table_data_.name();
}

/// Returns the ray namespace stored in this placement group's table data.
std::string GcsPlacementGroup::GetRayNamespace() const {
return placement_group_table_data_.ray_namespace();
}

std::vector<std::shared_ptr<BundleSpecification>> GcsPlacementGroup::GetBundles() const {
const auto &bundles = placement_group_table_data_.bundles();
std::vector<std::shared_ptr<BundleSpecification>> ret_bundles;
Expand Down Expand Up @@ -113,11 +117,13 @@ GcsPlacementGroupManager::GcsPlacementGroupManager(
instrumented_io_context &io_context,
std::shared_ptr<GcsPlacementGroupSchedulerInterface> scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
GcsResourceManager &gcs_resource_manager)
GcsResourceManager &gcs_resource_manager,
std::function<std::string(const JobID &)> get_ray_namespace)
: io_context_(io_context),
gcs_placement_group_scheduler_(std::move(scheduler)),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_resource_manager_(gcs_resource_manager) {
gcs_resource_manager_(gcs_resource_manager),
get_ray_namespace_(get_ray_namespace) {
Tick();
}

Expand Down Expand Up @@ -150,10 +156,11 @@ void GcsPlacementGroupManager::RegisterPlacementGroup(
return;
}
if (!placement_group->GetName().empty()) {
auto it = named_placement_groups_.find(placement_group->GetName());
if (it == named_placement_groups_.end()) {
named_placement_groups_.emplace(placement_group->GetName(),
placement_group->GetPlacementGroupID());
auto &pgs_in_namespace = named_placement_groups_[placement_group->GetRayNamespace()];
auto it = pgs_in_namespace.find(placement_group->GetName());
if (it == pgs_in_namespace.end()) {
pgs_in_namespace.emplace(placement_group->GetName(),
placement_group->GetPlacementGroupID());
} else {
std::stringstream stream;
stream << "Failed to create placement group '"
Expand Down Expand Up @@ -201,11 +208,14 @@ void GcsPlacementGroupManager::RegisterPlacementGroup(
}

PlacementGroupID GcsPlacementGroupManager::GetPlacementGroupIDByName(
const std::string &name) {
const std::string &name, const std::string &ray_namespace) {
PlacementGroupID placement_group_id = PlacementGroupID::Nil();
auto it = named_placement_groups_.find(name);
if (it != named_placement_groups_.end()) {
placement_group_id = it->second;
auto namespace_it = named_placement_groups_.find(ray_namespace);
if (namespace_it != named_placement_groups_.end()) {
auto it = namespace_it->second.find(name);
if (it != namespace_it->second.end()) {
placement_group_id = it->second;
}
}
return placement_group_id;
}
Expand Down Expand Up @@ -288,7 +298,10 @@ void GcsPlacementGroupManager::HandleCreatePlacementGroup(
const ray::rpc::CreatePlacementGroupRequest &request,
ray::rpc::CreatePlacementGroupReply *reply,
ray::rpc::SendReplyCallback send_reply_callback) {
auto placement_group = std::make_shared<GcsPlacementGroup>(request);
const JobID &job_id =
JobID::FromBinary(request.placement_group_spec().creator_job_id());
auto placement_group =
std::make_shared<GcsPlacementGroup>(request, get_ray_namespace_(job_id));
RAY_LOG(DEBUG) << "Registering placement group, " << placement_group->DebugString();
RegisterPlacementGroup(placement_group, [reply, send_reply_callback,
placement_group](Status status) {
Expand Down Expand Up @@ -337,10 +350,16 @@ void GcsPlacementGroupManager::RemovePlacementGroup(

// Remove placement group from `named_placement_groups_` if its name is not empty.
if (!placement_group->GetName().empty()) {
auto it = named_placement_groups_.find(placement_group->GetName());
if (it != named_placement_groups_.end() &&
it->second == placement_group->GetPlacementGroupID()) {
named_placement_groups_.erase(it);
auto namespace_it = named_placement_groups_.find(placement_group->GetRayNamespace());
if (namespace_it != named_placement_groups_.end()) {
auto it = namespace_it->second.find(placement_group->GetName());
if (it != namespace_it->second.end() &&
it->second == placement_group->GetPlacementGroupID()) {
namespace_it->second.erase(it);
}
if (namespace_it->second.empty()) {
named_placement_groups_.erase(namespace_it);
}
}
}

Expand Down Expand Up @@ -419,7 +438,7 @@ void GcsPlacementGroupManager::HandleGetNamedPlacementGroup(
RAY_LOG(DEBUG) << "Getting named placement group info, name = " << name;

// Try to look up the placement Group ID for the named placement group.
auto placement_group_id = GetPlacementGroupIDByName(name);
auto placement_group_id = GetPlacementGroupIDByName(name, request.ray_namespace());

if (placement_group_id.IsNil()) {
// The placement group was not found.
Expand Down Expand Up @@ -611,8 +630,8 @@ void GcsPlacementGroupManager::Initialize(const GcsInitData &gcs_init_data) {
if (item.second.state() != rpc::PlacementGroupTableData::REMOVED) {
registered_placement_groups_.emplace(item.first, placement_group);
if (!placement_group->GetName().empty()) {
named_placement_groups_.emplace(placement_group->GetName(),
placement_group->GetPlacementGroupID());
named_placement_groups_[placement_group->GetRayNamespace()].emplace(
placement_group->GetName(), placement_group->GetPlacementGroupID());
}

if (item.second.state() == rpc::PlacementGroupTableData::PENDING ||
Expand All @@ -639,6 +658,10 @@ void GcsPlacementGroupManager::Initialize(const GcsInitData &gcs_init_data) {
}

std::string GcsPlacementGroupManager::DebugString() const {
uint64_t num_pgs = 0;
for (auto it : named_placement_groups_) {
num_pgs += it.second.size();
}
std::ostringstream stream;
stream << "GcsPlacementGroupManager: {CreatePlacementGroup request count: "
<< counts_[CountType::CREATE_PLACEMENT_GROUP_REQUEST]
Expand All @@ -651,7 +674,7 @@ std::string GcsPlacementGroupManager::DebugString() const {
<< ", WaitPlacementGroupUntilReady request count: "
<< counts_[CountType::WAIT_PLACEMENT_GROUP_UNTIL_READY_REQUEST]
<< ", Registered placement groups count: " << registered_placement_groups_.size()
<< ", Named placement group count: " << named_placement_groups_.size()
<< ", Named placement group count: " << num_pgs
<< ", Pending placement groups count: " << pending_placement_groups_.size()
<< "}";
return stream.str();
Expand Down
22 changes: 17 additions & 5 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class GcsPlacementGroup {
/// Create a GcsPlacementGroup by CreatePlacementGroupRequest.
///
/// \param request Contains the placement group creation task specification.
/// \param ray_namespace The ray namespace to record in this placement group's table data.
explicit GcsPlacementGroup(const ray::rpc::CreatePlacementGroupRequest &request) {
explicit GcsPlacementGroup(const ray::rpc::CreatePlacementGroupRequest &request,
std::string ray_namespace) {
const auto &placement_group_spec = request.placement_group_spec();
placement_group_table_data_.set_placement_group_id(
placement_group_spec.placement_group_id());
Expand All @@ -63,6 +64,7 @@ class GcsPlacementGroup {
placement_group_table_data_.set_creator_actor_dead(
placement_group_spec.creator_actor_dead());
placement_group_table_data_.set_is_detached(placement_group_spec.is_detached());
placement_group_table_data_.set_ray_namespace(ray_namespace);
}

/// Get the immutable PlacementGroupTableData of this placement group.
Expand All @@ -83,6 +85,9 @@ class GcsPlacementGroup {
/// Get the name of this placement_group.
std::string GetName() const;

/// Get the ray namespace of this placement_group.
std::string GetRayNamespace() const;

/// Get the bundles of this placement_group (including unplaced).
std::vector<std::shared_ptr<BundleSpecification>> GetBundles() const;

Expand Down Expand Up @@ -140,7 +145,8 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
instrumented_io_context &io_context,
std::shared_ptr<GcsPlacementGroupSchedulerInterface> scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
GcsResourceManager &gcs_resource_manager);
GcsResourceManager &gcs_resource_manager,
std::function<std::string(const JobID &)> get_ray_namespace);

~GcsPlacementGroupManager() = default;

Expand Down Expand Up @@ -194,7 +200,8 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// \param name The name of the placement_group to look up.
/// \param ray_namespace The namespace in which to look up the placement_group.
/// \returns PlacementGroupID The ID of the placement_group. Nil if the
/// placement_group was not found.
PlacementGroupID GetPlacementGroupIDByName(const std::string &name);
PlacementGroupID GetPlacementGroupIDByName(const std::string &name,
const std::string &ray_namespace);

/// Handle placement_group creation task failure. This should be called when scheduling
/// an placement_group creation task is infeasible.
Expand Down Expand Up @@ -329,8 +336,13 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// Reference of GcsResourceManager.
GcsResourceManager &gcs_resource_manager_;

/// Maps placement group names to their placement group ID for lookups by name.
absl::flat_hash_map<std::string, PlacementGroupID> named_placement_groups_;
/// Get ray namespace.
std::function<std::string(const JobID &)> get_ray_namespace_;

/// Maps placement group names to their placement group ID for lookups by
/// name, first keyed by namespace.
absl::flat_hash_map<std::string, absl::flat_hash_map<std::string, PlacementGroupID>>
named_placement_groups_;

// Debug info.
enum CountType {
Expand Down
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) {
*gcs_resource_scheduler_, raylet_client_pool_);

gcs_placement_group_manager_ = std::make_shared<GcsPlacementGroupManager>(
main_service_, scheduler, gcs_table_storage_, *gcs_resource_manager_);
main_service_, scheduler, gcs_table_storage_, *gcs_resource_manager_,
[this](const JobID &job_id) { return gcs_job_manager_->GetRayNamespace(job_id); });
// Initialize by gcs tables data.
gcs_placement_group_manager_->Initialize(gcs_init_data);
// Register service.
Expand Down
Loading

0 comments on commit de0f856

Please sign in to comment.