[core] Deflakey test_placement_group_3.py when ray syncer is turned on. (ray-project#34687)

## Why are these changes needed?
The new communication protocol only delivers a message when necessary. However, some parts of Ray assume that they will keep receiving deliveries even when no new message is generated, so they end up with incorrect resource counts.

For example, from GCS's point of view:
- GCS's view of the node is {CPU: 1}.
- GCS tries to schedule a task, so its view becomes {CPU: 0}.
- GCS then receives a snapshot from that node, on which a placement group has already been scheduled.
- The view becomes {CPU: 0, PG: 1}. The earlier scheduling attempt fails, so GCS adds the CPU back, and the view ends up as {CPU: 1, PG: 1}, which is incorrect.
- Later, GCS tries to schedule a task on this node, and the node rejects it because it has no CPU left. (A minimal sketch of this double-counting follows below.)
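
To make the accounting failure concrete, here is a minimal, self-contained sketch (hypothetical code, not Ray's actual classes or APIs) of how applying a snapshot and then returning a failed reservation double-counts a CPU:

```cpp
#include <iostream>

int main() {
  double gcs_view_cpu = 1.0;  // GCS's view of the node: {CPU: 1}

  // GCS reserves the CPU for a pending scheduling attempt: {CPU: 0}.
  gcs_view_cpu -= 1.0;

  // A snapshot arrives from the node. The node has already committed a
  // placement group that consumed the CPU, so it reports {CPU: 0, PG: 1}.
  const double snapshot_cpu = 0.0;
  gcs_view_cpu = snapshot_cpu;

  // The pending scheduling attempt fails, so GCS returns the reserved CPU
  // on top of the snapshot it just applied: {CPU: 1, PG: 1}.
  gcs_view_cpu += 1.0;

  // GCS now believes the node has 1 CPU while it really has 0, so every
  // later scheduling attempt on this node will be rejected.
  std::cout << "GCS view of CPU: " << gcs_view_cpu << " (actual: 0)" << std::endl;
  return 0;
}
```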



This issue affects both the raylet and the GCS: in the raylet, distributed task scheduling can suffer from it, and in the GCS, placement group scheduling can.

In test_placement_group_3.py, the flake occurs because the scheduling of a placement group bundle fails at the same time the snapshot is received, so the GCS's view ends up with one CPU that the node does not actually have. The placement group scheduling algorithm is deterministic, so it keeps trying to reschedule the bundles forever.

The raylet issue was fixed in [29905](ray-project#29905). This PR moves that logic from the node manager to the cluster resource manager.

The logic is not added to the ray syncer itself because it is application logic about how to handle the update, and only one module needs the update. Future receivers must not assume that repeated messages will eventually be delivered. A rough sketch of the relocated refresh logic is given below.
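
For reference, the refresh that previously lived in NodeManager (removed in this diff) boils down to periodically re-applying the last received resource message for any node whose local view has not changed recently. The following is a rough sketch of that idea only; the class and member names here (`StaleResourceRefresher`, `RefreshStaleEntries`, `last_message_`, `modified_ts_ms_`) are illustrative and are not the exact API this PR adds to ClusterResourceManager:

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Illustrative stand-in for the per-node resource message that is kept around.
struct ResourcesData {};

class StaleResourceRefresher {
 public:
  explicit StaleResourceRefresher(
      std::function<void(const std::string &, const ResourcesData &)> apply)
      : apply_update_(std::move(apply)) {}

  // Record the latest message for a node and when its view was last modified.
  void RecordMessage(const std::string &node_id, ResourcesData msg,
                     int64_t modified_ts_ms) {
    last_message_[node_id] = std::move(msg);
    modified_ts_ms_[node_id] = modified_ts_ms;
  }

  // Meant to be driven periodically (e.g. from an io_context timer):
  // re-apply the last message for any node left untouched for longer than
  // stale_ms, so a view that was temporarily overridden converges back.
  void RefreshStaleEntries(int64_t now_ms, int64_t stale_ms) {
    for (const auto &[node_id, msg] : last_message_) {
      auto it = modified_ts_ms_.find(node_id);
      if (it != modified_ts_ms_.end() && now_ms - it->second > stale_ms) {
        apply_update_(node_id, msg);
      }
    }
  }

 private:
  std::function<void(const std::string &, const ResourcesData &)> apply_update_;
  std::unordered_map<std::string, ResourcesData> last_message_;
  std::unordered_map<std::string, int64_t> modified_ts_ms_;
};
```

This is also why, throughout the diff below, ClusterResourceScheduler and ClusterResourceManager now take an instrumented_io_context in their constructors: the periodic refresh can run inside the resource manager instead of in NodeManager.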
fishbone committed Apr 26, 2023
1 parent d3b9d26 commit aba2971
Showing 23 changed files with 259 additions and 160 deletions.
13 changes: 13 additions & 0 deletions python/ray/tests/test_placement_group_3.py
@@ -458,6 +458,19 @@ def f():
assert len(gpu_ids_res) == 2


@pytest.mark.parametrize(
"ray_start_cluster",
[
generate_system_config_map(
use_ray_syncer=True,
),
generate_system_config_map(
use_ray_syncer=False,
),
],
indirect=True,
)
@pytest.mark.repeat(3)
def test_actor_scheduling_not_block_with_placement_group(ray_start_cluster):
"""Tests the scheduling of lots of actors will not be blocked
when using placement groups.
16 changes: 8 additions & 8 deletions src/mock/ray/gcs/gcs_server/gcs_resource_manager.h
@@ -16,16 +16,20 @@

namespace ray {
namespace gcs {

static instrumented_io_context __mock_io_context_;
static ClusterResourceManager __mock_cluster_resource_manager_(__mock_io_context_);
class MockGcsResourceManager : public GcsResourceManager {
public:
using GcsResourceManager::GcsResourceManager;
explicit MockGcsResourceManager()
: GcsResourceManager(
io_context_, cluster_resource_manager_, NodeID::FromRandom(), nullptr) {}
: GcsResourceManager(__mock_io_context_,
__mock_cluster_resource_manager_,
NodeID::FromRandom(),
nullptr) {}
explicit MockGcsResourceManager(ClusterResourceManager &cluster_resource_manager)
: GcsResourceManager(
io_context_, cluster_resource_manager, NodeID::FromRandom(), nullptr) {}
__mock_io_context_, cluster_resource_manager, NodeID::FromRandom(), nullptr) {
}

MOCK_METHOD(void,
HandleGetResources,
@@ -51,10 +55,6 @@ class MockGcsResourceManager : public GcsResourceManager {
rpc::GetAllResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));

private:
instrumented_io_context io_context_;
ClusterResourceManager cluster_resource_manager_;
};

} // namespace gcs
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
@@ -326,6 +326,7 @@ void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) {

void GcsServer::InitClusterResourceScheduler() {
cluster_resource_scheduler_ = std::make_shared<ClusterResourceScheduler>(
main_service_,
scheduling::NodeID(kGCSNodeID.Binary()),
NodeResources(),
/*is_node_available_fn=*/
@@ -45,6 +45,7 @@ class GcsActorSchedulerMockTest : public Test {
[this](const rpc::Address &) { return raylet_client; });
local_node_id = NodeID::FromRandom();
auto cluster_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
io_context,
scheduling::NodeID(local_node_id.Binary()),
NodeResources(),
/*is_node_available_fn=*/
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
@@ -44,6 +44,7 @@ class GcsActorSchedulerTest : public ::testing::Test {
std::make_shared<GcsServerMocker::MockedGcsActorTable>(store_client_);
local_node_id_ = NodeID::FromRandom();
auto cluster_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
io_service_,
scheduling::NodeID(local_node_id_.Binary()),
NodeResources(),
/*is_node_available_fn=*/
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/test/gcs_monitor_server_test.cc
@@ -73,7 +73,7 @@ class GcsMonitorServerTest : public ::testing::Test {
public:
GcsMonitorServerTest()
: mock_node_manager_(std::make_shared<gcs::MockGcsNodeManager>()),
cluster_resource_manager_(),
cluster_resource_manager_(io_context_),
mock_resource_manager_(
std::make_shared<gcs::MockGcsResourceManager>(cluster_resource_manager_)),
mock_placement_group_manager_(
@@ -33,6 +33,8 @@ namespace gcs {

class GcsPlacementGroupManagerMockTest : public Test {
public:
GcsPlacementGroupManagerMockTest() : cluster_resource_manager_(io_context_) {}

void SetUp() override {
store_client_ = std::make_shared<MockStoreClient>();
gcs_table_storage_ = std::make_shared<GcsTableStorage>(store_client_);
@@ -50,14 +52,14 @@ class GcsPlacementGroupManagerMockTest : public Test {
counter_.reset(new CounterMap<rpc::PlacementGroupTableData::PlacementGroupState>());
}

instrumented_io_context io_context_;
std::unique_ptr<GcsPlacementGroupManager> gcs_placement_group_manager_;
std::shared_ptr<MockGcsPlacementGroupSchedulerInterface> gcs_placement_group_scheduler_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<MockStoreClient> store_client_;
ClusterResourceManager cluster_resource_manager_;
std::shared_ptr<GcsResourceManager> resource_manager_;
std::shared_ptr<CounterMap<rpc::PlacementGroupTableData::PlacementGroupState>> counter_;
instrumented_io_context io_context_;
};

TEST_F(GcsPlacementGroupManagerMockTest, PendingQueuePriorityReschedule) {
@@ -81,7 +81,8 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterf
class GcsPlacementGroupManagerTest : public ::testing::Test {
public:
GcsPlacementGroupManagerTest()
: mock_placement_group_scheduler_(new MockPlacementGroupScheduler()) {
: mock_placement_group_scheduler_(new MockPlacementGroupScheduler()),
cluster_resource_manager_(io_service_) {
gcs_publisher_ =
std::make_shared<GcsPublisher>(std::make_unique<ray::pubsub::MockPublisher>());
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
@@ -49,6 +49,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
std::make_unique<ray::pubsub::MockPublisher>());
auto local_node_id = NodeID::FromRandom();
cluster_resource_scheduler_ = std::make_shared<ClusterResourceScheduler>(
io_service_,
scheduling::NodeID(local_node_id.Binary()),
NodeResources(),
/*is_node_available_fn=*/
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc
@@ -27,7 +27,7 @@ using ::testing::_;

class GcsResourceManagerTest : public ::testing::Test {
public:
GcsResourceManagerTest() {
GcsResourceManagerTest() : cluster_resource_manager_(io_service_) {
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>(
io_service_, cluster_resource_manager_, NodeID::FromRandom());
}
23 changes: 1 addition & 22 deletions src/ray/raylet/node_manager.cc
@@ -293,6 +293,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
CreateMemoryUsageRefreshCallback())) {
RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_;
cluster_resource_scheduler_ = std::make_shared<ClusterResourceScheduler>(
io_service,
scheduling::NodeID(self_node_id_.Binary()),
config.resource_config.ToResourceMap(),
/*is_node_available_fn*/
@@ -373,23 +374,6 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
node_manager_server_.RegisterService(node_manager_service_);
node_manager_server_.RegisterService(agent_manager_service_);
if (RayConfig::instance().use_ray_syncer()) {
periodical_runner_.RunFnPeriodically(
[this]() {
auto now = absl::Now();
auto threshold =
now - absl::Milliseconds(
RayConfig::instance().ray_syncer_message_refresh_interval_ms());
auto &resource_manager =
cluster_resource_scheduler_->GetClusterResourceManager();
for (auto &[node_id, resource] : resource_message_udpated_) {
auto modified_ts = resource_manager.GetNodeResourceModifiedTs(
scheduling::NodeID(node_id.Binary()));
if (modified_ts && *modified_ts < threshold) {
UpdateResourceUsage(node_id, resource);
}
}
},
RayConfig::instance().ray_syncer_message_refresh_interval_ms());
node_manager_server_.RegisterService(ray_syncer_service_);
}
node_manager_server_.Run();
@@ -1048,10 +1032,6 @@ void NodeManager::NodeRemoved(const NodeID &node_id) {
// Below, when we remove node_id from all of these data structures, we could
// check that it is actually removed, or log a warning otherwise, but that may
// not be necessary.

// Remove the messages received
resource_message_udpated_.erase(node_id);

// Remove the node from the resource map.
if (!cluster_resource_scheduler_->GetClusterResourceManager().RemoveNode(
scheduling::NodeID(node_id.Binary()))) {
@@ -2790,7 +2770,6 @@ void NodeManager::ConsumeSyncMessage(
}
// Message view shouldn't carry this field.
RAY_CHECK(!data.should_global_gc());
resource_message_udpated_[node_id] = std::move(data);
} else if (message->message_type() == syncer::MessageType::COMMANDS) {
rpc::ResourcesData data;
data.ParseFromString(message->sync_message());
3 changes: 0 additions & 3 deletions src/ray/raylet/node_manager.h
@@ -827,9 +827,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// Ray syncer for synchronization
syncer::RaySyncer ray_syncer_;

/// Resource message updated
absl::flat_hash_map<NodeID, rpc::ResourcesData> resource_message_udpated_;

/// RaySyncerService for gRPC
syncer::RaySyncerService ray_syncer_service_;

75 changes: 53 additions & 22 deletions src/ray/raylet/placement_group_resource_manager_test.cc
@@ -46,7 +46,7 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test {
void InitLocalAvailableResource(
absl::flat_hash_map<std::string, double> &unit_resource) {
cluster_resource_scheduler_ = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID("local"), unit_resource, is_node_available_fn_);
io_context, scheduling::NodeID("local"), unit_resource, is_node_available_fn_);
new_placement_group_resource_manager_ =
std::make_unique<raylet::NewPlacementGroupResourceManager>(
cluster_resource_scheduler_);
@@ -73,6 +73,7 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test {
std::make_shared<const BundleSpecification>(std::move(bundle_spec)));
return bundle_specs;
}
instrumented_io_context io_context;
};

TEST_F(NewPlacementGroupResourceManagerTest,
@@ -186,8 +187,11 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) {
{"CPU", 1.0},
{"bundle_group_1_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 1000}};
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID("remaining"), remaining_resources, is_node_available_fn_);
auto remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>(io_context,
scheduling::NodeID("remaining"),
remaining_resources,
is_node_available_fn_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(
@@ -216,7 +220,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) {
new_placement_group_resource_manager_->ReturnBundle(bundle_spec);
/// 5. check remaining resources is correct.
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID("remaining"), unit_resource, is_node_available_fn_);
io_context, scheduling::NodeID("remaining"), unit_resource, is_node_available_fn_);
auto remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
scheduling::NodeID("remaining"));
@@ -252,8 +256,11 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
{"bundle_group_1_" + group_id.Hex(), 1000},
{"bundle_group_2_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 2000}};
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID("remaining"), remaining_resources, is_node_available_fn_);
auto remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>(io_context,
scheduling::NodeID("remaining"),
remaining_resources,
is_node_available_fn_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(
@@ -272,8 +279,11 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
{"CPU", 2.0},
{"bundle_group_1_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 2000}};
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID("remaining"), remaining_resources, is_node_available_fn_);
remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>(io_context,
scheduling::NodeID("remaining"),
remaining_resources,
is_node_available_fn_);
ASSERT_TRUE(
remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources(
{{"CPU_group_" + group_id.Hex(), 1.0},
@@ -288,8 +298,11 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
new_placement_group_resource_manager_->ReturnBundle(first_bundle_spec);
/// 8. check remaining resources is correct after all bundle returned.
remaining_resources = {{"CPU", 2.0}};
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID("remaining"), remaining_resources, is_node_available_fn_);
remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>(io_context,
scheduling::NodeID("remaining"),
remaining_resources,
is_node_available_fn_);
remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
scheduling::NodeID("remaining"));
@@ -312,8 +325,11 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare)
}
/// 4. check remaining resources is correct.
absl::flat_hash_map<std::string, double> remaining_resources = {{"CPU", 3.0}};
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID("remaining"), remaining_resources, is_node_available_fn_);
auto remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>(io_context,
scheduling::NodeID("remaining"),
remaining_resources,
is_node_available_fn_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(
@@ -349,8 +365,11 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
{"CPU", 3.0},
{"bundle_group_1_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 1000}};
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID("remaining"), remaining_resources, is_node_available_fn_);
auto remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>(io_context,
scheduling::NodeID("remaining"),
remaining_resources,
is_node_available_fn_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(
@@ -378,8 +397,11 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
new_placement_group_resource_manager_->CommitBundles(
ConvertSingleSpecToVectorPtrs(bundle_spec));
// 8. check remaining resources is correct.
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID("remaining"), available_resource, is_node_available_fn_);
remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>(io_context,
scheduling::NodeID("remaining"),
available_resource,
is_node_available_fn_);
remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
scheduling::NodeID("remaining"));
@@ -402,8 +424,11 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) {
ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs));
// 4. check remaining resources is correct.
absl::flat_hash_map<std::string, double> remaining_resources = {{"CPU", 3.0}};
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID("remaining"), remaining_resources, is_node_available_fn_);
auto remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>(io_context,
scheduling::NodeID("remaining"),
remaining_resources,
is_node_available_fn_);
auto remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
scheduling::NodeID("remaining"));
@@ -428,8 +453,11 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) {
{"bundle_group_3_" + group_id.Hex(), 1000},
{"bundle_group_4_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 4000}};
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID("remaining"), remaining_resources, is_node_available_fn_);
remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>(io_context,
scheduling::NodeID("remaining"),
remaining_resources,
is_node_available_fn_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
absl::flat_hash_map<std::string, double> allocating_resource;
@@ -474,8 +502,11 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) {
{"bundle_group_3_" + group_id.Hex(), 1000},
{"bundle_group_4_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 4000}};
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID("remaining"), remaining_resources, is_node_available_fn_);
auto remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>(io_context,
scheduling::NodeID("remaining"),
remaining_resources,
is_node_available_fn_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
absl::flat_hash_map<std::string, double> allocating_resource;
