// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/rpc/grpc_server.h"
#include <grpcpp/impl/service_type.h>
#include <boost/asio/detail/socket_holder.hpp>
#include "ray/common/ray_config.h"
#include "ray/rpc/common.h"
#include "ray/rpc/grpc_server.h"
#include "ray/stats/metric.h"
#include "ray/util/util.h"
DEFINE_stats(grpc_server_req_latency_ms, "Request latency in the gRPC server",
             ("Method"), (), ray::stats::GAUGE);
DEFINE_stats(grpc_server_req_new, "Number of new requests in the gRPC server",
             ("Method"), (), ray::stats::COUNT);
DEFINE_stats(grpc_server_req_handling,
             "Number of requests currently being handled in the gRPC server",
             ("Method"), (), ray::stats::COUNT);
DEFINE_stats(grpc_server_req_finished, "Number of finished requests in the gRPC server",
             ("Method"), (), ray::stats::COUNT);

namespace ray {
namespace rpc {
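
// Typical usage (a minimal sketch; `MyGrpcService` stands in for any concrete
// `GrpcService` subclass and is not defined in this file):
//
//   GrpcServer server("my_server", /*port=*/0, /*listen_to_localhost_only=*/true,
//                     /*num_threads=*/1, /*keepalive_time_ms=*/7200000);
//   MyGrpcService service(...);
//   server.RegisterService(service);  // Must be called before Run().
//   server.Run();                     // Binds the port and starts the polling threads.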
GrpcServer::GrpcServer(std::string name, const uint32_t port,
                       bool listen_to_localhost_only, int num_threads,
                       int64_t keepalive_time_ms)
    : name_(std::move(name)),
      port_(port),
      listen_to_localhost_only_(listen_to_localhost_only),
      is_closed_(true),
      num_threads_(num_threads),
      keepalive_time_ms_(keepalive_time_ms) {
  cqs_.resize(num_threads_);
}
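
// Performs the full startup sequence: configure channel arguments, bind the
// listening port (TLS or insecure), register all services with the builder,
// create one completion queue per polling thread, pre-post a buffer of
// `ServerCall` objects per handler, and spawn the polling threads.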
void GrpcServer::Run() {
  uint32_t specified_port = port_;
  std::string server_address((listen_to_localhost_only_ ? "127.0.0.1:" : "0.0.0.0:") +
                             std::to_string(port_));
  grpc::ServerBuilder builder;
  // Disable the SO_REUSEPORT option. We don't need it in Ray. If the option is
  // enabled (the default behavior in gRPC), multiple workers may listen on the
  // same port, and requests sent to this port may be handled by any of them.
  builder.AddChannelArgument(GRPC_ARG_ALLOW_REUSEPORT, 0);
  builder.AddChannelArgument(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH,
                             RayConfig::instance().max_grpc_message_size());
  builder.AddChannelArgument(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
                             RayConfig::instance().max_grpc_message_size());
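  // Keepalive: send an HTTP/2 ping every `keepalive_time_ms_` on established
  // connections, drop the connection if a ping is not acknowledged within the
  // configured timeout, and (with PERMIT_WITHOUT_CALLS set to 0) only ping
  // while at least one call is in flight.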
  builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, keepalive_time_ms_);
  builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS,
                             RayConfig::instance().grpc_keepalive_timeout_ms());
  builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 0);
  if (RayConfig::instance().USE_TLS()) {
    // Create credentials from locations specified in config.
    std::string rootcert = ReadCert(RayConfig::instance().TLS_CA_CERT());
    std::string servercert = ReadCert(RayConfig::instance().TLS_SERVER_CERT());
    std::string serverkey = ReadCert(RayConfig::instance().TLS_SERVER_KEY());
    grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {serverkey, servercert};
    grpc::SslServerCredentialsOptions ssl_opts(
        GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY);
    ssl_opts.pem_root_certs = rootcert;
    ssl_opts.pem_key_cert_pairs.push_back(pkcp);
    // Create server credentials.
    std::shared_ptr<grpc::ServerCredentials> server_creds;
    server_creds = grpc::SslServerCredentials(ssl_opts);
    builder.AddListeningPort(server_address, server_creds, &port_);
  } else {
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), &port_);
  }
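  // NOTE: `&port_` is an output parameter: if `port_` was 0, gRPC picks a free
  // port and writes the actually bound port back into `port_`.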
  // Register all the services to this server.
  if (services_.empty()) {
    RAY_LOG(WARNING) << "No services found when starting the gRPC server " << name_;
  }
  for (auto &entry : services_) {
    builder.RegisterService(&entry.get());
  }
  // Get hold of the completion queues used for the asynchronous communication
  // with the gRPC runtime.
  for (int i = 0; i < num_threads_; i++) {
    cqs_[i] = builder.AddCompletionQueue();
  }
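  // NOTE: one completion queue per polling thread means each thread only ever
  // drains its own queue, avoiding contention on a single shared queue.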
  // Build and start server.
  server_ = builder.BuildAndStart();
  RAY_CHECK(server_)
      << "Failed to start the grpc server. The specified port is " << specified_port
      << ". This means that Ray's core components will not be able to function "
      << "correctly. If the server startup error message is `Address already in use`, "
      << "it indicates that the server failed to start because the port is already "
      << "being used by other processes (such as --node-manager-port, "
      << "--object-manager-port, --gcs-server-port, and ports between "
      << "--min-worker-port and --max-worker-port). Try running `lsof -i :"
      << specified_port << "` to check if there are other processes listening "
      << "on the port.";
  RAY_CHECK(port_ > 0);
  RAY_LOG(INFO) << name_ << " server started, listening on port " << port_ << ".";
  // Create calls for all the server call factories.
  for (auto &entry : server_call_factories_) {
    for (int i = 0; i < num_threads_; i++) {
      // Create a buffer of 100 calls for each RPC handler.
      // TODO(edoakes): a small buffer should be fine and seems to have better
      // performance, but we don't currently handle backpressure on the client.
      int buffer_size = 100;
      if (entry->GetMaxActiveRPCs() != -1) {
        buffer_size = entry->GetMaxActiveRPCs();
      }
      for (int j = 0; j < buffer_size; j++) {
        entry->CreateCall();
      }
    }
  }
  // Start the threads that poll for incoming requests.
  for (int i = 0; i < num_threads_; i++) {
    polling_threads_.emplace_back(&GrpcServer::PollEventsFromCompletionQueue, this, i);
  }
  // Mark the server as running.
  is_closed_ = false;
}
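
// Registers `service` and creates one set of server call factories per
// completion queue. This must happen before Run(): services registered after
// startup are never passed to the grpc::ServerBuilder.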
void GrpcServer::RegisterService(GrpcService &service) {
  services_.emplace_back(service.GetGrpcService());
  for (int i = 0; i < num_threads_; i++) {
    service.InitServerCallFactories(cqs_[i], &server_call_factories_);
  }
}
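
// Each polling thread runs this loop against its own completion queue. Every
// tag delivered by Next() is a `ServerCall *`, and the call's state machine
// (PENDING -> PROCESSING -> SENDING_REPLY) tells us which event completed.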
void GrpcServer::PollEventsFromCompletionQueue(int index) {
  SetThreadName("server.poll" + std::to_string(index));
  void *tag;
  bool ok;
  // Keep reading events from the `CompletionQueue` until it is shut down.
  while (cqs_[index]->Next(&tag, &ok)) {
    auto *server_call = static_cast<ServerCall *>(tag);
    bool delete_call = false;
    // A new call is needed after the server sends a reply, regardless of
    // whether the reply succeeded or failed.
    bool need_new_call = false;
    if (ok) {
      switch (server_call->GetState()) {
      case ServerCallState::PENDING:
        // We've received a new incoming request. From now on, this call object
        // is used to track the request.
        server_call->SetState(ServerCallState::PROCESSING);
        server_call->HandleRequest();
        break;
      case ServerCallState::SENDING_REPLY:
        // gRPC has sent the reply successfully; invoke the callback.
        server_call->OnReplySent();
        // The RPC call has finished and can be deleted now.
        delete_call = true;
        // A new call should be supplied.
        need_new_call = true;
        break;
      default:
        RAY_LOG(FATAL) << "Shouldn't reach here.";
        break;
      }
    } else {
      // `ok == false` occurs in two situations:
      // First, the server tried to send the reply to the client but failed; in
      // this case the server call's state is SENDING_REPLY.
      if (server_call->GetState() == ServerCallState::SENDING_REPLY) {
        server_call->OnReplyFailed();
        // A new call should be supplied.
        need_new_call = true;
      }
      // Second, the server has been shut down while the server call's state is
      // PENDING. Nothing needs to be done other than deleting this call.
      delete_call = true;
    }
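    // With a max-active-RPC limit set, a replacement call is created only here,
    // once a finished call is deleted, so the number of in-flight calls never
    // exceeds the limit. Without a limit (-1), the replacement is presumably
    // created earlier, when the request starts being handled, so no new call is
    // needed at this point.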
    if (delete_call) {
      if (need_new_call && server_call->GetServerCallFactory().GetMaxActiveRPCs() != -1) {
        // Create a new `ServerCall` to accept the next incoming request.
        server_call->GetServerCallFactory().CreateCall();
      }
      delete server_call;
    }
  }
}
} // namespace rpc
} // namespace ray