Skip to content

Commit

Permalink
Revert "[Core] Fix the race condition where grpc requests are handled…
Browse files Browse the repository at this point in the history
… while core worker not yet initialized (ray-project#37117)" (ray-project#37343)

This reverts commit 5e8fd37.
  • Loading branch information
scv119 committed Jul 12, 2023
1 parent 0d38248 commit 59a15fc
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 42 deletions.
1 change: 0 additions & 1 deletion BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,6 @@ cc_library(
"//src/ray/protobuf:worker_cc_proto",
"@boost//:circular_buffer",
"@boost//:fiber",
"@com_google_absl//absl/cleanup:cleanup",
"@com_google_absl//absl/container:btree",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
Expand Down
7 changes: 0 additions & 7 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

#include <google/protobuf/util/json_util.h>

#include "absl/cleanup/cleanup.h"
#include "absl/strings/str_format.h"
#include "boost/fiber/all.hpp"
#include "ray/common/bundle_spec.h"
Expand Down Expand Up @@ -121,12 +120,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
task_execution_service_work_(task_execution_service_),
exiting_detail_(std::nullopt),
pid_(getpid()) {
// Notify that core worker is initialized.
auto initialzed_scope_guard = absl::MakeCleanup([this] {
absl::MutexLock lock(&initialize_mutex_);
initialized_ = true;
intialize_cv_.SignalAll();
});
RAY_LOG(DEBUG) << "Constructing CoreWorker, worker_id: " << worker_id;

// Initialize task receivers.
Expand Down
13 changes: 0 additions & 13 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -1520,14 +1520,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
}
}

/// Wait until the worker is initialized.
void WaitUntilInitialized() override {
absl::MutexLock lock(&initialize_mutex_);
while (!initialized_) {
intialize_cv_.WaitWithTimeout(&initialize_mutex_, absl::Seconds(1));
}
}

const CoreWorkerOptions options_;

/// Callback to get the current language (e.g., Python) call site.
Expand Down Expand Up @@ -1556,11 +1548,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {

std::string main_thread_task_name_ GUARDED_BY(mutex_);

/// States that used for initialization.
absl::Mutex initialize_mutex_;
absl::CondVar intialize_cv_;
bool initialized_ GUARDED_BY(initialize_mutex_) = false;

/// Event loop where the IO events are handled. e.g. async GCS operations.
instrumented_io_context io_service_;

Expand Down
18 changes: 1 addition & 17 deletions src/ray/rpc/server_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,6 @@ enum class ServerCallState {

class ServerCallFactory;

/// Represents a service handler that might not
/// be ready to serve RPCs immediately after construction.
class DelayedServiceHandler {
public:
virtual ~DelayedServiceHandler() = default;

/// Blocks until the service is ready to serve RPCs.
virtual void WaitUntilInitialized() = 0;
};

/// Represents an incoming request of a gRPC server.
///
/// The lifecycle and state transition of a `ServerCall` is as follows:
Expand Down Expand Up @@ -158,17 +148,14 @@ class ServerCallImpl : public ServerCall {
/// \param[in] io_service The event loop.
/// \param[in] call_name The name of the RPC call.
/// \param[in] record_metrics If true, it records and exports the gRPC server metrics.
/// \param[in] preprocess_function If not nullptr, it will be called before handling
/// request.
ServerCallImpl(
const ServerCallFactory &factory,
ServiceHandler &service_handler,
HandleRequestFunction<ServiceHandler, Request, Reply> handle_request_function,
instrumented_io_context &io_service,
std::string call_name,
const ClusterID &cluster_id,
bool record_metrics,
std::function<void()> preprocess_function = nullptr)
bool record_metrics)
: state_(ServerCallState::PENDING),
factory_(factory),
service_handler_(service_handler),
Expand Down Expand Up @@ -209,9 +196,6 @@ class ServerCallImpl : public ServerCall {
}

void HandleRequestImpl() {
if constexpr (std::is_base_of_v<DelayedServiceHandler, ServiceHandler>) {
service_handler_.WaitUntilInitialized();
}
state_ = ServerCallState::PROCESSING;
// NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in
// a different thread, and will cause `this` to be deleted.
Expand Down
6 changes: 2 additions & 4 deletions src/ray/rpc/worker/core_worker_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,9 @@ namespace rpc {
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(NumPendingTasks)

/// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`.
class CoreWorkerServiceHandler : public DelayedServiceHandler {
class CoreWorkerServiceHandler {
public:
/// Blocks until the service is ready to serve RPCs.
virtual void WaitUntilInitialized() = 0;

virtual ~CoreWorkerServiceHandler() {}
/// Handlers. For all of the following handlers, the implementations can
/// handle the request asynchronously. When handling is done, the
/// `send_reply_callback` should be called. See
Expand Down

0 comments on commit 59a15fc

Please sign in to comment.