Skip to content

Commit

Permalink
[core] Remove gcs ray syncer. (ray-project#38489)
Browse files Browse the repository at this point in the history
This PR remove the GCS Ray Syncer given the new ray syncer has been turn on by default for a while.
  • Loading branch information
fishbone committed Aug 17, 2023
1 parent c6ad28a commit 4d86646
Show file tree
Hide file tree
Showing 18 changed files with 77 additions and 516 deletions.
2 changes: 0 additions & 2 deletions python/ray/includes/ray_config.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ cdef extern from "ray/common/ray_config.h" nogil:

c_bool start_python_importer_thread() const

c_bool use_ray_syncer() const

c_string REDIS_CA_CERT() const

c_string REDIS_CA_PATH() const
Expand Down
4 changes: 0 additions & 4 deletions python/ray/includes/ray_config.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,6 @@ cdef class Config:
def start_python_importer_thread():
return RayConfig.instance().start_python_importer_thread()

@staticmethod
def use_ray_syncer():
return RayConfig.instance().use_ray_syncer()

@staticmethod
def REDIS_CA_CERT():
return RayConfig.instance().REDIS_CA_CERT()
Expand Down
10 changes: 0 additions & 10 deletions python/ray/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1237,16 +1237,6 @@ def set_runtime_env_plugin_schemas(request):
del os.environ["RAY_RUNTIME_ENV_PLUGIN_SCHEMAS"]


@pytest.fixture(params=[True, False])
def enable_syncer_test(request, monkeypatch):
with_syncer = request.param
monkeypatch.setenv("RAY_use_ray_syncer", "true" if with_syncer else "false")
ray._raylet.Config.initialize("")
yield
monkeypatch.delenv("RAY_use_ray_syncer")
ray._raylet.Config.initialize("")


@pytest.fixture(scope="function")
def temp_file(request):
with tempfile.NamedTemporaryFile("r+b") as fp:
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/test_actor_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def get_location_and_ids(self):
assert ready_ids == []


def test_actors_and_tasks_with_gpus(enable_syncer_test, ray_start_cluster):
def test_actors_and_tasks_with_gpus(ray_start_cluster):
cluster = ray_start_cluster
num_nodes = 3
num_gpus_per_raylet = 2
Expand Down
3 changes: 0 additions & 3 deletions python/ray/tests/test_metrics_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@
"ray_gcs_actors_count",
]

if not ray._raylet.Config.use_ray_syncer():
_METRICS.append("ray_outbound_heartbeat_size_kb_sum")

# This list of metrics should be kept in sync with
# ray/python/ray/autoscaler/_private/prom_metrics.py
_AUTOSCALER_METRICS = [
Expand Down
4 changes: 1 addition & 3 deletions python/ray/tests/test_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,7 @@ def foo():
],
indirect=True,
)
def test_autoscaler_warn_deadlock(
enable_syncer_test, ray_start_cluster_head_with_env_vars
):
def test_autoscaler_warn_deadlock(ray_start_cluster_head_with_env_vars):
script = """
import ray
import time
Expand Down
12 changes: 0 additions & 12 deletions python/ray/tests/test_placement_group_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,18 +458,6 @@ def f():
assert len(gpu_ids_res) == 2


@pytest.mark.parametrize(
"ray_start_cluster",
[
generate_system_config_map(
use_ray_syncer=True,
),
generate_system_config_map(
use_ray_syncer=False,
),
],
indirect=True,
)
@pytest.mark.repeat(3)
def test_actor_scheduling_not_block_with_placement_group(ray_start_cluster):
"""Tests the scheduling of lots of actors will not be blocked
Expand Down
2 changes: 0 additions & 2 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,6 @@ RAY_CONFIG(uint64_t, gcs_grpc_max_request_queued_max_bytes, 1024UL * 1024 * 1024
/// The duration between two checks for grpc status.
RAY_CONFIG(int32_t, gcs_client_check_connection_status_interval_milliseconds, 1000)

/// Feature flag to use the ray syncer for resource synchronization
RAY_CONFIG(bool, use_ray_syncer, true)
/// Due to the protocol drawback, raylet needs to refresh the message if
/// no message is received for a while.
/// Refer to https://tinyurl.com/n6kvsp87 for more details
Expand Down
29 changes: 4 additions & 25 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler(
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
const gcs::GcsNodeManager &gcs_node_manager,
ClusterResourceScheduler &cluster_resource_scheduler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
gcs_syncer::RaySyncer *ray_syncer)
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool)
: return_timer_(io_context),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_node_manager_(gcs_node_manager),
cluster_resource_scheduler_(cluster_resource_scheduler),
raylet_client_pool_(raylet_client_pool),
ray_syncer_(ray_syncer) {}
raylet_client_pool_(raylet_client_pool) {}

void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
std::shared_ptr<GcsPlacementGroup> placement_group,
Expand Down Expand Up @@ -243,19 +241,10 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(

return_client->CancelResourceReserve(
*bundle_spec,
[this, bundle_spec, node_id](const Status &status,
const rpc::CancelResourceReserveReply &reply) {
[bundle_spec, node_id](const Status &status,
const rpc::CancelResourceReserveReply &reply) {
RAY_LOG(DEBUG) << "Finished cancelling the resource reserved for bundle: "
<< bundle_spec->DebugString() << " at node " << node_id;
if (ray_syncer_ != nullptr) {
auto &resources = bundle_spec->GetFormattedResources();
rpc::NodeResourceChange node_resource_change;
for (const auto &iter : resources) {
node_resource_change.add_deleted_resources(iter.first);
}
node_resource_change.set_node_id(node_id.Binary());
ray_syncer_->Update(std::move(node_resource_change));
}
});
}

Expand Down Expand Up @@ -306,16 +295,6 @@ void GcsPlacementGroupScheduler::CommitAllBundles(
for (const auto &bundle : bundles_per_node) {
lease_status_tracker->MarkCommitRequestReturned(node_id, bundle, status);
(*commited_bundle_locations)[bundle->BundleId()] = {node_id, bundle};

if (ray_syncer_ != nullptr) {
auto &resources = bundle->GetFormattedResources();
// Push the message to syncer so that it can be broadcasted to all other nodes
rpc::NodeResourceChange node_resource_change;
node_resource_change.set_node_id(node_id.Binary());
node_resource_change.mutable_updated_resources()->insert(resources.begin(),
resources.end());
ray_syncer_->Update(std::move(node_resource_change));
}
}
// Commit the bundle resources on the remote node to the cluster resources.
CommitBundleResources(commited_bundle_locations);
Expand Down
8 changes: 1 addition & 7 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "ray/common/scheduling/scheduling_ids.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/gcs_server/ray_syncer.h"
#include "ray/raylet/scheduling/cluster_resource_scheduler.h"
#include "ray/raylet/scheduling/policy/scheduling_context.h"
#include "ray/raylet_client/raylet_client.h"
Expand Down Expand Up @@ -287,8 +286,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
const GcsNodeManager &gcs_node_manager,
ClusterResourceScheduler &cluster_resource_scheduler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
gcs_syncer::RaySyncer *ray_syncer);
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool);

virtual ~GcsPlacementGroupScheduler() = default;

Expand Down Expand Up @@ -493,10 +491,6 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// The nodes which are releasing unused bundles.
absl::flat_hash_set<NodeID> nodes_of_releasing_unused_bundles_;

/// The syncer of resource. This is used to report placement group updates.
/// TODO (iycheng): Remove this one from pg once we finish the refactor
gcs_syncer::RaySyncer *ray_syncer_;

/// The resources changed listeners.
std::vector<std::function<void()>> resources_changed_listeners_;

Expand Down
82 changes: 24 additions & 58 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,9 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
void GcsServer::Stop() {
if (!is_stopped_) {
RAY_LOG(INFO) << "Stopping GCS server.";
if (RayConfig::instance().use_ray_syncer()) {
ray_syncer_io_context_.stop();
ray_syncer_thread_->join();
ray_syncer_.reset();
} else {
gcs_ray_syncer_->Stop();
}
ray_syncer_io_context_.stop();
ray_syncer_thread_->join();
ray_syncer_.reset();

gcs_task_manager_->Stop();

Expand Down Expand Up @@ -497,8 +493,7 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) {
gcs_table_storage_,
*gcs_node_manager_,
*cluster_resource_scheduler_,
raylet_client_pool_,
gcs_ray_syncer_.get());
raylet_client_pool_);

gcs_placement_group_manager_ = std::make_shared<GcsPlacementGroupManager>(
main_service_,
Expand Down Expand Up @@ -535,30 +530,18 @@ GcsServer::StorageType GcsServer::GetStorageType() const {
}

void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) {
if (RayConfig::instance().use_ray_syncer()) {
ray_syncer_ =
std::make_unique<syncer::RaySyncer>(ray_syncer_io_context_, kGCSNodeID.Binary());
ray_syncer_->Register(
syncer::MessageType::RESOURCE_VIEW, nullptr, gcs_resource_manager_.get());
ray_syncer_->Register(
syncer::MessageType::COMMANDS, nullptr, gcs_resource_manager_.get());
ray_syncer_thread_ = std::make_unique<std::thread>([this]() {
boost::asio::io_service::work work(ray_syncer_io_context_);
ray_syncer_io_context_.run();
});
ray_syncer_service_ = std::make_unique<syncer::RaySyncerService>(*ray_syncer_);
rpc_server_.RegisterService(*ray_syncer_service_);
} else {
/*
The current synchronization flow is:
raylet -> syncer::poller --> syncer::update -> gcs_resource_manager
gcs_placement_scheduler --/
*/
gcs_ray_syncer_ = std::make_unique<gcs_syncer::RaySyncer>(
main_service_, raylet_client_pool_, *gcs_resource_manager_);
gcs_ray_syncer_->Initialize(gcs_init_data);
gcs_ray_syncer_->Start();
}
ray_syncer_ =
std::make_unique<syncer::RaySyncer>(ray_syncer_io_context_, kGCSNodeID.Binary());
ray_syncer_->Register(
syncer::MessageType::RESOURCE_VIEW, nullptr, gcs_resource_manager_.get());
ray_syncer_->Register(
syncer::MessageType::COMMANDS, nullptr, gcs_resource_manager_.get());
ray_syncer_thread_ = std::make_unique<std::thread>([this]() {
boost::asio::io_service::work work(ray_syncer_io_context_);
ray_syncer_io_context_.run();
});
ray_syncer_service_ = std::make_unique<syncer::RaySyncerService>(*ray_syncer_);
rpc_server_.RegisterService(*ray_syncer_service_);
}

void GcsServer::InitFunctionManager() {
Expand Down Expand Up @@ -736,10 +719,6 @@ void GcsServer::InstallEventListeners() {
gcs_healthcheck_manager_->AddNode(node_id, channel);
}
cluster_task_manager_->ScheduleAndDispatchTasks();

if (!RayConfig::instance().use_ray_syncer()) {
gcs_ray_syncer_->AddNode(*node);
}
});
gcs_node_manager_->AddNodeRemovedListener(
[this](std::shared_ptr<rpc::GcsNodeInfo> node) {
Expand All @@ -753,10 +732,6 @@ void GcsServer::InstallEventListeners() {
raylet_client_pool_->Disconnect(node_id);
gcs_healthcheck_manager_->RemoveNode(node_id);
pubsub_handler_->RemoveSubscriberFrom(node_id.Binary());

if (!RayConfig::instance().use_ray_syncer()) {
gcs_ray_syncer_->RemoveNode(*node);
}
});

// Install worker event listener.
Expand Down Expand Up @@ -843,9 +818,6 @@ std::string GcsServer::GetDebugState() const {
<< gcs_publisher_->DebugString() << "\n\n"
<< runtime_env_manager_->DebugString() << "\n\n"
<< gcs_task_manager_->DebugString() << "\n\n";
if (gcs_ray_syncer_) {
stream << gcs_ray_syncer_->DebugString();
}
return stream.str();
}

Expand Down Expand Up @@ -889,20 +861,14 @@ void GcsServer::TryGlobalGC() {
rpc::ResourcesData resources_data;
resources_data.set_should_global_gc(true);

if (RayConfig::instance().use_ray_syncer()) {
auto msg = std::make_shared<syncer::RaySyncMessage>();
msg->set_version(absl::GetCurrentTimeNanos());
msg->set_node_id(kGCSNodeID.Binary());
msg->set_message_type(syncer::MessageType::COMMANDS);
std::string serialized_msg;
RAY_CHECK(resources_data.SerializeToString(&serialized_msg));
msg->set_sync_message(std::move(serialized_msg));
ray_syncer_->BroadcastRaySyncMessage(std::move(msg));
} else {
resources_data.set_node_id(kGCSNodeID.Binary());
gcs_ray_syncer_->Update(resources_data);
}

auto msg = std::make_shared<syncer::RaySyncMessage>();
msg->set_version(absl::GetCurrentTimeNanos());
msg->set_node_id(kGCSNodeID.Binary());
msg->set_message_type(syncer::MessageType::COMMANDS);
std::string serialized_msg;
RAY_CHECK(resources_data.SerializeToString(&serialized_msg));
msg->set_sync_message(std::move(serialized_msg));
ray_syncer_->BroadcastRaySyncMessage(std::move(msg));
global_gc_throttler_->RunNow();
}
}
Expand Down
6 changes: 0 additions & 6 deletions src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "ray/gcs/gcs_server/gcs_task_manager.h"
#include "ray/gcs/gcs_server/grpc_based_resource_broadcaster.h"
#include "ray/gcs/gcs_server/pubsub_handler.h"
#include "ray/gcs/gcs_server/ray_syncer.h"
#include "ray/gcs/gcs_server/runtime_env_handler.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/gcs/redis_client.h"
Expand Down Expand Up @@ -249,11 +248,6 @@ class GcsServer {
/// Monitor service for monitor server
std::unique_ptr<rpc::MonitorGrpcService> monitor_grpc_service_;

/// Synchronization service for ray.
/// TODO(iycheng): Deprecate this gcs_ray_syncer_ one once we roll out
/// to ray_syncer_.
std::unique_ptr<gcs_syncer::RaySyncer> gcs_ray_syncer_;

/// Ray Syncer realted fields.
std::unique_ptr<syncer::RaySyncer> ray_syncer_;
std::unique_ptr<syncer::RaySyncerService> ray_syncer_service_;
Expand Down
Loading

0 comments on commit 4d86646

Please sign in to comment.