Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove grpcio from minimal dependency #38243

Merged
merged 25 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
564ab08
make grpc optional to tls_utils
rynewang Jul 20, 2023
ad5dbb9
remove grpc minimal and add empty space
rynewang Aug 9, 2023
6b518c5
make client server optional
rynewang Aug 9, 2023
630bf27
remove leftover grpc references
rynewang Aug 9, 2023
deb92f2
add get_all_resource_usage to GcsClient; make grpc server in dash age…
rynewang Aug 9, 2023
d49be17
remove unused imports
rynewang Aug 9, 2023
bedca2b
remove accidental deps on grpc
rynewang Aug 9, 2023
8fad1df
fix the assumption that we have grpc installed in minimal
rynewang Aug 9, 2023
9c1241a
move tune experiment grpc dep
rynewang Aug 9, 2023
efd0f57
typo
rynewang Aug 9, 2023
f724a15
skip ray client tests in minimal
rynewang Aug 9, 2023
aea77ca
Merge branch 'master' into remove-grpc
rynewang Aug 9, 2023
bffe8c5
rename DrainNode to DrainNodes to disambiguate, and remove incompatib…
rynewang Aug 10, 2023
d8f0e63
remove ray client in ray minimal tests
rynewang Aug 10, 2023
0d7db9c
Merge remote-tracking branch 'origin/master' into remove-grpc
rynewang Aug 10, 2023
f42e937
fix merge bad
rynewang Aug 10, 2023
6a44535
lint
rynewang Aug 10, 2023
51c4d14
Merge branch 'master' into remove-grpc
rynewang Aug 10, 2023
85c8167
remove redundant error print
rynewang Aug 10, 2023
daf119e
Merge remote-tracking branch 'origin/master' into remove-grpc
rynewang Aug 10, 2023
67e59a4
revert
rynewang Aug 10, 2023
24d0cc1
added error message
rynewang Aug 12, 2023
fcf6be0
Merge remote-tracking branch 'origin/master' into remove-grpc
rynewang Aug 14, 2023
ea58f05
check ray_client_server_port vs ray[client] configs
rynewang Aug 14, 2023
2048cb0
add default port in scripts, not in parameters which messes up with u…
rynewang Aug 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
make grpc optional to tls_utils
add grpc deps to default and remove from minimal

add DrainNode to python gcs_client, and remove grpc from autoscaler.py

Signed-off-by: Ruiyang Wang <[email protected]>
  • Loading branch information
rynewang committed Aug 9, 2023
commit 564ab089d1cb983e81c158ced3611c25d713a205
1 change: 1 addition & 0 deletions dashboard/optional_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
from aiohttp.typedefs import PathLike # noqa: F401
from aiohttp.web import RouteDef # noqa: F401
import pydantic # noqa: F401
import grpc # noqa: F401
3 changes: 1 addition & 2 deletions python/ray/_private/tls_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
import os
import socket

import grpc


def generate_self_signed_tls_certs():
"""Create self-signed key/cert pair for testing.
Expand Down Expand Up @@ -68,6 +66,7 @@ def generate_self_signed_tls_certs():


def add_port_to_grpc_server(server, address):
import grpc
if os.environ.get("RAY_USE_TLS", "0").lower() in ("1", "true"):
server_cert_chain, private_key, ca_cert = load_certs_from_env()
credentials = grpc.ssl_server_credentials(
Expand Down
16 changes: 16 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2421,6 +2421,22 @@ cdef class GcsClient:

return num_added

@_auto_reconnect
def drain_node(self, node_ids, timeout=None):
# Ask the GCS to mark the given raylets as drained so the cloud provider
# can terminate them without the GCS reporting node-death errors.
#
# Args:
#     node_ids: iterable of raylet ids (presumably raw binary ids — the
#         values are pushed into c_string unchanged; confirm with callers).
#     timeout: optional timeout in seconds; None (or 0) maps to -1,
#         i.e. no RPC deadline.
#
# Returns:
#     A list with the ids of the nodes the GCS actually drained; callers
#     can diff it against `node_ids` to detect drain failures.
cdef:
c_vector[c_string] c_node_ids
# Convert seconds -> milliseconds; -1 means "wait indefinitely".
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
c_vector[c_string] c_drained_node_ids
for node_id in node_ids:
c_node_ids.push_back(node_id)
# Release the GIL for the blocking RPC; check_status raises on a
# non-OK reply (and @_auto_reconnect retries on connection loss).
with nogil:
check_status(self.inner.get().DrainNode(
c_node_ids, timeout_ms, c_drained_node_ids))
# Copy the C++ output vector back into a Python list of ids.
result = []
for drain_node_id in c_drained_node_ids:
result.append(drain_node_id)
return result

@_auto_reconnect
def internal_kv_del(self, c_string key, c_bool del_by_prefix,
namespace=None, timeout=None):
Expand Down
43 changes: 13 additions & 30 deletions python/ray/autoscaler/_private/autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from enum import Enum
from typing import Any, Callable, Dict, FrozenSet, List, Optional, Set, Tuple, Union

import grpc
import yaml

from ray.autoscaler._private.constants import (
Expand Down Expand Up @@ -76,7 +75,8 @@
TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_USER_NODE_TYPE,
)
from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc
from ray._raylet import GcsClient
from ray.exceptions import RpcError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -184,7 +184,7 @@ def __init__(
# TODO(ekl): require config reader to be a callable always.
config_reader: Union[str, Callable[[], dict]],
load_metrics: LoadMetrics,
gcs_node_info_stub: gcs_service_pb2_grpc.NodeInfoGcsServiceStub,
gcs_client: GcsClient,
session_name: Optional[str] = None,
max_launch_batch: int = AUTOSCALER_MAX_LAUNCH_BATCH,
max_concurrent_launches: int = AUTOSCALER_MAX_CONCURRENT_LAUNCHES,
Expand Down Expand Up @@ -214,8 +214,8 @@ def __init__(
prefix_cluster_info: Whether to add the cluster name to info strs.
event_summarizer: Utility to consolidate duplicated messages.
prom_metrics: Prometheus metrics for autoscaler-related operations.
gcs_node_info_stub: Stub for interactions with Ray nodes via gRPC
request to the GCS. Used to drain nodes before termination.
gcs_client: client for interactions with the GCS. Used to drain nodes
before termination.
"""

if isinstance(config_reader, str):
Expand Down Expand Up @@ -355,7 +355,7 @@ def read_fn():
for remote, local in self.config["file_mounts"].items()
}

self.gcs_node_info_stub = gcs_node_info_stub
self.gcs_client = gcs_client

for local_path in self.config["file_mounts"].values():
assert os.path.exists(local_path)
Expand Down Expand Up @@ -675,39 +675,22 @@ def drain_nodes_via_gcs(self, provider_node_ids_to_drain: List[NodeID]):

logger.info(f"Draining {len(raylet_ids_to_drain)} raylet(s).")
try:
request = gcs_service_pb2.DrainNodeRequest(
drain_node_data=[
gcs_service_pb2.DrainNodeData(node_id=raylet_id)
for raylet_id in raylet_ids_to_drain
]
)

# A successful response indicates that the GCS has marked the
# desired nodes as "drained." The cloud provider can then terminate
# the nodes without the GCS printing an error.
response = self.gcs_node_info_stub.DrainNode(request, timeout=5)

# Check if we succeeded in draining all of the intended nodes by
# looking at the RPC response.
drained_raylet_ids = {
status_item.node_id for status_item in response.drain_node_status
}
drained_raylet_ids = set(self.gcs_client.drain_node(
raylet_ids_to_drain, timeout=5))
failed_to_drain = raylet_ids_to_drain - drained_raylet_ids
if failed_to_drain:
self.prom_metrics.drain_node_exceptions.inc()
logger.error(f"Failed to drain {len(failed_to_drain)} raylet(s).")

# If we get a gRPC error with an UNIMPLEMENTED code, fail silently.
# This error indicates that the GCS is using Ray version < 1.8.0,
# for which DrainNode is not implemented.
except grpc.RpcError as e:
# If the code is UNIMPLEMENTED, pass.
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
pass
# Otherwise, it's a plane old gRPC error and we should log it.
else:
self.prom_metrics.drain_node_exceptions.inc()
logger.exception("Failed to drain Ray nodes. Traceback follows.")
except RpcError as e:
# Otherwise, it's a plain old RPC error and we should log it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to handle UNIMPLEMENTED here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The document mentioned it used to deal with Ray version < 1.8.0, and now it's 2.7.0 so I guess we would not ever handle the UNIMPLEMENTED case?

self.prom_metrics.drain_node_exceptions.inc()
logger.exception("Failed to drain Ray nodes. Traceback follows.")
logger.exception("Error: ", e)
rynewang marked this conversation as resolved.
Show resolved Hide resolved
except Exception:
# We don't need to interrupt the autoscaler update with an
# exception, but we should log what went wrong and record the
Expand Down
4 changes: 4 additions & 0 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil:
CRayStatus InternalKVExists(
const c_string &ns, const c_string &key,
int64_t timeout_ms, c_bool &exists)
CRayStatus DrainNode(
const c_vector[c_string]& node_ids,
int64_t timeout_ms,
c_vector[c_string]& drained_node_ids)

CRayStatus PinRuntimeEnvUri(
const c_string &uri, int expiration_s, int64_t timeout_ms)
Expand Down
2 changes: 2 additions & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ def get_packages(self):
"py-spy >= 0.2.0",
"requests",
"gpustat >= 1.0.0", # for windows
"grpcio >= 1.32.0; python_version < '3.10'", # noqa:E501
"grpcio >= 1.42.0; python_version >= '3.10'", # noqa:E501
"opencensus",
"pydantic < 2", # 2.0.0 brings breaking changes
"prometheus_client >= 0.7.1",
Expand Down
28 changes: 28 additions & 0 deletions src/ray/gcs/gcs_client/gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,34 @@ Status PythonGcsClient::InternalKVExists(const std::string &ns,
return Status::RpcError(status.error_message(), status.error_code());
}

/// Synchronously ask the GCS NodeInfo service to drain the given nodes.
///
/// \param node_ids Ids of the raylets to drain.
/// \param timeout_ms RPC deadline in milliseconds; negative means no deadline.
/// \param drained_node_ids Output: ids the GCS reports as successfully
///        drained (replaced wholesale on success; untouched on failure).
/// \return OK on success, the GCS application error, or the transport error.
Status PythonGcsClient::DrainNode(const std::vector<std::string> &node_ids,
                                  int64_t timeout_ms,
                                  std::vector<std::string> &drained_node_ids) {
  grpc::ClientContext context;
  GrpcClientContextWithTimeoutMs(context, timeout_ms);

  // Build one DrainNodeData entry per requested node id.
  rpc::DrainNodeRequest request;
  for (const std::string &node_id : node_ids) {
    request.add_drain_node_data()->set_node_id(node_id);
  }

  rpc::DrainNodeReply reply;
  grpc::Status status = node_info_stub_->DrainNode(&context, request, &reply);

  // Transport-level failure: surface the gRPC error unchanged.
  if (!status.ok()) {
    return Status::RpcError(status.error_message(), status.error_code());
  }
  // Application-level failure carried inside the reply payload.
  if (reply.status().code() != static_cast<int>(StatusCode::OK)) {
    return HandleGcsError(reply.status());
  }
  // Success: report which nodes the GCS actually marked as drained.
  drained_node_ids.clear();
  drained_node_ids.reserve(reply.drain_node_status().size());
  for (const auto &node_status : reply.drain_node_status()) {
    drained_node_ids.push_back(node_status.node_id());
  }
  return Status::OK();
}

Status PythonGcsClient::PinRuntimeEnvUri(const std::string &uri,
int expiration_s,
int64_t timeout_ms) {
Expand Down
3 changes: 3 additions & 0 deletions src/ray/gcs/gcs_client/gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ class RAY_EXPORT PythonGcsClient {
const std::string &key,
int64_t timeout_ms,
bool &exists);
Status DrainNode(const std::vector<std::string> &node_ids,
int64_t timeout_ms,
std::vector<std::string> &drained_node_ids);

Status PinRuntimeEnvUri(const std::string &uri, int expiration_s, int64_t timeout_ms);
Status GetAllNodeInfo(int64_t timeout_ms, std::vector<rpc::GcsNodeInfo> &result);
Expand Down
Loading