Skip to content

Commit

Permalink
[core] Don't drop rpc status in favor of reply status (ray-project#35530
Browse files Browse the repository at this point in the history
)

When an RPC returns, it returns a status, and a reply. Usually, the reply also contains a status.

Currently, if the status is not a GRPC error, we only propagate reply.status to the response callback. So, it is possible for the RPC status to be a non-GRPC error, while reply.status is OK. In this case, the RPC status is completely dropped, and the call is treated as successful.

Instead, we should:
1. Propagate the RPC status to the response callback if it is not ok.
2. If it is okay, propagate the reply status.

In this way, RPC status obscures reply status. Ideally we should come up with some way of setting status that doesn't allow one error to cover the other.
  • Loading branch information
vitsai committed May 23, 2023
1 parent 86fab17 commit 26cae68
Showing 1 changed file with 79 additions and 74 deletions.
153 changes: 79 additions & 74 deletions src/ray/rpc/gcs_server/gcs_rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,80 +90,85 @@ class Executor {
///
/// Currently, SyncMETHOD will copy the reply additionally.
/// TODO(sang): Fix it.
#define VOID_GCS_RPC_CLIENT_METHOD( \
SERVICE, METHOD, grpc_client, method_timeout_ms, SPECS) \
void METHOD(const METHOD##Request &request, \
const ClientCallback<METHOD##Reply> &callback, \
const int64_t timeout_ms = method_timeout_ms) SPECS { \
auto executor = new Executor(this, [callback](const ray::Status &status) { \
callback(status, METHOD##Reply()); \
}); \
auto operation_callback = [this, request, callback, executor, timeout_ms]( \
const ray::Status &status, \
const METHOD##Reply &reply) { \
if (status.IsTimedOut()) { \
callback(status, reply); \
delete executor; \
} else if (!status.IsGrpcError()) { \
auto status = \
reply.status().code() == (int)StatusCode::OK \
? Status() \
: Status(StatusCode(reply.status().code()), reply.status().message()); \
callback(status, reply); \
delete executor; \
} else { \
/* In case of GCS failure, we queue the request and these requets will be */ \
/* executed once GCS is back. */ \
gcs_is_down_ = true; \
auto request_bytes = request.ByteSizeLong(); \
if (pending_requests_bytes_ + request_bytes > \
::RayConfig::instance().gcs_grpc_max_request_queued_max_bytes()) { \
RAY_LOG(WARNING) << "Pending queue for failed GCS request has reached the " \
<< "limit. Blocking the current thread until GCS is back"; \
while (gcs_is_down_ && !shutdown_) { \
CheckChannelStatus(false); \
std::this_thread::sleep_for(std::chrono::milliseconds( \
::RayConfig::instance() \
.gcs_client_check_connection_status_interval_milliseconds())); \
} \
if (shutdown_) { \
callback(Status::Disconnected("GCS client has been disconnected."), reply); \
delete executor; \
} else { \
executor->Retry(); \
} \
} else { \
pending_requests_bytes_ += request_bytes; \
auto timeout = timeout_ms == -1 \
? absl::InfiniteFuture() \
: absl::Now() + absl::Milliseconds(timeout_ms); \
pending_requests_.emplace(timeout, std::make_pair(executor, request_bytes)); \
} \
} \
}; \
auto operation = \
[request, operation_callback, timeout_ms](GcsRpcClient *gcs_rpc_client) { \
RAY_UNUSED(INVOKE_RPC_CALL(SERVICE, \
METHOD, \
request, \
operation_callback, \
gcs_rpc_client->grpc_client, \
timeout_ms)); \
}; \
executor->Execute(std::move(operation)); \
} \
ray::Status Sync##METHOD(const METHOD##Request &request, \
METHOD##Reply *reply_in, \
const int64_t timeout_ms = method_timeout_ms) { \
std::promise<Status> promise; \
METHOD( \
request, \
[&promise, reply_in](const Status &status, const METHOD##Reply &reply) { \
reply_in->CopyFrom(reply); \
promise.set_value(status); \
}, \
timeout_ms); \
return promise.get_future().get(); \
#define VOID_GCS_RPC_CLIENT_METHOD( \
SERVICE, METHOD, grpc_client, method_timeout_ms, SPECS) \
void METHOD(const METHOD##Request &request, \
const ClientCallback<METHOD##Reply> &callback, \
const int64_t timeout_ms = method_timeout_ms) SPECS { \
auto executor = new Executor(this, [callback](const ray::Status &status) { \
callback(status, METHOD##Reply()); \
}); \
auto operation_callback = [this, request, callback, executor, timeout_ms]( \
const ray::Status &status, \
const METHOD##Reply &reply) { \
if (status.IsTimedOut()) { \
callback(status, reply); \
delete executor; \
} else if (!status.IsGrpcError()) { \
/* We prioritize RPC status over reply.status when propagating. */ \
if (!status.ok()) { \
callback(status, reply); \
} else { \
auto st = \
reply.status().code() == (int)StatusCode::OK \
? Status() \
: Status(StatusCode(reply.status().code()), reply.status().message()); \
callback(st, reply); \
} \
delete executor; \
} else { \
/* In case of GCS failure, we queue the request and these requets will be */ \
/* executed once GCS is back. */ \
gcs_is_down_ = true; \
auto request_bytes = request.ByteSizeLong(); \
if (pending_requests_bytes_ + request_bytes > \
::RayConfig::instance().gcs_grpc_max_request_queued_max_bytes()) { \
RAY_LOG(WARNING) << "Pending queue for failed GCS request has reached the " \
<< "limit. Blocking the current thread until GCS is back"; \
while (gcs_is_down_ && !shutdown_) { \
CheckChannelStatus(false); \
std::this_thread::sleep_for(std::chrono::milliseconds( \
::RayConfig::instance() \
.gcs_client_check_connection_status_interval_milliseconds())); \
} \
if (shutdown_) { \
callback(Status::Disconnected("GCS client has been disconnected."), reply); \
delete executor; \
} else { \
executor->Retry(); \
} \
} else { \
pending_requests_bytes_ += request_bytes; \
auto timeout = timeout_ms == -1 \
? absl::InfiniteFuture() \
: absl::Now() + absl::Milliseconds(timeout_ms); \
pending_requests_.emplace(timeout, std::make_pair(executor, request_bytes)); \
} \
} \
}; \
auto operation = \
[request, operation_callback, timeout_ms](GcsRpcClient *gcs_rpc_client) { \
RAY_UNUSED(INVOKE_RPC_CALL(SERVICE, \
METHOD, \
request, \
operation_callback, \
gcs_rpc_client->grpc_client, \
timeout_ms)); \
}; \
executor->Execute(std::move(operation)); \
} \
ray::Status Sync##METHOD(const METHOD##Request &request, \
METHOD##Reply *reply_in, \
const int64_t timeout_ms = method_timeout_ms) { \
std::promise<Status> promise; \
METHOD( \
request, \
[&promise, reply_in](const Status &status, const METHOD##Reply &reply) { \
reply_in->CopyFrom(reply); \
promise.set_value(status); \
}, \
timeout_ms); \
return promise.get_future().get(); \
}

/// Client used for communicating with gcs server.
Expand Down

0 comments on commit 26cae68

Please sign in to comment.