From 564ab089d1cb983e81c158ced3611c25d713a205 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 20 Jul 2023 13:19:20 -0700 Subject: [PATCH 01/20] make grpc optional to tls_utils add grpc deps to default and remove from minimal add DrainNode to python gcs_client, and remove grpc from autoscaler.py Signed-off-by: Ruiyang Wang --- dashboard/optional_deps.py | 1 + python/ray/_private/tls_utils.py | 3 +- python/ray/_raylet.pyx | 16 ++++++++ python/ray/autoscaler/_private/autoscaler.py | 43 ++++++-------------- python/ray/includes/common.pxd | 4 ++ python/setup.py | 2 + src/ray/gcs/gcs_client/gcs_client.cc | 28 +++++++++++++ src/ray/gcs/gcs_client/gcs_client.h | 3 ++ 8 files changed, 68 insertions(+), 32 deletions(-) diff --git a/dashboard/optional_deps.py b/dashboard/optional_deps.py index cd792190df788..f9accac717aa7 100644 --- a/dashboard/optional_deps.py +++ b/dashboard/optional_deps.py @@ -16,3 +16,4 @@ from aiohttp.typedefs import PathLike # noqa: F401 from aiohttp.web import RouteDef # noqa: F401 import pydantic # noqa: F401 +import grpc # noqa: F401 \ No newline at end of file diff --git a/python/ray/_private/tls_utils.py b/python/ray/_private/tls_utils.py index 0251b36ba56bc..cfdfde84c1be5 100644 --- a/python/ray/_private/tls_utils.py +++ b/python/ray/_private/tls_utils.py @@ -2,8 +2,6 @@ import os import socket -import grpc - def generate_self_signed_tls_certs(): """Create self-signed key/cert pair for testing. @@ -68,6 +66,7 @@ def generate_self_signed_tls_certs(): def add_port_to_grpc_server(server, address): + import grpc if os.environ.get("RAY_USE_TLS", "0").lower() in ("1", "true"): server_cert_chain, private_key, ca_cert = load_certs_from_env() credentials = grpc.ssl_server_credentials( diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 5366812f2cc29..234d4915c6f4a 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2421,6 +2421,22 @@ cdef class GcsClient: return num_added + @_auto_reconnect + def drain_node(self, node_ids, timeout=None): + cdef: + c_vector[c_string] c_node_ids + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + c_vector[c_string] c_drained_node_ids + for node_id in node_ids: + c_node_ids.push_back(node_id) + with nogil: + check_status(self.inner.get().DrainNode( + c_node_ids, timeout_ms, c_drained_node_ids)) + result = [] + for drain_node_id in c_drained_node_ids: + result.append(drain_node_id) + return result + @_auto_reconnect def internal_kv_del(self, c_string key, c_bool del_by_prefix, namespace=None, timeout=None): diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 3f63b84fbb60e..e5f425d4153c0 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -12,7 +12,6 @@ from enum import Enum from typing import Any, Callable, Dict, FrozenSet, List, Optional, Set, Tuple, Union -import grpc import yaml from ray.autoscaler._private.constants import ( @@ -76,7 +75,8 @@ TAG_RAY_RUNTIME_CONFIG, TAG_RAY_USER_NODE_TYPE, ) -from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc +from ray._raylet import GcsClient +from ray.exceptions import RpcError logger = logging.getLogger(__name__) @@ -184,7 +184,7 @@ def __init__( # TODO(ekl): require config reader to be a callable always. config_reader: Union[str, Callable[[], dict]], load_metrics: LoadMetrics, - gcs_node_info_stub: gcs_service_pb2_grpc.NodeInfoGcsServiceStub, + gcs_client: GcsClient, session_name: Optional[str] = None, max_launch_batch: int = AUTOSCALER_MAX_LAUNCH_BATCH, max_concurrent_launches: int = AUTOSCALER_MAX_CONCURRENT_LAUNCHES, @@ -214,8 +214,8 @@ def __init__( prefix_cluster_info: Whether to add the cluster name to info strs. event_summarizer: Utility to consolidate duplicated messages. prom_metrics: Prometheus metrics for autoscaler-related operations. - gcs_node_info_stub: Stub for interactions with Ray nodes via gRPC - request to the GCS. Used to drain nodes before termination. + gcs_client: client for interactions with the GCS. Used to drain nodes + before termination. """ if isinstance(config_reader, str): @@ -355,7 +355,7 @@ def read_fn(): for remote, local in self.config["file_mounts"].items() } - self.gcs_node_info_stub = gcs_node_info_stub + self.gcs_client = gcs_client for local_path in self.config["file_mounts"].values(): assert os.path.exists(local_path) @@ -675,39 +675,22 @@ def drain_nodes_via_gcs(self, provider_node_ids_to_drain: List[NodeID]): logger.info(f"Draining {len(raylet_ids_to_drain)} raylet(s).") try: - request = gcs_service_pb2.DrainNodeRequest( - drain_node_data=[ - gcs_service_pb2.DrainNodeData(node_id=raylet_id) - for raylet_id in raylet_ids_to_drain - ] - ) - # A successful response indicates that the GCS has marked the # desired nodes as "drained." The cloud provider can then terminate # the nodes without the GCS printing an error. - response = self.gcs_node_info_stub.DrainNode(request, timeout=5) - # Check if we succeeded in draining all of the intended nodes by # looking at the RPC response. - drained_raylet_ids = { - status_item.node_id for status_item in response.drain_node_status - } + drained_raylet_ids = set(self.gcs_client.drain_node( + raylet_ids_to_drain, timeout=5)) failed_to_drain = raylet_ids_to_drain - drained_raylet_ids if failed_to_drain: self.prom_metrics.drain_node_exceptions.inc() logger.error(f"Failed to drain {len(failed_to_drain)} raylet(s).") - - # If we get a gRPC error with an UNIMPLEMENTED code, fail silently. - # This error indicates that the GCS is using Ray version < 1.8.0, - # for which DrainNode is not implemented. - except grpc.RpcError as e: - # If the code is UNIMPLEMENTED, pass. - if e.code() == grpc.StatusCode.UNIMPLEMENTED: - pass - # Otherwise, it's a plane old gRPC error and we should log it. - else: - self.prom_metrics.drain_node_exceptions.inc() - logger.exception("Failed to drain Ray nodes. Traceback follows.") + except RpcError as e: + # Otherwise, it's a plane old RPC error and we should log it. + self.prom_metrics.drain_node_exceptions.inc() + logger.exception("Failed to drain Ray nodes. Traceback follows.") + logger.exception("Error: ", e) except Exception: # We don't need to interrupt the autoscaler update with an # exception, but we should log what went wrong and record the diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index dcc4d50357aca..853876fab9f0a 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -391,6 +391,10 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: CRayStatus InternalKVExists( const c_string &ns, const c_string &key, int64_t timeout_ms, c_bool &exists) + CRayStatus DrainNode( + const c_vector[c_string]& node_ids, + int64_t timeout_ms, + c_vector[c_string]& drained_node_ids) CRayStatus PinRuntimeEnvUri( const c_string &uri, int expiration_s, int64_t timeout_ms) diff --git a/python/setup.py b/python/setup.py index 0e5160f3aba54..1de39b946c662 100644 --- a/python/setup.py +++ b/python/setup.py @@ -256,6 +256,8 @@ def get_packages(self): "py-spy >= 0.2.0", "requests", "gpustat >= 1.0.0", # for windows + "grpcio >= 1.32.0; python_version < '3.10'", # noqa:E501 + "grpcio >= 1.42.0; python_version >= '3.10'", # noqa:E501 "opencensus", "pydantic < 2", # 2.0.0 brings breaking changes "prometheus_client >= 0.7.1", diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index d43f43d411aaf..6d49324b8fb41 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -353,6 +353,34 @@ Status PythonGcsClient::InternalKVExists(const std::string &ns, return Status::RpcError(status.error_message(), status.error_code()); } +Status PythonGcsClient::DrainNode(const std::vector &node_ids, + int64_t timeout_ms, + std::vector &drained_node_ids) { + grpc::ClientContext context; + GrpcClientContextWithTimeoutMs(context, timeout_ms); + + rpc::DrainNodeRequest request; + for (const std::string &node_id : node_ids) { + request.add_drain_node_data()->set_node_id(node_id); + } + + rpc::DrainNodeReply reply; + + grpc::Status status = node_info_stub_->DrainNode(&context, request, &reply); + if (status.ok()) { + if (reply.status().code() == static_cast(StatusCode::OK)) { + drained_node_ids.clear(); + drained_node_ids.reserve(reply.drain_node_status().size()); + for (const auto &node_status : reply.drain_node_status()) { + drained_node_ids.push_back(node_status.node_id()); + } + return Status::OK(); + } + return HandleGcsError(reply.status()); + } + return Status::RpcError(status.error_message(), status.error_code()); +} + Status PythonGcsClient::PinRuntimeEnvUri(const std::string &uri, int expiration_s, int64_t timeout_ms) { diff --git a/src/ray/gcs/gcs_client/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h index 8ddb27165b2c1..eee996f34a604 100644 --- a/src/ray/gcs/gcs_client/gcs_client.h +++ b/src/ray/gcs/gcs_client/gcs_client.h @@ -224,6 +224,9 @@ class RAY_EXPORT PythonGcsClient { const std::string &key, int64_t timeout_ms, bool &exists); + Status DrainNode(const std::vector &node_ids, + int64_t timeout_ms, + std::vector &drained_node_ids); Status PinRuntimeEnvUri(const std::string &uri, int expiration_s, int64_t timeout_ms); Status GetAllNodeInfo(int64_t timeout_ms, std::vector &result); From ad5dbb91131830ea91ef1fd3d4c05ff0a9b2b4e2 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Tue, 8 Aug 2023 21:36:19 -0400 Subject: [PATCH 02/20] remove grpc minimal and add empty space Signed-off-by: Ruiyang Wang --- dashboard/optional_deps.py | 2 +- python/setup.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/dashboard/optional_deps.py b/dashboard/optional_deps.py index f9accac717aa7..b46a6fdd22129 100644 --- a/dashboard/optional_deps.py +++ b/dashboard/optional_deps.py @@ -16,4 +16,4 @@ from aiohttp.typedefs import PathLike # noqa: F401 from aiohttp.web import RouteDef # noqa: F401 import pydantic # noqa: F401 -import grpc # noqa: F401 \ No newline at end of file +import grpc # noqa: F401 diff --git a/python/setup.py b/python/setup.py index 1de39b946c662..a8be879bf855b 100644 --- a/python/setup.py +++ b/python/setup.py @@ -326,8 +326,6 @@ def get_packages(self): setup_spec.install_requires = [ "click >= 7.0", "filelock", - "grpcio >= 1.32.0; python_version < '3.10'", # noqa:E501 - "grpcio >= 1.42.0; python_version >= '3.10'", # noqa:E501 "jsonschema", "msgpack >= 1.0.0, < 2.0.0", "numpy >= 1.16; python_version < '3.9'", From 6b518c5892580d16862619d04ba38ca9bc49786c Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Tue, 8 Aug 2023 22:23:02 -0400 Subject: [PATCH 03/20] make client server optional Signed-off-by: Ruiyang Wang --- python/ray/tests/conftest.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 27af4a4abb676..5a2d903513df4 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -20,7 +20,6 @@ import ray import ray._private.ray_constants as ray_constants -import ray.util.client.server.server as ray_client_server from ray._private.conftest_utils import set_override_dashboard_url # noqa: F401 from ray._private.runtime_env.pip import PipProcessor from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager @@ -629,6 +628,8 @@ def call_ray_start_with_external_redis(request): @pytest.fixture def init_and_serve(): + import ray.util.client.server.server as ray_client_server + server_handle, _ = ray_client_server.init_and_serve("localhost:50051") yield server_handle ray_client_server.shutdown_with_server(server_handle.grpc_server) From 630bf27ff6dde89d170ee7476463d2f923b59779 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 9 Aug 2023 09:39:48 -0400 Subject: [PATCH 04/20] remove leftover grpc references Signed-off-by: Ruiyang Wang --- python/ray/_private/gcs_utils.py | 68 ------------------------------- python/ray/_private/test_utils.py | 13 ++++-- 2 files changed, 9 insertions(+), 72 deletions(-) diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index 293d21965ebd8..5c406db0f7592 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -1,13 +1,6 @@ -import enum import logging -import inspect -import os -import asyncio -from functools import wraps from typing import Optional -import grpc - from ray._private import ray_constants import ray._private.gcs_aio_client @@ -98,60 +91,6 @@ def create_gcs_channel(address: str, aio=False): return init_grpc_channel(address, options=_GRPC_OPTIONS, asynchronous=aio) -# This global variable is used for testing only -_called_freq = {} - - -def _auto_reconnect(f): - # This is for testing to count the frequence - # of gcs call - if inspect.iscoroutinefunction(f): - - @wraps(f) - async def wrapper(self, *args, **kwargs): - if "TEST_RAY_COLLECT_KV_FREQUENCY" in os.environ: - global _called_freq - name = f.__name__ - if name not in _called_freq: - _called_freq[name] = 0 - _called_freq[name] += 1 - - remaining_retry = self._nums_reconnect_retry - while True: - try: - return await f(self, *args, **kwargs) - except grpc.RpcError as e: - if e.code() in ( - grpc.StatusCode.UNAVAILABLE, - grpc.StatusCode.UNKNOWN, - ): - if remaining_retry <= 0: - logger.error( - "Failed to connect to GCS. Please check" - " `gcs_server.out` for more details." - ) - raise - logger.debug( - "Failed to send request to gcs, reconnecting. " f"Error {e}" - ) - try: - self._connect() - except Exception: - logger.error(f"Connecting to gcs failed. Error {e}") - await asyncio.sleep(1) - remaining_retry -= 1 - continue - raise - - return wrapper - else: - - raise NotImplementedError( - "This code moved to Cython, see " - "https://github.com/ray-project/ray/pull/33769" - ) - - class GcsChannel: def __init__(self, gcs_address: Optional[str] = None, aio: bool = False): self._gcs_address = gcs_address @@ -171,13 +110,6 @@ def channel(self): return self._channel -class GcsCode(enum.IntEnum): - # corresponding to ray/src/ray/common/status.h - OK = 0 - NotFound = 17 - GrpcUnavailable = 26 - - # re-export GcsAioClient = ray._private.gcs_aio_client.GcsAioClient diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index f6d054a76f15b..d036efde29596 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -24,7 +24,6 @@ import requests from ray._raylet import Config -import grpc import numpy as np import psutil # We must import psutil after ray because we bundle it with ray. from ray._private import ( @@ -32,7 +31,6 @@ ) from ray._private.worker import RayContext import yaml -from grpc._channel import _InactiveRpcError import ray import ray._private.gcs_utils as gcs_utils @@ -45,9 +43,7 @@ from ray.core.generated import ( gcs_pb2, node_manager_pb2, - node_manager_pb2_grpc, gcs_service_pb2, - gcs_service_pb2_grpc, ) from ray.util.queue import Empty, Queue, _QueueActor from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy @@ -1474,6 +1470,9 @@ async def get_total_killed_nodes(self): return self.killed_nodes def _kill_raylet(self, ip, port, graceful=False): + import grpc + from grpc._channel import _InactiveRpcError + from ray.core.generated import node_manager_pb2_grpc raylet_address = f"{ip}:{port}" channel = grpc.insecure_channel(raylet_address) stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) @@ -1694,6 +1693,8 @@ def wandb_setup_api_key_hook(): # Get node stats from node manager. def get_node_stats(raylet, num_retry=5, timeout=2): + import grpc + from ray.core.generated import node_manager_pb2_grpc raylet_address = f'{raylet["NodeManagerAddress"]}:{raylet["NodeManagerPort"]}' channel = ray._private.utils.init_grpc_channel(raylet_address) stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) @@ -1711,6 +1712,7 @@ def get_node_stats(raylet, num_retry=5, timeout=2): # Gets resource usage assuming gcs is local. def get_resource_usage(gcs_address, timeout=10): + from ray.core.generated import gcs_service_pb2_grpc if not gcs_address: gcs_address = ray.worker._global_node.gcs_address @@ -1739,6 +1741,9 @@ def get_load_metrics_report(webui_url): # Send a RPC to the raylet to have it self-destruct its process. def kill_raylet(raylet, graceful=False): + import grpc + from grpc._channel import _InactiveRpcError + from ray.core.generated import node_manager_pb2_grpc raylet_address = f'{raylet["NodeManagerAddress"]}:{raylet["NodeManagerPort"]}' channel = grpc.insecure_channel(raylet_address) stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) From deb92f23259666edafc28f14bfbbedd926bd3b15 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 9 Aug 2023 11:27:58 -0400 Subject: [PATCH 05/20] add get_all_resource_usage to GcsClient; make grpc server in dash agent optional Signed-off-by: Ruiyang Wang --- dashboard/agent.py | 85 ++++++++++++----------- dashboard/utils.py | 4 +- python/ray/_raylet.pyx | 13 ++++ python/ray/autoscaler/_private/monitor.py | 33 ++++----- python/ray/includes/common.pxd | 2 + src/ray/gcs/gcs_client/gcs_client.cc | 21 ++++++ src/ray/gcs/gcs_client/gcs_client.h | 3 +- 7 files changed, 99 insertions(+), 62 deletions(-) diff --git a/dashboard/agent.py b/dashboard/agent.py index d56047b2e3884..bcf0335578cf0 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -16,7 +16,6 @@ import ray.dashboard.consts as dashboard_consts import ray.dashboard.utils as dashboard_utils from ray.dashboard.consts import _PARENT_DEATH_THREASHOLD -from ray._private.gcs_pubsub import GcsAioPublisher from ray._raylet import GcsClient from ray._private.gcs_utils import GcsAioClient from ray._private.ray_logging import ( @@ -31,12 +30,6 @@ # Import psutil after ray so the packaged version is used. import psutil -try: - from grpc import aio as aiogrpc -except ImportError: - from grpc.experimental import aio as aiogrpc - - # Publishes at most this number of lines of Raylet logs, when the Raylet dies # unexpectedly. _RAYLET_LOG_MAX_PUBLISH_LINES = 20 @@ -52,19 +45,6 @@ logger = logging.getLogger(__name__) -# We would want to suppress deprecating warnings from aiogrpc library -# with the usage of asyncio.get_event_loop() in python version >=3.10 -# This could be removed once https://github.com/grpc/grpc/issues/32526 -# is released, and we used higher versions of grpcio that that. -if sys.version_info.major >= 3 and sys.version_info.minor >= 10: - import warnings - - with warnings.catch_warnings(): - warnings.simplefilter("ignore", category=DeprecationWarning) - aiogrpc.init_grpc_aio() -else: - aiogrpc.init_grpc_aio() - class DashboardAgent: def __init__( @@ -117,13 +97,44 @@ def __init__( assert self.ppid > 0 logger.info("Parent pid is %s", self.ppid) - # Setup raylet channel - options = ray_constants.GLOBAL_GRPC_OPTIONS - self.aiogrpc_raylet_channel = ray._private.utils.init_grpc_channel( - f"{self.ip}:{self.node_manager_port}", options, asynchronous=True - ) + # grpc server is None in mininal. + self.server = None + # http_server is None in minimal. + self.http_server = None + + # Used by the agent and sub-modules. + # TODO(architkulkarni): Remove gcs_client once the agent exclusively uses + # gcs_aio_client and not gcs_client. + self.gcs_client = GcsClient(address=self.gcs_address) + _initialize_internal_kv(self.gcs_client) + assert _internal_kv_initialized() + self.gcs_aio_client = GcsAioClient(address=self.gcs_address) + + if not self.minimal: + self._init_non_minimal() + + def _init_non_minimal(self): + from ray._private.gcs_pubsub import GcsAioPublisher + self.aio_publisher = GcsAioPublisher(address=self.gcs_address) + + try: + from grpc import aio as aiogrpc + except ImportError: + from grpc.experimental import aio as aiogrpc + + # We would want to suppress deprecating warnings from aiogrpc library + # with the usage of asyncio.get_event_loop() in python version >=3.10 + # This could be removed once https://github.com/grpc/grpc/issues/32526 + # is released, and we used higher versions of grpcio that that. + if sys.version_info.major >= 3 and sys.version_info.minor >= 10: + import warnings + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=DeprecationWarning) + aiogrpc.init_grpc_aio() + else: + aiogrpc.init_grpc_aio() - # Setup grpc server self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0),)) grpc_ip = "127.0.0.1" if self.ip == "127.0.0.1" else "0.0.0.0" try: @@ -143,19 +154,6 @@ def __init__( else: logger.info("Dashboard agent grpc address: %s:%s", grpc_ip, self.grpc_port) - # If the agent is started as non-minimal version, http server should - # be configured to communicate with the dashboard in a head node. - self.http_server = None - - # Used by the agent and sub-modules. - # TODO(architkulkarni): Remove gcs_client once the agent exclusively uses - # gcs_aio_client and not gcs_client. - self.gcs_client = GcsClient(address=self.gcs_address) - _initialize_internal_kv(self.gcs_client) - assert _internal_kv_initialized() - self.gcs_aio_client = GcsAioClient(address=self.gcs_address) - self.publisher = GcsAioPublisher(address=self.gcs_address) - async def _configure_http_server(self, modules): from ray.dashboard.http_server_agent import HttpServerAgent @@ -180,9 +178,16 @@ def _load_modules(self): @property def http_session(self): - assert self.http_server, "Accessing unsupported API in a minimal ray." + assert self.http_server, \ + "Accessing unsupported API (HttpServerAgent) in a minimal ray." return self.http_server.http_session + @property + def publisher(self): + assert self.aio_publisher, \ + "Accessing unsupported API (GcsAioPublisher) in a minimal ray." + return self.aio_publisher + async def run(self): async def _check_parent(): """Check if raylet is dead and fate-share if it is.""" diff --git a/dashboard/utils.py b/dashboard/utils.py index 6434ce5c1b606..2f22f4af82ba6 100644 --- a/dashboard/utils.py +++ b/dashboard/utils.py @@ -50,7 +50,7 @@ async def run(self, server): """ Run the module in an asyncio loop. An agent module can provide servicers to the server. - :param server: Asyncio GRPC server. + :param server: Asyncio GRPC server, or None if ray is minimal. """ @staticmethod @@ -79,7 +79,7 @@ async def run(self, server): """ Run the module in an asyncio loop. A head module can provide servicers to the server. - :param server: Asyncio GRPC server. + :param server: Asyncio GRPC server, or None if ray is minimal. """ @staticmethod diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 234d4915c6f4a..ef884f4f049c7 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -182,6 +182,7 @@ import ray._private.ray_constants as ray_constants import ray.cloudpickle as ray_pickle from ray.core.generated.common_pb2 import ActorDiedErrorContext from ray.core.generated.gcs_pb2 import JobTableData +from ray.core.generated.gcs_service_pb2 import GetAllResourceUsageReply from ray._private.async_compat import ( sync_to_async, get_new_event_loop, @@ -2528,6 +2529,18 @@ cdef class GcsClient: result[job_info.job_id] = job_info return result + @_auto_reconnect + def get_all_resource_usage(self, timeout=None) -> GetAllResourceUsageReply: + cdef: + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + c_string serialized_reply + + with nogil: + check_status(self.inner.get().GetAllResourceUsage(timeout_ms, serialized_reply)) + + reply = GetAllResourceUsageReply() + reply.ParseFromString(serialized_reply) + return reply ######################################################## # Interface for rpc::autoscaler::AutoscalerStateService ######################################################## diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index 9f22f283b54f8..0c4ef720bda49 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -15,9 +15,10 @@ import ray import ray._private.ray_constants as ray_constants import ray._private.utils +import ray._private.gcs_utils as gcs_utils from ray._private.event.event_logger import get_event_logger from ray._private.ray_logging import setup_component_logger -from ray._raylet import GcsClient +from ray._raylet import GcsClient, GcsClientOptions, GlobalStateAccessor from ray.autoscaler._private.autoscaler import StandardAutoscaler from ray.autoscaler._private.commands import teardown_cluster from ray.autoscaler._private.constants import ( @@ -29,7 +30,7 @@ from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics from ray.autoscaler._private.util import format_readonly_node_type -from ray.core.generated import gcs_pb2, gcs_service_pb2, gcs_service_pb2_grpc +from ray.core.generated import gcs_pb2 from ray.core.generated.event_pb2 import Event as RayEvent from ray.experimental.internal_kv import ( _initialize_internal_kv, @@ -140,30 +141,21 @@ def __init__( retry_on_failure: bool = True, ): self.gcs_address = address - options = ray_constants.GLOBAL_GRPC_OPTIONS - gcs_channel = ray._private.utils.init_grpc_channel(self.gcs_address, options) - # TODO: Use gcs client for this - self.gcs_node_resources_stub = ( - gcs_service_pb2_grpc.NodeResourceInfoGcsServiceStub(gcs_channel) - ) - self.gcs_node_info_stub = gcs_service_pb2_grpc.NodeInfoGcsServiceStub( - gcs_channel - ) worker = ray._private.worker.global_worker - gcs_client = GcsClient(address=self.gcs_address) + self.gcs_client = GcsClient(address=self.gcs_address) if monitor_ip: monitor_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}" - gcs_client.internal_kv_put( + self.gcs_client.internal_kv_put( b"AutoscalerMetricsAddress", monitor_addr.encode(), True, None ) - _initialize_internal_kv(gcs_client) + _initialize_internal_kv(self.gcs_client) if monitor_ip: monitor_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}" - gcs_client.internal_kv_put( + self.gcs_client.internal_kv_put( b"AutoscalerMetricsAddress", monitor_addr.encode(), True, None ) - self._session_name = self.get_session_name(gcs_client) + self._session_name = self.get_session_name(self.gcs_client) logger.info(f"session_name: {self._session_name}") worker.mode = 0 head_node_ip = self.gcs_address.split(":")[0] @@ -179,6 +171,10 @@ def __init__( # simply mirroring what the GCS tells us the cluster node types are. self.readonly_config = None + gcs_options = GcsClientOptions.from_gcs_address(self.gcs_address) + self.global_state_accessor = GlobalStateAccessor(gcs_options) + self.global_state_accessor.connect() + if log_dir: try: self.event_logger = get_event_logger( @@ -239,7 +235,7 @@ def get_latest_readonly_config(): self.autoscaler = StandardAutoscaler( autoscaling_config, self.load_metrics, - self.gcs_node_info_stub, + self.gcs_client, self._session_name, prefix_cluster_info=self.prefix_cluster_info, event_summarizer=self.event_summarizer, @@ -249,8 +245,7 @@ def get_latest_readonly_config(): def update_load_metrics(self): """Fetches resource usage data from GCS and updates load metrics.""" - request = gcs_service_pb2.GetAllResourceUsageRequest() - response = self.gcs_node_resources_stub.GetAllResourceUsage(request, timeout=60) + response = self.gcs_client.GetAllResourceUsage(timeout=60) resources_batch_data = response.resource_usage_data log_resource_batch_data_if_desired(resources_batch_data) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 853876fab9f0a..221f239838fd0 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -402,6 +402,8 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: int64_t timeout_ms, c_vector[CGcsNodeInfo]& result) CRayStatus GetAllJobInfo( int64_t timeout_ms, c_vector[CJobTableData]& result) + CRayStatus GetAllResourceUsage( + int64_t timeout_ms, c_string& serialized_reply) CRayStatus RequestClusterResourceConstraint( int64_t timeout_ms, const c_vector[unordered_map[c_string, double]] &bundles, diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index 6d49324b8fb41..419a6aa3ac135 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -152,6 +152,7 @@ Status PythonGcsClient::Connect() { runtime_env_stub_ = rpc::RuntimeEnvGcsService::NewStub(channel_); node_info_stub_ = rpc::NodeInfoGcsService::NewStub(channel_); job_info_stub_ = rpc::JobInfoGcsService::NewStub(channel_); + node_resource_info_stub_ = rpc::NodeResourceInfoGcsService::NewStub(channel_); autoscaler_stub_ = rpc::autoscaler::AutoscalerStateService::NewStub(channel_); return Status::OK(); } @@ -450,6 +451,26 @@ Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms, return Status::RpcError(status.error_message(), status.error_code()); } +Status PythonGcsClient::GetAllResourceUsage(int64_t timeout_ms, + std::string &serialized_reply) { + grpc::ClientContext context; + GrpcClientContextWithTimeoutMs(context, timeout_ms); + + rpc::GetAllResourceUsageRequest request; + rpc::GetAllResourceUsageReply reply; + + grpc::Status status = + node_resource_info_stub_->GetAllResourceUsage(&context, request, &reply); + if (status.ok()) { + if (reply.status().code() == static_cast(StatusCode::OK)) { + serialized_reply = reply.SerializeAsString(); + return Status::OK(); + } + return HandleGcsError(reply.status()); + } + return Status::RpcError(status.error_message(), status.error_code()); +} + Status PythonGcsClient::RequestClusterResourceConstraint( int64_t timeout_ms, const std::vector> &bundles, diff --git a/src/ray/gcs/gcs_client/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h index eee996f34a604..b6f391edb1861 100644 --- a/src/ray/gcs/gcs_client/gcs_client.h +++ b/src/ray/gcs/gcs_client/gcs_client.h @@ -231,7 +231,7 @@ class RAY_EXPORT PythonGcsClient { Status PinRuntimeEnvUri(const std::string &uri, int expiration_s, int64_t timeout_ms); Status GetAllNodeInfo(int64_t timeout_ms, std::vector &result); Status GetAllJobInfo(int64_t timeout_ms, std::vector &result); - + Status GetAllResourceUsage(int64_t timeout_ms, std::string &serialized_reply); // For rpc::autoscaler::AutoscalerStateService Status RequestClusterResourceConstraint( int64_t timeout_ms, @@ -244,6 +244,7 @@ class RAY_EXPORT PythonGcsClient { std::unique_ptr kv_stub_; std::unique_ptr runtime_env_stub_; std::unique_ptr node_info_stub_; + std::unique_ptr node_resource_info_stub_; std::unique_ptr job_info_stub_; std::unique_ptr autoscaler_stub_; std::shared_ptr channel_; From d49be1727904c4c23c19e49db53c7bfe028b879e Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 9 Aug 2023 11:38:33 -0400 Subject: [PATCH 06/20] remove unused imports Signed-off-by: Ruiyang Wang --- python/ray/autoscaler/_private/monitor.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index 0c4ef720bda49..e48613a178935 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -15,10 +15,9 @@ import ray import ray._private.ray_constants as ray_constants import ray._private.utils -import ray._private.gcs_utils as gcs_utils from ray._private.event.event_logger import get_event_logger from ray._private.ray_logging import setup_component_logger -from ray._raylet import GcsClient, GcsClientOptions, GlobalStateAccessor +from ray._raylet import GcsClient from ray.autoscaler._private.autoscaler import StandardAutoscaler from ray.autoscaler._private.commands import teardown_cluster from ray.autoscaler._private.constants import ( @@ -171,10 +170,6 @@ def __init__( # simply mirroring what the GCS tells us the cluster node types are. self.readonly_config = None - gcs_options = GcsClientOptions.from_gcs_address(self.gcs_address) - self.global_state_accessor = GlobalStateAccessor(gcs_options) - self.global_state_accessor.connect() - if log_dir: try: self.event_logger = get_event_logger( From bedca2b6244e557103a5ec8dd20d684311342b60 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 9 Aug 2023 12:12:36 -0400 Subject: [PATCH 07/20] remove accidental deps on grpc Signed-off-by: Ruiyang Wang --- dashboard/agent.py | 3 ++- python/ray/tests/test_basic.py | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/dashboard/agent.py b/dashboard/agent.py index bcf0335578cf0..5632026b1605c 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -316,9 +316,10 @@ async def _check_parent(): # TODO: Use async version if performance is an issue # -1 should indicate that http server is not started. http_port = -1 if not self.http_server else self.http_server.http_port + grpc_port = -1 if not self.server else self.grpc_port await self.gcs_aio_client.internal_kv_put( f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{self.node_id}".encode(), - json.dumps([http_port, self.grpc_port]).encode(), + json.dumps([http_port, grpc_port]).encode(), True, namespace=ray_constants.KV_NAMESPACE_DASHBOARD, ) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index d7a9a7919e855..f4cbb76c463ea 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1101,8 +1101,10 @@ def f(): def test_import_ray_does_not_import_grpc(): # First unload grpc and ray - del sys.modules["grpc"] - del sys.modules["ray"] + if "grpc" in sys.modules: + del sys.modules["grpc"] + if "ray" in sys.modules: + del sys.modules["ray"] # Then import ray from scratch import ray # noqa: F401 From 8fad1df8ebd8a715a092f18e561e063f820f3115 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 9 Aug 2023 13:34:29 -0400 Subject: [PATCH 08/20] fix the assumption that we have grpc installed in minimal Signed-off-by: Ruiyang Wang --- python/ray/tests/test_basic.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index f4cbb76c463ea..193a847d8b4da 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1113,7 +1113,11 @@ def test_import_ray_does_not_import_grpc(): assert "grpc" not in sys.modules # Load grpc back so other tests will not be affected - import grpc # noqa: F401 + try: + import grpc # noqa: F401 + except ImportError: + # It's ok if we don't have grpc installed. + pass if __name__ == "__main__": From 9c1241a91ef1feee29f7b15f0e2cd6860894698b Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 9 Aug 2023 13:40:37 -0400 Subject: [PATCH 09/20] move tune experiment grpc dep Signed-off-by: Ruiyang Wang --- python/ray/_raylet.pyx | 1 + python/ray/includes/common.pxd | 1 + python/ray/tune/experiment/experiment.py | 4 ++-- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index ef884f4f049c7..9eebf3450180c 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -214,6 +214,7 @@ OPTIMIZED = __OPTIMIZE__ GRPC_STATUS_CODE_UNAVAILABLE = CGrpcStatusCode.UNAVAILABLE GRPC_STATUS_CODE_UNKNOWN = CGrpcStatusCode.UNKNOWN GRPC_STATUS_CODE_DEADLINE_EXCEEDED = CGrpcStatusCode.DEADLINE_EXCEEDED +GRPC_STATUS_CODE_RESOURCE_EXHAUSTED = CGrpcStatusCode.RESOURCE_EXHAUSTED logger = logging.getLogger(__name__) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 221f239838fd0..0cb0a68edc440 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -360,6 +360,7 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: UNAVAILABLE "grpc::StatusCode::UNAVAILABLE", UNKNOWN "grpc::StatusCode::UNKNOWN", DEADLINE_EXCEEDED "grpc::StatusCode::DEADLINE_EXCEEDED", + RESOURCE_EXHAUSTED "grpc::StatusCode::RESOURCE_EXHAUSTED" cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions": CGcsClientOptions(const c_string &gcs_address) diff --git a/python/ray/tune/experiment/experiment.py b/python/ray/tune/experiment/experiment.py index e7d9497f3a59c..a176aa1aa9dfb 100644 --- a/python/ray/tune/experiment/experiment.py +++ b/python/ray/tune/experiment/experiment.py @@ -2,7 +2,6 @@ import datetime import warnings from functools import partial -import grpc import logging import os from pathlib import Path @@ -22,6 +21,7 @@ TYPE_CHECKING, ) +import ray from ray.air import CheckpointConfig from ray.air._internal.uri_utils import URI from ray.exceptions import RpcError @@ -179,7 +179,7 @@ def __init__( try: self._run_identifier = Experiment.register_if_needed(run) except RpcError as e: - if e.rpc_code == grpc.StatusCode.RESOURCE_EXHAUSTED.value[0]: + if e.rpc_code == ray._raylet.GRPC_STATUS_CODE_RESOURCE_EXHAUSTED: raise TuneError( f"The Trainable/training function is too large for grpc resource " f"limit. Check that its definition is not implicitly capturing a " From efd0f57820c59f7c5ced5d7a21ff63824884731f Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 9 Aug 2023 14:35:33 -0400 Subject: [PATCH 10/20] typo Signed-off-by: Ruiyang Wang --- python/ray/autoscaler/_private/monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index e48613a178935..c0ac2fa32ef41 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -240,7 +240,7 @@ def get_latest_readonly_config(): def update_load_metrics(self): """Fetches resource usage data from GCS and updates load metrics.""" - response = self.gcs_client.GetAllResourceUsage(timeout=60) + response = self.gcs_client.get_all_resource_usage(timeout=60) resources_batch_data = response.resource_usage_data log_resource_batch_data_if_desired(resources_batch_data) From f724a1552b25018556643e06d02cd90810fcea5c Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 9 Aug 2023 15:47:54 -0400 Subject: [PATCH 11/20] skip ray client tests in minimal Signed-off-by: Ruiyang Wang --- python/ray/tests/conftest.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 5a2d903513df4..1172b97b39bbc 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -650,6 +650,9 @@ def call_ray_stop_only(): def start_cluster(ray_start_cluster_enabled, request): assert request.param in {"ray_client", "no_ray_client"} use_ray_client: bool = request.param == "ray_client" + if os.environ.get("RAY_MINIMAL") == "1" and use_ray_client: + pytest.skip("Skipping due to we don't have ray client in minimal.") + cluster = ray_start_cluster_enabled cluster.add_node(num_cpus=4, dashboard_agent_listen_port=find_free_port()) if use_ray_client: From bffe8c57336f05429ba4306404f3b537c79217aa Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 9 Aug 2023 21:27:14 -0400 Subject: [PATCH 12/20] rename DrainNode to DrainNodes to disambiguate, and remove incompatible tests Signed-off-by: Ruiyang Wang --- python/ray/_raylet.pyx | 32 +++++------ python/ray/autoscaler/_private/autoscaler.py | 2 +- python/ray/includes/common.pxd | 9 ++- .../ray/tests/test_runtime_env_ray_minimal.py | 35 ------------ src/ray/gcs/gcs_client/gcs_client.cc | 56 +++++++++---------- src/ray/gcs/gcs_client/gcs_client.h | 6 +- 6 files changed, 52 insertions(+), 88 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 11c87bc69624d..60a9033b9e997 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2423,22 +2423,6 @@ cdef class GcsClient: return num_added - @_auto_reconnect - def drain_node(self, node_ids, timeout=None): - cdef: - c_vector[c_string] c_node_ids - int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - c_vector[c_string] c_drained_node_ids - for node_id in node_ids: - c_node_ids.push_back(node_id) - with nogil: - check_status(self.inner.get().DrainNode( - c_node_ids, timeout_ms, c_drained_node_ids)) - result = [] - for drain_node_id in c_drained_node_ids: - result.append(drain_node_id) - return result - @_auto_reconnect def internal_kv_del(self, c_string key, c_bool del_by_prefix, namespace=None, timeout=None): @@ -2590,6 +2574,22 @@ cdef class GcsClient: return is_accepted + @_auto_reconnect + def drain_nodes(self, node_ids, timeout=None): + cdef: + c_vector[c_string] c_node_ids + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + c_vector[c_string] c_drained_node_ids + for node_id in node_ids: + c_node_ids.push_back(node_id) + with nogil: + check_status(self.inner.get().DrainNodes( + c_node_ids, timeout_ms, c_drained_node_ids)) + result = [] + for drain_node_id in c_drained_node_ids: + result.append(drain_node_id) + return result + ############################################################# # Interface for rpc::autoscaler::AutoscalerStateService ends ############################################################# diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index e5f425d4153c0..9d002bac479a2 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -680,7 +680,7 @@ def drain_nodes_via_gcs(self, provider_node_ids_to_drain: List[NodeID]): # the nodes without the GCS printing an error. # Check if we succeeded in draining all of the intended nodes by # looking at the RPC response. - drained_raylet_ids = set(self.gcs_client.drain_node( + drained_raylet_ids = set(self.gcs_client.drain_nodes( raylet_ids_to_drain, timeout=5)) failed_to_drain = raylet_ids_to_drain - drained_raylet_ids if failed_to_drain: diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 6dc9ac43bece7..9b829dcf9170c 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -392,11 +392,6 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: CRayStatus InternalKVExists( const c_string &ns, const c_string &key, int64_t timeout_ms, c_bool &exists) - CRayStatus DrainNode( - const c_vector[c_string]& node_ids, - int64_t timeout_ms, - c_vector[c_string]& drained_node_ids) - CRayStatus PinRuntimeEnvUri( const c_string &uri, int expiration_s, int64_t timeout_ms) CRayStatus GetAllNodeInfo( @@ -418,6 +413,10 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: const c_string &reason_message, int64_t timeout_ms, c_bool &is_accepted) + CRayStatus DrainNodes( + const c_vector[c_string]& node_ids, + int64_t timeout_ms, + c_vector[c_string]& drained_node_ids) cdef extern from "ray/gcs/gcs_client/gcs_client.h" namespace "ray::gcs" nogil: diff --git a/python/ray/tests/test_runtime_env_ray_minimal.py b/python/ray/tests/test_runtime_env_ray_minimal.py index 2b3d68e51286e..4dc8c3ea0e225 100644 --- a/python/ray/tests/test_runtime_env_ray_minimal.py +++ b/python/ray/tests/test_runtime_env_ray_minimal.py @@ -36,23 +36,6 @@ def task(self): ray.get(a.task.remote()) -@pytest.mark.skipif( - sys.platform == "win32", reason="runtime_env unsupported on Windows." -) -@pytest.mark.skipif( - os.environ.get("RAY_MINIMAL") != "1", - reason="This test is only run in CI with a minimal Ray installation.", -) -@pytest.mark.parametrize( - "call_ray_start", - ["ray start --head --ray-client-server-port 25553 --port 0"], - indirect=True, -) -def test_ray_client_task_actor(call_ray_start): - ray.init("ray://localhost:25553") - _test_task_and_actor() - - @pytest.mark.skipif( sys.platform == "win32", reason="runtime_env unsupported on Windows." ) @@ -83,24 +66,6 @@ def f(): ray.get(f.remote()) -@pytest.mark.skipif( - sys.platform == "win32", reason="runtime_env unsupported on Windows." -) -@pytest.mark.skipif( - os.environ.get("RAY_MINIMAL") != "1", - reason="This test is only run in CI with a minimal Ray installation.", -) -@pytest.mark.parametrize( - "call_ray_start", - ["ray start --head --ray-client-server-port 25552 --port 0"], - indirect=True, -) -def test_ray_client_init(call_ray_start): - with pytest.raises(ConnectionAbortedError) as excinfo: - ray.init("ray://localhost:25552", runtime_env={"pip": ["requests"]}) - assert "install virtualenv" in str(excinfo.value) - - if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index e40e421ac7598..45d498ff2d776 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -354,34 +354,6 @@ Status PythonGcsClient::InternalKVExists(const std::string &ns, return Status::RpcError(status.error_message(), status.error_code()); } -Status PythonGcsClient::DrainNode(const std::vector &node_ids, - int64_t timeout_ms, - std::vector &drained_node_ids) { - grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); - - rpc::DrainNodeRequest request; - for (const std::string &node_id : node_ids) { - request.add_drain_node_data()->set_node_id(node_id); - } - - rpc::DrainNodeReply reply; - - grpc::Status status = node_info_stub_->DrainNode(&context, request, &reply); - if (status.ok()) { - if (reply.status().code() == static_cast(StatusCode::OK)) { - drained_node_ids.clear(); - drained_node_ids.reserve(reply.drain_node_status().size()); - for (const auto &node_status : reply.drain_node_status()) { - drained_node_ids.push_back(node_status.node_id()); - } - return Status::OK(); - } - return HandleGcsError(reply.status()); - } - return Status::RpcError(status.error_message(), status.error_code()); -} - Status PythonGcsClient::PinRuntimeEnvUri(const std::string &uri, int expiration_s, int64_t timeout_ms) { @@ -544,6 +516,34 @@ Status PythonGcsClient::DrainNode(const std::string &node_id, return Status::RpcError(status.error_message(), status.error_code()); } +Status PythonGcsClient::DrainNodes(const std::vector &node_ids, + int64_t timeout_ms, + std::vector &drained_node_ids) { + grpc::ClientContext context; + GrpcClientContextWithTimeoutMs(context, timeout_ms); + + rpc::DrainNodeRequest request; + for (const std::string &node_id : node_ids) { + request.add_drain_node_data()->set_node_id(node_id); + } + + rpc::DrainNodeReply reply; + + grpc::Status status = node_info_stub_->DrainNode(&context, request, &reply); + if (status.ok()) { + if (reply.status().code() == static_cast(StatusCode::OK)) { + drained_node_ids.clear(); + drained_node_ids.reserve(reply.drain_node_status().size()); + for (const auto &node_status : reply.drain_node_status()) { + drained_node_ids.push_back(node_status.node_id()); + } + return Status::OK(); + } + return HandleGcsError(reply.status()); + } + return Status::RpcError(status.error_message(), status.error_code()); +} + std::unordered_map PythonGetResourcesTotal( const rpc::GcsNodeInfo &node_info) { return std::unordered_map(node_info.resources_total().begin(), diff --git a/src/ray/gcs/gcs_client/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h index 26586954522b9..14c2850c84c5f 100644 --- a/src/ray/gcs/gcs_client/gcs_client.h +++ b/src/ray/gcs/gcs_client/gcs_client.h @@ -224,9 +224,6 @@ class RAY_EXPORT PythonGcsClient { const std::string &key, int64_t timeout_ms, bool &exists); - Status DrainNode(const std::vector &node_ids, - int64_t timeout_ms, - std::vector &drained_node_ids); Status PinRuntimeEnvUri(const std::string &uri, int expiration_s, int64_t timeout_ms); Status GetAllNodeInfo(int64_t timeout_ms, std::vector &result); @@ -243,6 +240,9 @@ class RAY_EXPORT PythonGcsClient { const std::string &reason_message, int64_t timeout_ms, bool &is_accepted); + Status DrainNodes(const std::vector &node_ids, + int64_t timeout_ms, + std::vector &drained_node_ids); private: GcsClientOptions options_; From d8f0e63550b65c1fe8acd3887d59b678ca721859 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 10 Aug 2023 10:17:59 -0400 Subject: [PATCH 13/20] remove ray client in ray minimal tests Signed-off-by: Ruiyang Wang --- python/ray/tests/test_usage_stats.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/tests/test_usage_stats.py b/python/ray/tests/test_usage_stats.py index 53f962970a076..039ae53cd1148 100644 --- a/python/ray/tests/test_usage_stats.py +++ b/python/ray/tests/test_usage_stats.py @@ -125,6 +125,8 @@ def reset_ray_version_commit(): def test_get_extra_usage_tags_to_report( monkeypatch, call_ray_start, reset_usage_stats, ray_client, gcs_storage_type ): + if os.environ.get("RAY_MINIMAL") == "1" and ray_client: + pytest.skip("Skipping due to we don't have ray client in minimal.") with monkeypatch.context() as m: # Test a normal case. m.setenv("RAY_USAGE_STATS_EXTRA_TAGS", "key=val;key2=val2") From f42e937395670d880d0eff21f123a942464b521d Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 10 Aug 2023 10:48:37 -0400 Subject: [PATCH 14/20] fix merge bad Signed-off-by: Ruiyang Wang --- src/ray/gcs/gcs_client/gcs_client.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index b6c8fe15b2d76..5bf0c44a9399c 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -459,7 +459,7 @@ Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms, Status PythonGcsClient::GetAllResourceUsage(int64_t timeout_ms, std::string &serialized_reply) { grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + PrepareContext(context, timeout_ms); rpc::GetAllResourceUsageRequest request; rpc::GetAllResourceUsageReply reply; @@ -553,7 +553,7 @@ Status PythonGcsClient::DrainNodes(const std::vector &node_ids, int64_t timeout_ms, std::vector &drained_node_ids) { grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + PrepareContext(context, timeout_ms); rpc::DrainNodeRequest request; for (const std::string &node_id : node_ids) { From 6a44535a5d6904d6c978b42612a02aa8466f95bd Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 10 Aug 2023 12:20:29 -0400 Subject: [PATCH 15/20] lint Signed-off-by: Ruiyang Wang --- src/ray/gcs/gcs_client/gcs_client.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/gcs/gcs_client/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h index 08991f8002d82..e2d35e36f4c9f 100644 --- a/src/ray/gcs/gcs_client/gcs_client.h +++ b/src/ray/gcs/gcs_client/gcs_client.h @@ -242,8 +242,8 @@ class RAY_EXPORT PythonGcsClient { int64_t timeout_ms, bool &is_accepted); Status DrainNodes(const std::vector &node_ids, - int64_t timeout_ms, - std::vector &drained_node_ids); + int64_t timeout_ms, + std::vector &drained_node_ids); const ClusterID &GetClusterId() const { return cluster_id_; } From 85c8167bbd0e7ff4418143f381c7e408cf82fb45 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 10 Aug 2023 17:32:00 -0400 Subject: [PATCH 16/20] remove redaundant error print Signed-off-by: Ruiyang Wang --- python/ray/autoscaler/_private/autoscaler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 9d002bac479a2..c6ba033797e3f 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -686,11 +686,10 @@ def drain_nodes_via_gcs(self, provider_node_ids_to_drain: List[NodeID]): if failed_to_drain: self.prom_metrics.drain_node_exceptions.inc() logger.error(f"Failed to drain {len(failed_to_drain)} raylet(s).") - except RpcError as e: + except RpcError: # Otherwise, it's a plane old RPC error and we should log it. self.prom_metrics.drain_node_exceptions.inc() logger.exception("Failed to drain Ray nodes. Traceback follows.") - logger.exception("Error: ", e) except Exception: # We don't need to interrupt the autoscaler update with an # exception, but we should log what went wrong and record the From 67e59a4c39e27515bcd8f9121cbfef225cadd30e Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 10 Aug 2023 18:21:16 -0400 Subject: [PATCH 17/20] revert Signed-off-by: Ruiyang Wang --- src/ray/gcs/gcs_client/gcs_client.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index 20823848880f9..45d498ff2d776 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -426,7 +426,7 @@ Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms, Status PythonGcsClient::GetAllResourceUsage(int64_t timeout_ms, std::string &serialized_reply) { grpc::ClientContext context; - PrepareContext(context, timeout_ms); + GrpcClientContextWithTimeoutMs(context, timeout_ms); rpc::GetAllResourceUsageRequest request; rpc::GetAllResourceUsageReply reply; @@ -520,7 +520,7 @@ Status PythonGcsClient::DrainNodes(const std::vector &node_ids, int64_t timeout_ms, std::vector &drained_node_ids) { grpc::ClientContext context; - PrepareContext(context, timeout_ms); + GrpcClientContextWithTimeoutMs(context, timeout_ms); rpc::DrainNodeRequest request; for (const std::string &node_id : node_ids) { From 24d0cc1362f1fe20ba14470aad50df820b87a395 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Sat, 12 Aug 2023 00:27:53 -0400 Subject: [PATCH 18/20] added error message Signed-off-by: Ruiyang Wang --- .../job-submission/ray-client.rst | 3 +++ python/ray/_private/parameter.py | 7 +++++++ python/ray/_private/utils.py | 16 ++++++++++++++-- python/ray/_private/worker.py | 2 +- python/ray/client_builder.py | 11 ++++++++++- 5 files changed, 35 insertions(+), 4 deletions(-) diff --git a/doc/source/cluster/running-applications/job-submission/ray-client.rst b/doc/source/cluster/running-applications/job-submission/ray-client.rst index 7a43afc91dbfb..a153e682f4476 100644 --- a/doc/source/cluster/running-applications/job-submission/ray-client.rst +++ b/doc/source/cluster/running-applications/job-submission/ray-client.rst @@ -3,6 +3,9 @@ Ray Client ========== +.. warning:: + Ray Client requires pip package `ray[client]`. If you installed the minimal Ray (e.g. `pip install ray`), please reinstall by executing `pip install ray[client]`. + **What is the Ray Client?** The Ray Client is an API that connects a Python script to a **remote** Ray cluster. Effectively, it allows you to leverage a remote Ray cluster just like you would with Ray running on your local machine. diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py index d383f76370a45..1759ff5d2a526 100644 --- a/python/ray/_private/parameter.py +++ b/python/ray/_private/parameter.py @@ -6,8 +6,10 @@ import ray._private.ray_constants as ray_constants from ray._private.utils import ( validate_node_labels, + check_ray_client_dependencies_installed, ) + logger = logging.getLogger(__name__) @@ -396,6 +398,11 @@ def _check_usage(self): ) if self.ray_client_server_port is not None: + if not check_ray_client_dependencies_installed(): + raise ValueError( + "Ray Client requires pip package `ray[client]`. " + "If you installed the minimal Ray (e.g. `pip install ray`), " + "please reinstall by executing `pip install ray[client]`.") if ( self.ray_client_server_port < 1024 or self.ray_client_server_port > 65535 diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 000f1520ce97c..aa20945de9500 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -268,7 +268,7 @@ def hex_to_binary(hex_identifier): # once we separate `WorkerID` from `UniqueID`. def compute_job_id_from_driver(driver_id): assert isinstance(driver_id, ray.WorkerID) - return ray.JobID(driver_id.binary()[0 : ray.JobID.size()]) + return ray.JobID(driver_id.binary()[0: ray.JobID.size()]) def compute_driver_id_from_job(job_id): @@ -1191,7 +1191,7 @@ def import_attr(full_path: str): else: last_period_idx = full_path.rfind(".") module_name = full_path[:last_period_idx] - attr_name = full_path[last_period_idx + 1 :] + attr_name = full_path[last_period_idx + 1:] module = importlib.import_module(module_name) return getattr(module, attr_name) @@ -1352,6 +1352,18 @@ def check_dashboard_dependencies_installed() -> bool: return False +def check_ray_client_dependencies_installed() -> bool: + """Returns True if Ray Client dependencies are installed. + + See documents for check_dashboard_dependencies_installed. + """ + try: + import grpc # noqa: F401 + return True + except ImportError: + return False + + connect_error = ( "Unable to connect to GCS (ray head) at {}. " "Check that (1) Ray with matching version started " diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 6b053b43e49db..79f5b1ab54c55 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1153,7 +1153,7 @@ def init( To connect to an existing remote cluster, use this as follows (substituting in the appropriate address). Note the addition of "ray://" at the beginning - of the address. + of the address. This requires `ray[client]`. .. testcode:: :skipif: True diff --git a/python/ray/client_builder.py b/python/ray/client_builder.py index d5cbc03142e10..b4b211b1bc8ae 100644 --- a/python/ray/client_builder.py +++ b/python/ray/client_builder.py @@ -14,7 +14,10 @@ RAY_NAMESPACE_ENVIRONMENT_VARIABLE, RAY_RUNTIME_ENV_ENVIRONMENT_VARIABLE, ) -from ray._private.utils import split_address +from ray._private.utils import ( + split_address, + check_ray_client_dependencies_installed, +) from ray._private.worker import BaseContext from ray._private.worker import init as ray_driver_init from ray.job_config import JobConfig @@ -95,6 +98,12 @@ class ClientBuilder: """ def __init__(self, address: Optional[str]) -> None: + if not check_ray_client_dependencies_installed(): + raise ValueError( + "Ray Client requires pip package `ray[client]`. " + "If you installed the minimal Ray (e.g. `pip install ray`), " + "please reinstall by executing `pip install ray[client]`.") + self.address = address self._job_config = JobConfig() self._remote_init_kwargs = {} From ea58f05653a17582bf08bdd89eea19d0d923c73c Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 14 Aug 2023 12:36:23 -0400 Subject: [PATCH 19/20] check ray_client_server_port vs ray[client] configs Signed-off-by: Ruiyang Wang --- python/ray/_private/parameter.py | 12 ++++++++++-- python/ray/scripts/scripts.py | 5 +++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py index 339f1b05792b7..6e52bb9f4ba2e 100644 --- a/python/ray/_private/parameter.py +++ b/python/ray/_private/parameter.py @@ -400,8 +400,16 @@ def _check_usage(self): "max_worker_port must be higher than min_worker_port." ) - if self.ray_client_server_port is not None: - if not check_ray_client_dependencies_installed(): + # no client, no port -> ok + # no port, has client -> default to 10001 + # has port, no client -> value error + # has port, has client -> ok, check port validity + has_ray_client = check_ray_client_dependencies_installed() + if self.ray_client_server_port is None: + if has_ray_client: + self.ray_client_server_port = 10001 + else: + if not has_ray_client: raise ValueError( "Ray Client requires pip package `ray[client]`. " "If you installed the minimal Ray (e.g. `pip install ray`), " diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 448ea899f11ea..f2ae2b2e2b824 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -345,8 +345,9 @@ def debug(address): "--ray-client-server-port", required=False, type=int, - default=10001, - help="the port number the ray client server binds on, default to 10001.", + default=None, + help="the port number the ray client server binds on, default to 10001, " + "or None if ray[client] is not installed.", ) @click.option( "--memory", From 2048cb0eb8a122ec3a57e3998bbbeab1aa5af992 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 14 Aug 2023 15:50:05 -0400 Subject: [PATCH 20/20] add default port in scripts, not in parameters which messes up with unit tests. Signed-off-by: Ruiyang Wang --- python/ray/_private/parameter.py | 13 ++----------- python/ray/scripts/scripts.py | 10 ++++++++++ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py index 6e52bb9f4ba2e..f6d91ee0f1761 100644 --- a/python/ray/_private/parameter.py +++ b/python/ray/_private/parameter.py @@ -399,17 +399,8 @@ def _check_usage(self): raise ValueError( "max_worker_port must be higher than min_worker_port." ) - - # no client, no port -> ok - # no port, has client -> default to 10001 - # has port, no client -> value error - # has port, has client -> ok, check port validity - has_ray_client = check_ray_client_dependencies_installed() - if self.ray_client_server_port is None: - if has_ray_client: - self.ray_client_server_port = 10001 - else: - if not has_ray_client: + if self.ray_client_server_port is not None: + if not check_ray_client_dependencies_installed(): raise ValueError( "Ray Client requires pip package `ray[client]`. " "If you installed the minimal Ray (e.g. `pip install ray`), " diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index f2ae2b2e2b824..8b40537684d2c 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -22,6 +22,7 @@ import ray._private.ray_constants as ray_constants import ray._private.services as services from ray._private.utils import ( + check_ray_client_dependencies_installed, parse_resources_json, parse_node_labels_json, ) @@ -624,6 +625,15 @@ def start( temp_dir = None redirect_output = None if not no_redirect_output else True + + # no client, no port -> ok + # no port, has client -> default to 10001 + # has port, no client -> value error + # has port, has client -> ok, check port validity + has_ray_client = check_ray_client_dependencies_installed() + if has_ray_client and ray_client_server_port is None: + ray_client_server_port = 10001 + ray_params = ray._private.parameter.RayParams( node_ip_address=node_ip_address, node_name=node_name if node_name else node_ip_address,