Skip to content

Commit

Permalink
Re-revert skipping version check in Ray status (ray-project#26741)
Browse files Browse the repository at this point in the history
* Revert "Revert "[KubeRay][Autoscaler][Core] Add a flag to disable ray status version check (ray-project#26584)" (ray-project#26597)"

(Fixes test issue.)

Signed-off-by: Rohan138 <[email protected]>
  • Loading branch information
DmitriGekhtman authored and Rohan138 committed Jul 28, 2022
1 parent 1aa6c73 commit 3cf4024
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 4 deletions.
10 changes: 9 additions & 1 deletion python/ray/_private/gcs_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import enum
import logging
import time
import traceback
import inspect
import asyncio
from functools import wraps
Expand Down Expand Up @@ -98,13 +99,15 @@ def create_gcs_channel(address: str, aio=False):
return init_grpc_channel(address, options=_GRPC_OPTIONS, asynchronous=aio)


def check_health(address: str, timeout=2) -> bool:
def check_health(address: str, timeout=2, skip_version_check=False) -> bool:
"""Checks Ray cluster health, before / without actually connecting to the
cluster via ray.init().
Args:
address: Ray cluster / GCS address string, e.g. ip:port.
timeout: request timeout.
skip_version_check: If True, will skip comparision of GCS Ray version with local
Ray version. If False (default), will raise exception on mismatch.
Returns:
Returns True if the cluster is running and has matching Ray version.
Returns False if no service is running.
Expand All @@ -116,9 +119,14 @@ def check_health(address: str, timeout=2) -> bool:
stub = gcs_service_pb2_grpc.HeartbeatInfoGcsServiceStub(channel)
resp = stub.CheckAlive(req, timeout=timeout)
except grpc.RpcError:
traceback.print_exc()
return False
if resp.status.code != GcsCode.OK:
raise RuntimeError(f"GCS running at {address} is unhealthy: {resp.status}")

if skip_version_check:
return True
# Otherwise, continue to check for Ray version match.
if resp.ray_version is None:
resp.ray_version = "<= 1.12"
if resp.ray_version != ray.__version__:
Expand Down
12 changes: 11 additions & 1 deletion python/ray/autoscaler/_private/kuberay/run_autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@ def run_kuberay_autoscaler(cluster_name: str, cluster_namespace: str):
ray_address = f"{head_ip}:6379"
while True:
try:
subprocess.check_call(["ray", "health-check", "--address", ray_address])
# Autoscaler Ray version might not exactly match GCS version, so skip the
# version check when checking GCS status.
subprocess.check_call(
[
"ray",
"health-check",
"--address",
ray_address,
"--skip-version-check",
]
)
# Logging is not ready yet. Print to stdout for now.
print("The Ray head is ready. Starting the autoscaler.")
break
Expand Down
14 changes: 12 additions & 2 deletions python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import subprocess
import sys
import time
import traceback
import urllib
import urllib.parse
from datetime import datetime
Expand Down Expand Up @@ -2324,7 +2325,13 @@ def kuberay_autoscaler(cluster_name: str, cluster_namespace: str) -> None:
help="Health check for a specific component. Currently supports: "
"[ray_client_server]",
)
def healthcheck(address, redis_password, component):
@click.option(
"--skip-version-check",
is_flag=True,
default=False,
help="Skip comparison of GCS version with local Ray version.",
)
def healthcheck(address, redis_password, component, skip_version_check):
"""
This is NOT a public api.
Expand All @@ -2335,9 +2342,12 @@ def healthcheck(address, redis_password, component):

if not component:
try:
if ray._private.gcs_utils.check_health(address):
if ray._private.gcs_utils.check_health(
address, skip_version_check=skip_version_check
):
sys.exit(0)
except Exception:
traceback.print_exc()
pass
sys.exit(1)

Expand Down
10 changes: 10 additions & 0 deletions python/ray/tests/test_advanced_4.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import mock
import subprocess
import sys

Expand Down Expand Up @@ -112,6 +113,15 @@ def test_check_health(shutdown_only):
assert check_health(addr)


def test_check_health_version_check(shutdown_only):
with mock.patch("ray.__version__", "FOO-VERSION"):
conn = ray.init()
addr = conn.address_info["address"]
assert check_health(addr, skip_version_check=True)
with pytest.raises(RuntimeError):
check_health(addr)


def test_back_pressure(shutdown_only_with_initialization_check):
ray.init()

Expand Down

0 comments on commit 3cf4024

Please sign in to comment.