From 3cf40243b4c4e92d8e70a768139f9ddcafd2a107 Mon Sep 17 00:00:00 2001 From: Dmitri Gekhtman <62982571+DmitriGekhtman@users.noreply.github.com> Date: Tue, 19 Jul 2022 16:25:47 -0700 Subject: [PATCH] Re-revert skipping version check in Ray status (#26741) * Revert "Revert "[KubeRay][Autoscaler][Core] Add a flag to disable ray status version check (#26584)" (#26597)" (Fixes test issue.) Signed-off-by: Rohan138 --- python/ray/_private/gcs_utils.py | 10 +++++++++- .../autoscaler/_private/kuberay/run_autoscaler.py | 12 +++++++++++- python/ray/scripts/scripts.py | 14 ++++++++++++-- python/ray/tests/test_advanced_4.py | 10 ++++++++++ 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index b3bcd75e5839e..2891ad1c02859 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -1,6 +1,7 @@ import enum import logging import time +import traceback import inspect import asyncio from functools import wraps @@ -98,13 +99,15 @@ def create_gcs_channel(address: str, aio=False): return init_grpc_channel(address, options=_GRPC_OPTIONS, asynchronous=aio) -def check_health(address: str, timeout=2) -> bool: +def check_health(address: str, timeout=2, skip_version_check=False) -> bool: """Checks Ray cluster health, before / without actually connecting to the cluster via ray.init(). Args: address: Ray cluster / GCS address string, e.g. ip:port. timeout: request timeout. + skip_version_check: If True, will skip comparison of GCS Ray version with local + Ray version. If False (default), will raise exception on mismatch. Returns: Returns True if the cluster is running and has matching Ray version. Returns False if no service is running. 
@@ -116,9 +119,14 @@ def check_health(address: str, timeout=2) -> bool: stub = gcs_service_pb2_grpc.HeartbeatInfoGcsServiceStub(channel) resp = stub.CheckAlive(req, timeout=timeout) except grpc.RpcError: + traceback.print_exc() return False if resp.status.code != GcsCode.OK: raise RuntimeError(f"GCS running at {address} is unhealthy: {resp.status}") + + if skip_version_check: + return True + # Otherwise, continue to check for Ray version match. if resp.ray_version is None: resp.ray_version = "<= 1.12" if resp.ray_version != ray.__version__: diff --git a/python/ray/autoscaler/_private/kuberay/run_autoscaler.py b/python/ray/autoscaler/_private/kuberay/run_autoscaler.py index b4d5cab0a7bb1..422345d1a15b0 100644 --- a/python/ray/autoscaler/_private/kuberay/run_autoscaler.py +++ b/python/ray/autoscaler/_private/kuberay/run_autoscaler.py @@ -21,7 +21,17 @@ def run_kuberay_autoscaler(cluster_name: str, cluster_namespace: str): ray_address = f"{head_ip}:6379" while True: try: - subprocess.check_call(["ray", "health-check", "--address", ray_address]) + # Autoscaler Ray version might not exactly match GCS version, so skip the + # version check when checking GCS status. + subprocess.check_call( + [ + "ray", + "health-check", + "--address", + ray_address, + "--skip-version-check", + ] + ) # Logging is not ready yet. Print to stdout for now. print("The Ray head is ready. Starting the autoscaler.") break diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 6b9f45dbacdd3..d88c3587a8d33 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -6,6 +6,7 @@ import subprocess import sys import time +import traceback import urllib import urllib.parse from datetime import datetime @@ -2324,7 +2325,13 @@ def kuberay_autoscaler(cluster_name: str, cluster_namespace: str) -> None: help="Health check for a specific component. 
Currently supports: " "[ray_client_server]", ) -def healthcheck(address, redis_password, component): +@click.option( + "--skip-version-check", + is_flag=True, + default=False, + help="Skip comparison of GCS version with local Ray version.", +) +def healthcheck(address, redis_password, component, skip_version_check): """ This is NOT a public api. @@ -2335,9 +2342,12 @@ def healthcheck(address, redis_password, component): if not component: try: - if ray._private.gcs_utils.check_health(address): + if ray._private.gcs_utils.check_health( + address, skip_version_check=skip_version_check + ): sys.exit(0) except Exception: + traceback.print_exc() pass sys.exit(1) diff --git a/python/ray/tests/test_advanced_4.py b/python/ray/tests/test_advanced_4.py index f23744c696cb9..5ead198e50b27 100644 --- a/python/ray/tests/test_advanced_4.py +++ b/python/ray/tests/test_advanced_4.py @@ -1,3 +1,4 @@ +import mock import subprocess import sys @@ -112,6 +113,15 @@ def test_check_health(shutdown_only): assert check_health(addr) +def test_check_health_version_check(shutdown_only): + with mock.patch("ray.__version__", "FOO-VERSION"): + conn = ray.init() + addr = conn.address_info["address"] + assert check_health(addr, skip_version_check=True) + with pytest.raises(RuntimeError): + check_health(addr) + + def test_back_pressure(shutdown_only_with_initialization_check): ray.init()