From ef2a0402a3a147cb127d7d15fd48b9411db25c64 Mon Sep 17 00:00:00 2001
From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com>
Date: Mon, 14 Aug 2023 22:42:32 -0700
Subject: [PATCH] Remove grpcio from minimal dependency (#38243)

Removes grpcio from the ray dependency while adding it to the ray[default]
dependency.

Removed use of the grpc client gcs_node_info_stub.DrainNode in the
autoscaler; moved it into _raylet.pyx.

Removed use of the grpc client gcs_node_resources_stub.GetAllResourceUsage
in the autoscaler; moved it into _raylet.pyx.

Removed use of the grpc status code; moved it into _raylet.pyx.

Removed the Ray Client tests in test_runtime_env_ray_minimal.py.

---------

Signed-off-by: Ruiyang Wang
---
 dashboard/agent.py                            | 88 ++++++++++---------
 dashboard/optional_deps.py                    |  1 +
 dashboard/utils.py                            |  4 +-
 .../job-submission/ray-client.rst             |  3 +
 python/ray/_private/gcs_utils.py              | 68 --------------
 python/ray/_private/parameter.py              |  8 +-
 python/ray/_private/test_utils.py             | 13 ++-
 python/ray/_private/tls_utils.py              |  3 +-
 python/ray/_private/utils.py                  | 16 +++-
 python/ray/_private/worker.py                 |  2 +-
 python/ray/_raylet.pyx                        | 30 +++++++
 python/ray/autoscaler/_private/autoscaler.py  | 42 +++------
 python/ray/autoscaler/_private/monitor.py     | 26 ++----
 python/ray/client_builder.py                  | 11 ++-
 python/ray/includes/common.pxd                |  8 +-
 python/ray/scripts/scripts.py                 | 15 +++-
 python/ray/tests/conftest.py                  |  6 +-
 python/ray/tests/test_basic.py                | 12 ++-
 .../ray/tests/test_runtime_env_ray_minimal.py | 35 --------
 python/ray/tests/test_usage_stats.py          |  2 +
 python/ray/tune/experiment/experiment.py      |  4 +-
 python/setup.py                               |  4 +-
 src/ray/gcs/gcs_client/gcs_client.cc          | 49 +++++++++++
 src/ray/gcs/gcs_client/gcs_client.h           |  6 +-
 24 files changed, 239 insertions(+), 217 deletions(-)

diff --git a/dashboard/agent.py b/dashboard/agent.py
index d56047b2e3884..5632026b1605c 100644
--- a/dashboard/agent.py
+++ b/dashboard/agent.py
@@ -16,7 +16,6 @@
 import ray.dashboard.consts as dashboard_consts
 import ray.dashboard.utils as dashboard_utils
 from ray.dashboard.consts import _PARENT_DEATH_THREASHOLD
-from ray._private.gcs_pubsub import GcsAioPublisher
 from ray._raylet import GcsClient
 from ray._private.gcs_utils import GcsAioClient
 from ray._private.ray_logging import (
@@ -31,12 +30,6 @@
 # Import psutil after ray so the packaged version is used.
 import psutil

-try:
-    from grpc import aio as aiogrpc
-except ImportError:
-    from grpc.experimental import aio as aiogrpc
-
-
 # Publishes at most this number of lines of Raylet logs, when the Raylet dies
 # unexpectedly.
 _RAYLET_LOG_MAX_PUBLISH_LINES = 20
@@ -52,19 +45,6 @@
 logger = logging.getLogger(__name__)

-# We would want to suppress deprecating warnings from aiogrpc library
-# with the usage of asyncio.get_event_loop() in python version >=3.10
-# This could be removed once https://github.com/grpc/grpc/issues/32526
-# is released, and we used higher versions of grpcio that that.
-if sys.version_info.major >= 3 and sys.version_info.minor >= 10:
-    import warnings
-
-    with warnings.catch_warnings():
-        warnings.simplefilter("ignore", category=DeprecationWarning)
-        aiogrpc.init_grpc_aio()
-else:
-    aiogrpc.init_grpc_aio()
-

 class DashboardAgent:
     def __init__(
@@ -117,13 +97,44 @@ def __init__(
         assert self.ppid > 0
         logger.info("Parent pid is %s", self.ppid)

-        # Setup raylet channel
-        options = ray_constants.GLOBAL_GRPC_OPTIONS
-        self.aiogrpc_raylet_channel = ray._private.utils.init_grpc_channel(
-            f"{self.ip}:{self.node_manager_port}", options, asynchronous=True
-        )
+        # grpc server is None in minimal.
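+        # In a minimal install (`pip install ray`) grpcio may be absent, so
+        # the grpc-backed attributes stay None and callers guard on them,
+        # e.g. `grpc_port = -1 if not self.server else self.grpc_port` below.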
+        self.server = None
+        # http_server is None in minimal.
+        self.http_server = None
+
+        # Used by the agent and sub-modules.
+        # TODO(architkulkarni): Remove gcs_client once the agent exclusively uses
+        # gcs_aio_client and not gcs_client.
+        self.gcs_client = GcsClient(address=self.gcs_address)
+        _initialize_internal_kv(self.gcs_client)
+        assert _internal_kv_initialized()
+        self.gcs_aio_client = GcsAioClient(address=self.gcs_address)
+
+        if not self.minimal:
+            self._init_non_minimal()
+
+    def _init_non_minimal(self):
+        from ray._private.gcs_pubsub import GcsAioPublisher
+
+        self.aio_publisher = GcsAioPublisher(address=self.gcs_address)
+
+        try:
+            from grpc import aio as aiogrpc
+        except ImportError:
+            from grpc.experimental import aio as aiogrpc
+
+        # Suppress deprecation warnings from the aiogrpc library caused by its
+        # use of asyncio.get_event_loop() on Python >= 3.10.
+        # This can be removed once https://github.com/grpc/grpc/issues/32526
+        # is released and we use a grpcio version that includes the fix.
+        if sys.version_info.major >= 3 and sys.version_info.minor >= 10:
+            import warnings
+
+            with warnings.catch_warnings():
+                warnings.simplefilter("ignore", category=DeprecationWarning)
+                aiogrpc.init_grpc_aio()
+        else:
+            aiogrpc.init_grpc_aio()

-        # Setup grpc server
         self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0),))
         grpc_ip = "127.0.0.1" if self.ip == "127.0.0.1" else "0.0.0.0"
         try:
@@ -143,19 +154,6 @@ def __init__(
         else:
             logger.info("Dashboard agent grpc address: %s:%s", grpc_ip, self.grpc_port)

-        # If the agent is started as non-minimal version, http server should
-        # be configured to communicate with the dashboard in a head node.
-        self.http_server = None
-
-        # Used by the agent and sub-modules.
-        # TODO(architkulkarni): Remove gcs_client once the agent exclusively uses
-        # gcs_aio_client and not gcs_client.
-        self.gcs_client = GcsClient(address=self.gcs_address)
-        _initialize_internal_kv(self.gcs_client)
-        assert _internal_kv_initialized()
-        self.gcs_aio_client = GcsAioClient(address=self.gcs_address)
-        self.publisher = GcsAioPublisher(address=self.gcs_address)
-
     async def _configure_http_server(self, modules):
         from ray.dashboard.http_server_agent import HttpServerAgent
@@ -180,9 +178,16 @@ def _load_modules(self):
     @property
     def http_session(self):
-        assert self.http_server, "Accessing unsupported API in a minimal ray."
+        assert self.http_server, \
+            "Accessing unsupported API (HttpServerAgent) in a minimal ray."
         return self.http_server.http_session

+    @property
+    def publisher(self):
+        assert self.aio_publisher, \
+            "Accessing unsupported API (GcsAioPublisher) in a minimal ray."
+        return self.aio_publisher
+
     async def run(self):
         async def _check_parent():
             """Check if raylet is dead and fate-share if it is."""
@@ -311,9 +316,10 @@ async def _check_parent():
             # TODO: Use async version if performance is an issue
             # -1 should indicate that http server is not started.
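+            # The pair below is stored in the GCS internal KV under
+            # DASHBOARD_AGENT_PORT_PREFIX + node_id; on a minimal install the
+            # stored value is json.dumps([-1, -1]) since neither server runs.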
http_port = -1 if not self.http_server else self.http_server.http_port + grpc_port = -1 if not self.server else self.grpc_port await self.gcs_aio_client.internal_kv_put( f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{self.node_id}".encode(), - json.dumps([http_port, self.grpc_port]).encode(), + json.dumps([http_port, grpc_port]).encode(), True, namespace=ray_constants.KV_NAMESPACE_DASHBOARD, ) diff --git a/dashboard/optional_deps.py b/dashboard/optional_deps.py index cd792190df788..b46a6fdd22129 100644 --- a/dashboard/optional_deps.py +++ b/dashboard/optional_deps.py @@ -16,3 +16,4 @@ from aiohttp.typedefs import PathLike # noqa: F401 from aiohttp.web import RouteDef # noqa: F401 import pydantic # noqa: F401 +import grpc # noqa: F401 diff --git a/dashboard/utils.py b/dashboard/utils.py index 6434ce5c1b606..2f22f4af82ba6 100644 --- a/dashboard/utils.py +++ b/dashboard/utils.py @@ -50,7 +50,7 @@ async def run(self, server): """ Run the module in an asyncio loop. An agent module can provide servicers to the server. - :param server: Asyncio GRPC server. + :param server: Asyncio GRPC server, or None if ray is minimal. """ @staticmethod @@ -79,7 +79,7 @@ async def run(self, server): """ Run the module in an asyncio loop. A head module can provide servicers to the server. - :param server: Asyncio GRPC server. + :param server: Asyncio GRPC server, or None if ray is minimal. """ @staticmethod diff --git a/doc/source/cluster/running-applications/job-submission/ray-client.rst b/doc/source/cluster/running-applications/job-submission/ray-client.rst index 7a43afc91dbfb..a153e682f4476 100644 --- a/doc/source/cluster/running-applications/job-submission/ray-client.rst +++ b/doc/source/cluster/running-applications/job-submission/ray-client.rst @@ -3,6 +3,9 @@ Ray Client ========== +.. warning:: + Ray Client requires pip package `ray[client]`. If you installed the minimal Ray (e.g. `pip install ray`), please reinstall by executing `pip install ray[client]`. + **What is the Ray Client?** The Ray Client is an API that connects a Python script to a **remote** Ray cluster. Effectively, it allows you to leverage a remote Ray cluster just like you would with Ray running on your local machine. diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index 293d21965ebd8..5c406db0f7592 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -1,13 +1,6 @@ -import enum import logging -import inspect -import os -import asyncio -from functools import wraps from typing import Optional -import grpc - from ray._private import ray_constants import ray._private.gcs_aio_client @@ -98,60 +91,6 @@ def create_gcs_channel(address: str, aio=False): return init_grpc_channel(address, options=_GRPC_OPTIONS, asynchronous=aio) -# This global variable is used for testing only -_called_freq = {} - - -def _auto_reconnect(f): - # This is for testing to count the frequence - # of gcs call - if inspect.iscoroutinefunction(f): - - @wraps(f) - async def wrapper(self, *args, **kwargs): - if "TEST_RAY_COLLECT_KV_FREQUENCY" in os.environ: - global _called_freq - name = f.__name__ - if name not in _called_freq: - _called_freq[name] = 0 - _called_freq[name] += 1 - - remaining_retry = self._nums_reconnect_retry - while True: - try: - return await f(self, *args, **kwargs) - except grpc.RpcError as e: - if e.code() in ( - grpc.StatusCode.UNAVAILABLE, - grpc.StatusCode.UNKNOWN, - ): - if remaining_retry <= 0: - logger.error( - "Failed to connect to GCS. 
Please check" - " `gcs_server.out` for more details." - ) - raise - logger.debug( - "Failed to send request to gcs, reconnecting. " f"Error {e}" - ) - try: - self._connect() - except Exception: - logger.error(f"Connecting to gcs failed. Error {e}") - await asyncio.sleep(1) - remaining_retry -= 1 - continue - raise - - return wrapper - else: - - raise NotImplementedError( - "This code moved to Cython, see " - "https://github.com/ray-project/ray/pull/33769" - ) - - class GcsChannel: def __init__(self, gcs_address: Optional[str] = None, aio: bool = False): self._gcs_address = gcs_address @@ -171,13 +110,6 @@ def channel(self): return self._channel -class GcsCode(enum.IntEnum): - # corresponding to ray/src/ray/common/status.h - OK = 0 - NotFound = 17 - GrpcUnavailable = 26 - - # re-export GcsAioClient = ray._private.gcs_aio_client.GcsAioClient diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py index 538fe09a505d0..f6d91ee0f1761 100644 --- a/python/ray/_private/parameter.py +++ b/python/ray/_private/parameter.py @@ -6,8 +6,10 @@ import ray._private.ray_constants as ray_constants from ray._private.utils import ( validate_node_labels, + check_ray_client_dependencies_installed, ) + logger = logging.getLogger(__name__) @@ -397,8 +399,12 @@ def _check_usage(self): raise ValueError( "max_worker_port must be higher than min_worker_port." ) - if self.ray_client_server_port is not None: + if not check_ray_client_dependencies_installed(): + raise ValueError( + "Ray Client requires pip package `ray[client]`. " + "If you installed the minimal Ray (e.g. `pip install ray`), " + "please reinstall by executing `pip install ray[client]`.") if ( self.ray_client_server_port < 1024 or self.ray_client_server_port > 65535 diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index f6d054a76f15b..d036efde29596 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -24,7 +24,6 @@ import requests from ray._raylet import Config -import grpc import numpy as np import psutil # We must import psutil after ray because we bundle it with ray. from ray._private import ( @@ -32,7 +31,6 @@ ) from ray._private.worker import RayContext import yaml -from grpc._channel import _InactiveRpcError import ray import ray._private.gcs_utils as gcs_utils @@ -45,9 +43,7 @@ from ray.core.generated import ( gcs_pb2, node_manager_pb2, - node_manager_pb2_grpc, gcs_service_pb2, - gcs_service_pb2_grpc, ) from ray.util.queue import Empty, Queue, _QueueActor from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy @@ -1474,6 +1470,9 @@ async def get_total_killed_nodes(self): return self.killed_nodes def _kill_raylet(self, ip, port, graceful=False): + import grpc + from grpc._channel import _InactiveRpcError + from ray.core.generated import node_manager_pb2_grpc raylet_address = f"{ip}:{port}" channel = grpc.insecure_channel(raylet_address) stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) @@ -1694,6 +1693,8 @@ def wandb_setup_api_key_hook(): # Get node stats from node manager. def get_node_stats(raylet, num_retry=5, timeout=2): + import grpc + from ray.core.generated import node_manager_pb2_grpc raylet_address = f'{raylet["NodeManagerAddress"]}:{raylet["NodeManagerPort"]}' channel = ray._private.utils.init_grpc_channel(raylet_address) stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) @@ -1711,6 +1712,7 @@ def get_node_stats(raylet, num_retry=5, timeout=2): # Gets resource usage assuming gcs is local. 
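+# NOTE: grpc and the generated *_pb2_grpc stubs are imported inside each of
+# these helpers so that importing test_utils succeeds on a minimal install;
+# the ImportError can only surface when a grpc-backed helper is actually
+# called.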
def get_resource_usage(gcs_address, timeout=10): + from ray.core.generated import gcs_service_pb2_grpc if not gcs_address: gcs_address = ray.worker._global_node.gcs_address @@ -1739,6 +1741,9 @@ def get_load_metrics_report(webui_url): # Send a RPC to the raylet to have it self-destruct its process. def kill_raylet(raylet, graceful=False): + import grpc + from grpc._channel import _InactiveRpcError + from ray.core.generated import node_manager_pb2_grpc raylet_address = f'{raylet["NodeManagerAddress"]}:{raylet["NodeManagerPort"]}' channel = grpc.insecure_channel(raylet_address) stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) diff --git a/python/ray/_private/tls_utils.py b/python/ray/_private/tls_utils.py index 0251b36ba56bc..cfdfde84c1be5 100644 --- a/python/ray/_private/tls_utils.py +++ b/python/ray/_private/tls_utils.py @@ -2,8 +2,6 @@ import os import socket -import grpc - def generate_self_signed_tls_certs(): """Create self-signed key/cert pair for testing. @@ -68,6 +66,7 @@ def generate_self_signed_tls_certs(): def add_port_to_grpc_server(server, address): + import grpc if os.environ.get("RAY_USE_TLS", "0").lower() in ("1", "true"): server_cert_chain, private_key, ca_cert = load_certs_from_env() credentials = grpc.ssl_server_credentials( diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 000f1520ce97c..aa20945de9500 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -268,7 +268,7 @@ def hex_to_binary(hex_identifier): # once we separate `WorkerID` from `UniqueID`. def compute_job_id_from_driver(driver_id): assert isinstance(driver_id, ray.WorkerID) - return ray.JobID(driver_id.binary()[0 : ray.JobID.size()]) + return ray.JobID(driver_id.binary()[0: ray.JobID.size()]) def compute_driver_id_from_job(job_id): @@ -1191,7 +1191,7 @@ def import_attr(full_path: str): else: last_period_idx = full_path.rfind(".") module_name = full_path[:last_period_idx] - attr_name = full_path[last_period_idx + 1 :] + attr_name = full_path[last_period_idx + 1:] module = importlib.import_module(module_name) return getattr(module, attr_name) @@ -1352,6 +1352,18 @@ def check_dashboard_dependencies_installed() -> bool: return False +def check_ray_client_dependencies_installed() -> bool: + """Returns True if Ray Client dependencies are installed. + + See documents for check_dashboard_dependencies_installed. + """ + try: + import grpc # noqa: F401 + return True + except ImportError: + return False + + connect_error = ( "Unable to connect to GCS (ray head) at {}. " "Check that (1) Ray with matching version started " diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 045b4182f8536..32cf18a575db8 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1153,7 +1153,7 @@ def init( To connect to an existing remote cluster, use this as follows (substituting in the appropriate address). Note the addition of "ray://" at the beginning - of the address. + of the address. This requires `ray[client]`. .. 
testcode:: :skipif: True diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 2ff4ec49df0ba..56325bac702e5 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -184,6 +184,7 @@ import ray._private.ray_constants as ray_constants import ray.cloudpickle as ray_pickle from ray.core.generated.common_pb2 import ActorDiedErrorContext from ray.core.generated.gcs_pb2 import JobTableData +from ray.core.generated.gcs_service_pb2 import GetAllResourceUsageReply from ray._private.async_compat import ( sync_to_async, get_new_event_loop, @@ -215,6 +216,7 @@ OPTIMIZED = __OPTIMIZE__ GRPC_STATUS_CODE_UNAVAILABLE = CGrpcStatusCode.UNAVAILABLE GRPC_STATUS_CODE_UNKNOWN = CGrpcStatusCode.UNKNOWN GRPC_STATUS_CODE_DEADLINE_EXCEEDED = CGrpcStatusCode.DEADLINE_EXCEEDED +GRPC_STATUS_CODE_RESOURCE_EXHAUSTED = CGrpcStatusCode.RESOURCE_EXHAUSTED logger = logging.getLogger(__name__) @@ -2538,6 +2540,18 @@ cdef class GcsClient: result[job_info.job_id] = job_info return result + @_auto_reconnect + def get_all_resource_usage(self, timeout=None) -> GetAllResourceUsageReply: + cdef: + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + c_string serialized_reply + + with nogil: + check_status(self.inner.get().GetAllResourceUsage(timeout_ms, serialized_reply)) + + reply = GetAllResourceUsageReply() + reply.ParseFromString(serialized_reply) + return reply ######################################################## # Interface for rpc::autoscaler::AutoscalerStateService ######################################################## @@ -2586,6 +2600,22 @@ cdef class GcsClient: return is_accepted + @_auto_reconnect + def drain_nodes(self, node_ids, timeout=None): + cdef: + c_vector[c_string] c_node_ids + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + c_vector[c_string] c_drained_node_ids + for node_id in node_ids: + c_node_ids.push_back(node_id) + with nogil: + check_status(self.inner.get().DrainNodes( + c_node_ids, timeout_ms, c_drained_node_ids)) + result = [] + for drain_node_id in c_drained_node_ids: + result.append(drain_node_id) + return result + ############################################################# # Interface for rpc::autoscaler::AutoscalerStateService ends ############################################################# diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 3f63b84fbb60e..c6ba033797e3f 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -12,7 +12,6 @@ from enum import Enum from typing import Any, Callable, Dict, FrozenSet, List, Optional, Set, Tuple, Union -import grpc import yaml from ray.autoscaler._private.constants import ( @@ -76,7 +75,8 @@ TAG_RAY_RUNTIME_CONFIG, TAG_RAY_USER_NODE_TYPE, ) -from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc +from ray._raylet import GcsClient +from ray.exceptions import RpcError logger = logging.getLogger(__name__) @@ -184,7 +184,7 @@ def __init__( # TODO(ekl): require config reader to be a callable always. config_reader: Union[str, Callable[[], dict]], load_metrics: LoadMetrics, - gcs_node_info_stub: gcs_service_pb2_grpc.NodeInfoGcsServiceStub, + gcs_client: GcsClient, session_name: Optional[str] = None, max_launch_batch: int = AUTOSCALER_MAX_LAUNCH_BATCH, max_concurrent_launches: int = AUTOSCALER_MAX_CONCURRENT_LAUNCHES, @@ -214,8 +214,8 @@ def __init__( prefix_cluster_info: Whether to add the cluster name to info strs. event_summarizer: Utility to consolidate duplicated messages. 
prom_metrics: Prometheus metrics for autoscaler-related operations.
-            gcs_node_info_stub: Stub for interactions with Ray nodes via gRPC
-                request to the GCS. Used to drain nodes before termination.
+            gcs_client: Client for interactions with the GCS. Used to drain
+                nodes before termination.
         """

         if isinstance(config_reader, str):
@@ -355,7 +355,7 @@ def read_fn():
             for remote, local in self.config["file_mounts"].items()
         }

-        self.gcs_node_info_stub = gcs_node_info_stub
+        self.gcs_client = gcs_client

         for local_path in self.config["file_mounts"].values():
             assert os.path.exists(local_path)
@@ -675,39 +675,21 @@ def drain_nodes_via_gcs(self, provider_node_ids_to_drain: List[NodeID]):
         logger.info(f"Draining {len(raylet_ids_to_drain)} raylet(s).")

         try:
-            request = gcs_service_pb2.DrainNodeRequest(
-                drain_node_data=[
-                    gcs_service_pb2.DrainNodeData(node_id=raylet_id)
-                    for raylet_id in raylet_ids_to_drain
-                ]
-            )
-
             # A successful response indicates that the GCS has marked the
             # desired nodes as "drained." The cloud provider can then terminate
             # the nodes without the GCS printing an error.
-            response = self.gcs_node_info_stub.DrainNode(request, timeout=5)
-
             # Check if we succeeded in draining all of the intended nodes by
             # looking at the RPC response.
-            drained_raylet_ids = {
-                status_item.node_id for status_item in response.drain_node_status
-            }
+            drained_raylet_ids = set(self.gcs_client.drain_nodes(
+                raylet_ids_to_drain, timeout=5))
             failed_to_drain = raylet_ids_to_drain - drained_raylet_ids
             if failed_to_drain:
                 self.prom_metrics.drain_node_exceptions.inc()
                 logger.error(f"Failed to drain {len(failed_to_drain)} raylet(s).")
-
-            # If we get a gRPC error with an UNIMPLEMENTED code, fail silently.
-            # This error indicates that the GCS is using Ray version < 1.8.0,
-            # for which DrainNode is not implemented.
-        except grpc.RpcError as e:
-            # If the code is UNIMPLEMENTED, pass.
-            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
-                pass
-            # Otherwise, it's a plane old gRPC error and we should log it.
-            else:
-                self.prom_metrics.drain_node_exceptions.inc()
-                logger.exception("Failed to drain Ray nodes. Traceback follows.")
+        except RpcError:
+            # It's a plain old RPC error and we should log it.
+            self.prom_metrics.drain_node_exceptions.inc()
+            logger.exception("Failed to drain Ray nodes. 
Traceback follows.") except Exception: # We don't need to interrupt the autoscaler update with an # exception, but we should log what went wrong and record the diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index 34db56f511f9f..d01193bf5b816 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -29,7 +29,7 @@ from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics from ray.autoscaler._private.util import format_readonly_node_type -from ray.core.generated import gcs_pb2, gcs_service_pb2, gcs_service_pb2_grpc +from ray.core.generated import gcs_pb2 from ray.core.generated.event_pb2 import Event as RayEvent from ray.experimental.internal_kv import ( _initialize_internal_kv, @@ -140,31 +140,22 @@ def __init__( retry_on_failure: bool = True, ): self.gcs_address = address - options = ray_constants.GLOBAL_GRPC_OPTIONS - gcs_channel = ray._private.utils.init_grpc_channel(self.gcs_address, options) - # TODO: Use gcs client for this - self.gcs_node_resources_stub = ( - gcs_service_pb2_grpc.NodeResourceInfoGcsServiceStub(gcs_channel) - ) - self.gcs_node_info_stub = gcs_service_pb2_grpc.NodeInfoGcsServiceStub( - gcs_channel - ) worker = ray._private.worker.global_worker # TODO: eventually plumb ClusterID through to here - gcs_client = GcsClient(address=self.gcs_address) + self.gcs_client = GcsClient(address=self.gcs_address) if monitor_ip: monitor_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}" - gcs_client.internal_kv_put( + self.gcs_client.internal_kv_put( b"AutoscalerMetricsAddress", monitor_addr.encode(), True, None ) - _initialize_internal_kv(gcs_client) + _initialize_internal_kv(self.gcs_client) if monitor_ip: monitor_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}" - gcs_client.internal_kv_put( + self.gcs_client.internal_kv_put( b"AutoscalerMetricsAddress", monitor_addr.encode(), True, None ) - self._session_name = self.get_session_name(gcs_client) + self._session_name = self.get_session_name(self.gcs_client) logger.info(f"session_name: {self._session_name}") worker.mode = 0 head_node_ip = self.gcs_address.split(":")[0] @@ -240,7 +231,7 @@ def get_latest_readonly_config(): self.autoscaler = StandardAutoscaler( autoscaling_config, self.load_metrics, - self.gcs_node_info_stub, + self.gcs_client, self._session_name, prefix_cluster_info=self.prefix_cluster_info, event_summarizer=self.event_summarizer, @@ -250,8 +241,7 @@ def get_latest_readonly_config(): def update_load_metrics(self): """Fetches resource usage data from GCS and updates load metrics.""" - request = gcs_service_pb2.GetAllResourceUsageRequest() - response = self.gcs_node_resources_stub.GetAllResourceUsage(request, timeout=60) + response = self.gcs_client.get_all_resource_usage(timeout=60) resources_batch_data = response.resource_usage_data log_resource_batch_data_if_desired(resources_batch_data) diff --git a/python/ray/client_builder.py b/python/ray/client_builder.py index d5cbc03142e10..b4b211b1bc8ae 100644 --- a/python/ray/client_builder.py +++ b/python/ray/client_builder.py @@ -14,7 +14,10 @@ RAY_NAMESPACE_ENVIRONMENT_VARIABLE, RAY_RUNTIME_ENV_ENVIRONMENT_VARIABLE, ) -from ray._private.utils import split_address +from ray._private.utils import ( + split_address, + check_ray_client_dependencies_installed, +) from ray._private.worker import BaseContext from ray._private.worker import init as ray_driver_init from ray.job_config import JobConfig @@ -95,6 
+98,12 @@ class ClientBuilder: """ def __init__(self, address: Optional[str]) -> None: + if not check_ray_client_dependencies_installed(): + raise ValueError( + "Ray Client requires pip package `ray[client]`. " + "If you installed the minimal Ray (e.g. `pip install ray`), " + "please reinstall by executing `pip install ray[client]`.") + self.address = address self._job_config = JobConfig() self._remote_init_kwargs = {} diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 1f8c25ee525a3..0d12508e68772 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -361,6 +361,7 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: UNAVAILABLE "grpc::StatusCode::UNAVAILABLE", UNKNOWN "grpc::StatusCode::UNKNOWN", DEADLINE_EXCEEDED "grpc::StatusCode::DEADLINE_EXCEEDED", + RESOURCE_EXHAUSTED "grpc::StatusCode::RESOURCE_EXHAUSTED" cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions": CGcsClientOptions(const c_string &gcs_address) @@ -394,13 +395,14 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: CRayStatus InternalKVExists( const c_string &ns, const c_string &key, int64_t timeout_ms, c_bool &exists) - CRayStatus PinRuntimeEnvUri( const c_string &uri, int expiration_s, int64_t timeout_ms) CRayStatus GetAllNodeInfo( int64_t timeout_ms, c_vector[CGcsNodeInfo]& result) CRayStatus GetAllJobInfo( int64_t timeout_ms, c_vector[CJobTableData]& result) + CRayStatus GetAllResourceUsage( + int64_t timeout_ms, c_string& serialized_reply) CRayStatus RequestClusterResourceConstraint( int64_t timeout_ms, const c_vector[unordered_map[c_string, double]] &bundles, @@ -415,6 +417,10 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: const c_string &reason_message, int64_t timeout_ms, c_bool &is_accepted) + CRayStatus DrainNodes( + const c_vector[c_string]& node_ids, + int64_t timeout_ms, + c_vector[c_string]& drained_node_ids) cdef extern from "ray/gcs/gcs_client/gcs_client.h" namespace "ray::gcs" nogil: unordered_map[c_string, double] PythonGetResourcesTotal( diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 448ea899f11ea..8b40537684d2c 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -22,6 +22,7 @@ import ray._private.ray_constants as ray_constants import ray._private.services as services from ray._private.utils import ( + check_ray_client_dependencies_installed, parse_resources_json, parse_node_labels_json, ) @@ -345,8 +346,9 @@ def debug(address): "--ray-client-server-port", required=False, type=int, - default=10001, - help="the port number the ray client server binds on, default to 10001.", + default=None, + help="the port number the ray client server binds on, default to 10001, " + "or None if ray[client] is not installed.", ) @click.option( "--memory", @@ -623,6 +625,15 @@ def start( temp_dir = None redirect_output = None if not no_redirect_output else True + + # no client, no port -> ok + # no port, has client -> default to 10001 + # has port, no client -> value error + # has port, has client -> ok, check port validity + has_ray_client = check_ray_client_dependencies_installed() + if has_ray_client and ray_client_server_port is None: + ray_client_server_port = 10001 + ray_params = ray._private.parameter.RayParams( node_ip_address=node_ip_address, node_name=node_name if node_name else node_ip_address, diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 27af4a4abb676..1172b97b39bbc 100644 --- a/python/ray/tests/conftest.py +++ 
b/python/ray/tests/conftest.py
@@ -20,7 +20,6 @@
 import ray
 import ray._private.ray_constants as ray_constants
-import ray.util.client.server.server as ray_client_server
 from ray._private.conftest_utils import set_override_dashboard_url  # noqa: F401
 from ray._private.runtime_env.pip import PipProcessor
 from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager
@@ -629,6 +628,8 @@ def call_ray_start_with_external_redis(request):
 @pytest.fixture
 def init_and_serve():
+    import ray.util.client.server.server as ray_client_server
+
     server_handle, _ = ray_client_server.init_and_serve("localhost:50051")
     yield server_handle
     ray_client_server.shutdown_with_server(server_handle.grpc_server)
@@ -649,6 +650,9 @@ def call_ray_stop_only():
 def start_cluster(ray_start_cluster_enabled, request):
     assert request.param in {"ray_client", "no_ray_client"}
     use_ray_client: bool = request.param == "ray_client"
+    if os.environ.get("RAY_MINIMAL") == "1" and use_ray_client:
+        pytest.skip("Skipping because Ray Client is not available in minimal Ray.")
+
     cluster = ray_start_cluster_enabled
     cluster.add_node(num_cpus=4, dashboard_agent_listen_port=find_free_port())
     if use_ray_client:
diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py
index d7a9a7919e855..193a847d8b4da 100644
--- a/python/ray/tests/test_basic.py
+++ b/python/ray/tests/test_basic.py
@@ -1101,8 +1101,10 @@ def f():
 def test_import_ray_does_not_import_grpc():
     # First unload grpc and ray
-    del sys.modules["grpc"]
-    del sys.modules["ray"]
+    if "grpc" in sys.modules:
+        del sys.modules["grpc"]
+    if "ray" in sys.modules:
+        del sys.modules["ray"]

     # Then import ray from scratch
     import ray  # noqa: F401
@@ -1111,7 +1113,11 @@
     assert "grpc" not in sys.modules

     # Load grpc back so other tests will not be affected
-    import grpc  # noqa: F401
+    try:
+        import grpc  # noqa: F401
+    except ImportError:
+        # It's OK if grpc is not installed.
+        pass

 if __name__ == "__main__":
diff --git a/python/ray/tests/test_runtime_env_ray_minimal.py b/python/ray/tests/test_runtime_env_ray_minimal.py
index 2b3d68e51286e..4dc8c3ea0e225 100644
--- a/python/ray/tests/test_runtime_env_ray_minimal.py
+++ b/python/ray/tests/test_runtime_env_ray_minimal.py
@@ -36,23 +36,6 @@ def task(self):
     ray.get(a.task.remote())

-@pytest.mark.skipif(
-    sys.platform == "win32", reason="runtime_env unsupported on Windows."
-)
-@pytest.mark.skipif(
-    os.environ.get("RAY_MINIMAL") != "1",
-    reason="This test is only run in CI with a minimal Ray installation.",
-)
-@pytest.mark.parametrize(
-    "call_ray_start",
-    ["ray start --head --ray-client-server-port 25553 --port 0"],
-    indirect=True,
-)
-def test_ray_client_task_actor(call_ray_start):
-    ray.init("ray://localhost:25553")
-    _test_task_and_actor()
-
-
 @pytest.mark.skipif(
     sys.platform == "win32", reason="runtime_env unsupported on Windows."
 )
@@ -83,24 +66,6 @@ def f():
     ray.get(f.remote())

-@pytest.mark.skipif(
-    sys.platform == "win32", reason="runtime_env unsupported on Windows."
-)
-@pytest.mark.skipif(
-    os.environ.get("RAY_MINIMAL") != "1",
-    reason="This test is only run in CI with a minimal Ray installation.",
-)
-@pytest.mark.parametrize(
-    "call_ray_start",
-    ["ray start --head --ray-client-server-port 25552 --port 0"],
-    indirect=True,
-)
-def test_ray_client_init(call_ray_start):
-    with pytest.raises(ConnectionAbortedError) as excinfo:
-        ray.init("ray://localhost:25552", runtime_env={"pip": ["requests"]})
-    assert "install virtualenv" in str(excinfo.value)
-
-
 if __name__ == "__main__":
     if os.environ.get("PARALLEL_CI"):
         sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
diff --git a/python/ray/tests/test_usage_stats.py b/python/ray/tests/test_usage_stats.py
index 53f962970a076..039ae53cd1148 100644
--- a/python/ray/tests/test_usage_stats.py
+++ b/python/ray/tests/test_usage_stats.py
@@ -125,6 +125,8 @@ def reset_ray_version_commit():
 def test_get_extra_usage_tags_to_report(
     monkeypatch, call_ray_start, reset_usage_stats, ray_client, gcs_storage_type
 ):
+    if os.environ.get("RAY_MINIMAL") == "1" and ray_client:
+        pytest.skip("Skipping because Ray Client is not available in minimal Ray.")
     with monkeypatch.context() as m:
         # Test a normal case.
         m.setenv("RAY_USAGE_STATS_EXTRA_TAGS", "key=val;key2=val2")
diff --git a/python/ray/tune/experiment/experiment.py b/python/ray/tune/experiment/experiment.py
index d6f92ce6e9186..e168952b53a81 100644
--- a/python/ray/tune/experiment/experiment.py
+++ b/python/ray/tune/experiment/experiment.py
@@ -2,7 +2,6 @@
 import datetime
 import warnings
 from functools import partial
-import grpc
 import logging
 import os
 from pathlib import Path
@@ -22,6 +21,7 @@
     TYPE_CHECKING,
 )

+import ray
 from ray.air import CheckpointConfig
 from ray.air._internal.uri_utils import URI
 from ray.exceptions import RpcError
@@ -179,7 +179,7 @@ def __init__(
         try:
             self._run_identifier = Experiment.register_if_needed(run)
         except RpcError as e:
-            if e.rpc_code == grpc.StatusCode.RESOURCE_EXHAUSTED.value[0]:
+            if e.rpc_code == ray._raylet.GRPC_STATUS_CODE_RESOURCE_EXHAUSTED:
                 raise TuneError(
                     f"The Trainable/training function is too large for grpc resource "
                     f"limit. 
Check that its definition is not implicitly capturing a "
diff --git a/python/setup.py b/python/setup.py
index 0e5160f3aba54..a8be879bf855b 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -256,6 +256,8 @@ def get_packages(self):
         "py-spy >= 0.2.0",
         "requests",
         "gpustat >= 1.0.0",  # for windows
+        "grpcio >= 1.32.0; python_version < '3.10'",  # noqa:E501
+        "grpcio >= 1.42.0; python_version >= '3.10'",  # noqa:E501
         "opencensus",
         "pydantic < 2",  # 2.0.0 brings breaking changes
         "prometheus_client >= 0.7.1",
@@ -324,8 +326,6 @@ def get_packages(self):
 setup_spec.install_requires = [
     "click >= 7.0",
     "filelock",
-    "grpcio >= 1.32.0; python_version < '3.10'",  # noqa:E501
-    "grpcio >= 1.42.0; python_version >= '3.10'",  # noqa:E501
     "jsonschema",
     "msgpack >= 1.0.0, < 2.0.0",
     "numpy >= 1.16; python_version < '3.9'",
diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc
index 4fcc2fad191f8..5bf0c44a9399c 100644
--- a/src/ray/gcs/gcs_client/gcs_client.cc
+++ b/src/ray/gcs/gcs_client/gcs_client.cc
@@ -153,6 +153,7 @@ PythonGcsClient::PythonGcsClient(const GcsClientOptions &options) : options_(opt
   runtime_env_stub_ = rpc::RuntimeEnvGcsService::NewStub(channel_);
   node_info_stub_ = rpc::NodeInfoGcsService::NewStub(channel_);
   job_info_stub_ = rpc::JobInfoGcsService::NewStub(channel_);
+  node_resource_info_stub_ = rpc::NodeResourceInfoGcsService::NewStub(channel_);
   autoscaler_stub_ = rpc::autoscaler::AutoscalerStateService::NewStub(channel_);
 }

@@ -455,6 +456,26 @@ Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms,
   return Status::RpcError(status.error_message(), status.error_code());
 }

+Status PythonGcsClient::GetAllResourceUsage(int64_t timeout_ms,
+                                            std::string &serialized_reply) {
+  grpc::ClientContext context;
+  PrepareContext(context, timeout_ms);
+
+  rpc::GetAllResourceUsageRequest request;
+  rpc::GetAllResourceUsageReply reply;
+
+  grpc::Status status =
+      node_resource_info_stub_->GetAllResourceUsage(&context, request, &reply);
+  if (status.ok()) {
+    if (reply.status().code() == static_cast<int>(StatusCode::OK)) {
+      serialized_reply = reply.SerializeAsString();
+      return Status::OK();
+    }
+    return HandleGcsError(reply.status());
+  }
+  return Status::RpcError(status.error_message(), status.error_code());
+}
+
 Status PythonGcsClient::RequestClusterResourceConstraint(
     int64_t timeout_ms,
     const std::vector<std::unordered_map<std::string, double>> &bundles,
@@ -528,6 +549,34 @@ Status PythonGcsClient::DrainNode(const std::string &node_id,
   return Status::RpcError(status.error_message(), status.error_code());
 }

+Status PythonGcsClient::DrainNodes(const std::vector<std::string> &node_ids,
+                                   int64_t timeout_ms,
+                                   std::vector<std::string> &drained_node_ids) {
+  grpc::ClientContext context;
+  PrepareContext(context, timeout_ms);
+
+  rpc::DrainNodeRequest request;
+  for (const std::string &node_id : node_ids) {
+    request.add_drain_node_data()->set_node_id(node_id);
+  }
+
+  rpc::DrainNodeReply reply;
+
+  grpc::Status status = node_info_stub_->DrainNode(&context, request, &reply);
+  if (status.ok()) {
+    if (reply.status().code() == static_cast<int>(StatusCode::OK)) {
+      drained_node_ids.clear();
+      drained_node_ids.reserve(reply.drain_node_status().size());
+      for (const auto &node_status : reply.drain_node_status()) {
+        drained_node_ids.push_back(node_status.node_id());
+      }
+      return Status::OK();
+    }
+    return HandleGcsError(reply.status());
+  }
+  return Status::RpcError(status.error_message(), status.error_code());
+}
+
 std::unordered_map<std::string, double> PythonGetResourcesTotal(
     const rpc::GcsNodeInfo &node_info) {
   return std::unordered_map<std::string, double>(node_info.resources_total().begin(),
diff --git a/src/ray/gcs/gcs_client/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h
index 5773a0229b0b8..e2d35e36f4c9f 100644
--- a/src/ray/gcs/gcs_client/gcs_client.h
+++ b/src/ray/gcs/gcs_client/gcs_client.h
@@ -229,7 +229,7 @@ class RAY_EXPORT PythonGcsClient {
   Status PinRuntimeEnvUri(const std::string &uri, int expiration_s, int64_t timeout_ms);
   Status GetAllNodeInfo(int64_t timeout_ms, std::vector<rpc::GcsNodeInfo> &result);
   Status GetAllJobInfo(int64_t timeout_ms, std::vector<rpc::JobTableData> &result);
-
+  Status GetAllResourceUsage(int64_t timeout_ms, std::string &serialized_reply);
   // For rpc::autoscaler::AutoscalerStateService
   Status RequestClusterResourceConstraint(
       int64_t timeout_ms,
@@ -241,6 +241,9 @@ class RAY_EXPORT PythonGcsClient {
       const std::string &reason_message,
       int64_t timeout_ms,
       bool &is_accepted);
+  Status DrainNodes(const std::vector<std::string> &node_ids,
+                    int64_t timeout_ms,
+                    std::vector<std::string> &drained_node_ids);

   const ClusterID &GetClusterId() const { return cluster_id_; }

@@ -260,6 +263,7 @@ class RAY_EXPORT PythonGcsClient {
   std::unique_ptr<rpc::InternalKVGcsService::Stub> kv_stub_;
   std::unique_ptr<rpc::RuntimeEnvGcsService::Stub> runtime_env_stub_;
   std::unique_ptr<rpc::NodeInfoGcsService::Stub> node_info_stub_;
+  std::unique_ptr<rpc::NodeResourceInfoGcsService::Stub> node_resource_info_stub_;
   std::unique_ptr<rpc::JobInfoGcsService::Stub> job_info_stub_;
   std::unique_ptr<rpc::autoscaler::AutoscalerStateService::Stub> autoscaler_stub_;
   std::shared_ptr<grpc::Channel> channel_;
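Usage sketch for the two GcsClient methods this patch adds. The call
signatures mirror the monitor.py and autoscaler.py changes above; the GCS
address and the raylet IDs below are hypothetical placeholders.

from ray._raylet import GcsClient

# No grpcio import is needed: both calls go through the C++ PythonGcsClient.
gcs_client = GcsClient(address="127.0.0.1:6379")  # hypothetical address

# Replaces NodeResourceInfoGcsServiceStub.GetAllResourceUsage; returns a
# deserialized GetAllResourceUsageReply protobuf.
reply = gcs_client.get_all_resource_usage(timeout=60)
resources_batch_data = reply.resource_usage_data

# Replaces NodeInfoGcsServiceStub.DrainNode; takes raw binary raylet IDs and
# returns the subset that the GCS marked as drained.
raylet_ids_to_drain = {b"\x00" * 28}  # placeholder 28-byte NodeID values
drained_raylet_ids = set(gcs_client.drain_nodes(raylet_ids_to_drain, timeout=5))
failed_to_drain = raylet_ids_to_drain - drained_raylet_ids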