From d2f802a25dabbc0f268f70a7bc6df505adeec93a Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Thu, 20 Jul 2023 08:00:40 -0700 Subject: [PATCH 01/41] ip Signed-off-by: SangBin Cho --- python/ray/_private/node.py | 2 ++ python/ray/_private/services.py | 36 ++++++++++++++++++++++++++++- python/ray/_private/worker.py | 4 ++-- python/ray/tests/test_ray_init_2.py | 30 ++++++++++++++++++++++++ 4 files changed, 69 insertions(+), 3 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 3d1b53cb15a92..2f60e9415d9ad 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -207,6 +207,8 @@ def __init__( ) self._init_temp() + # SANG-TODO + # ray._private.services.record_node_ip(self._node_ip_address) # Validate and initialize the persistent storage API. if head: diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 19039132d0b56..15e4a8fda89bf 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -14,11 +14,13 @@ import subprocess import sys import time +from collections import defaultdict from pathlib import Path -from typing import List, Optional, IO, AnyStr +from typing import List, Optional, IO, AnyStr, Dict # Import psutil after ray so the packaged version is used. import psutil +from filelock import FileLock # Ray modules import ray @@ -647,6 +649,38 @@ def node_ip_address_from_perspective(address: str): return node_ip_address +# def record_node_ip(node_ip_address: str, session_dir_path: str, unique_id: str): +# file_path = os.path.join(session_dir_path, "node_ip_address.json") + +# # Maps a Node.unique_id to a dict that maps port names to port numbers. +# ip_by_node: Dict[str, str] = defaultdict(str) + +# with FileLock(file_path + ".lock"): +# if not os.path.exists(file_path): +# with open(file_path, "w") as f: +# json.dump({}, f) + +# with open(file_path, "r") as f: +# ports_by_node.update(json.load(f)) + +# if ( +# self.unique_id in ports_by_node +# and port_name in ports_by_node[self.unique_id] +# ): +# # The port has already been cached at this node, so use it. +# port = int(ports_by_node[self.unique_id][port_name]) +# else: +# # Pick a new port to use and cache it at this node. +# port = default_port or self._get_unused_port( +# set(ports_by_node[self.unique_id].values()) +# ) +# ports_by_node[self.unique_id][port_name] = port +# with open(file_path, "w") as f: +# json.dump(ports_by_node, f) + +# return port + + def get_node_ip_address(address="8.8.8.8:53"): if ray._private.worker._global_node is not None: return ray._private.worker._global_node.node_ip_address diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 7900e1cb710c5..5c83bd148f84c 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1430,8 +1430,8 @@ def init( gcs_address = bootstrap_address logger.info("Connecting to existing Ray cluster at address: %s...", gcs_address) - if _node_ip_address is not None: - node_ip_address = services.resolve_ip_for_localhost(_node_ip_address) + assert _node_ip_address is not None, "_node_ip_address cannot be None" + node_ip_address = services.resolve_ip_for_localhost(_node_ip_address) raylet_ip_address = node_ip_address if local_mode: diff --git a/python/ray/tests/test_ray_init_2.py b/python/ray/tests/test_ray_init_2.py index cd3e3f95f69c2..ae3a14ffd56e7 100644 --- a/python/ray/tests/test_ray_init_2.py +++ b/python/ray/tests/test_ray_init_2.py @@ -2,12 +2,15 @@ import os import sys import unittest.mock +from unittest.mock import patch import pytest import ray from ray._private.ray_constants import RAY_OVERRIDE_DASHBOARD_URL, DEFAULT_RESOURCES +from ray.air.util.node import _get_node_id_from_node_ip import ray._private.services +from ray._private.services import get_node_ip_address from ray.dashboard.utils import ray_address_to_api_server_url from ray._private.test_utils import ( get_current_unused_port, @@ -15,6 +18,7 @@ wait_for_condition, ) from ray.util.client.ray_client_helpers import ray_start_client_server +from ray.util.state import list_nodes def test_ray_init_context(shutdown_only): @@ -332,6 +336,32 @@ def test_temp_dir_must_be_absolute(shutdown_only): ray.init(_temp_dir="relative_path") +def test_driver_node_ip_address_auto_configuration(monkeypatch, ray_start_cluster): + """Simulate the ray is started with node-ip-address (privately assigned IP). + + At this time, the driver should automatically use the node-ip-address given + to ray start. + """ + with patch("ray._private.ray_constants.ENABLE_RAY_CLUSTER") as enable_cluster_constant: + # Without this, it will always use localhost (for MacOS and Windows). + enable_cluster_constant.return_value = True + ray_start_ip = get_node_ip_address() + + with patch("ray._private.services.node_ip_address_from_perspective") as mocked_node_ip_address: # noqa + # Mock the node_ip_address_from_perspective will return the + # IP that's not assigned to ray start. + mocked_node_ip_address.return_value = "134.31.31.31" + cluster = ray_start_cluster + cluster.add_node(node_ip_address=ray_start_ip) + print(get_node_ip_address()) + print(ray_start_ip) + + # If the IP is not correctly configured, it will hang. + ray.init(address=cluster.address) + assert (_get_node_id_from_node_ip(get_node_ip_address()).hex() + == ray.get_runtime_context().node_id) + + if __name__ == "__main__": import sys From a391f661b5bc4ca11a1bd6d98dcf2144d002fa0d Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 21 Jul 2023 00:29:46 -0700 Subject: [PATCH 02/41] ip Signed-off-by: SangBin Cho --- python/ray/_private/node.py | 2 +- python/ray/_private/services.py | 153 +++++++++++++++++++++++++------- t.py | 24 +++++ 3 files changed, 148 insertions(+), 31 deletions(-) create mode 100644 t.py diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 2f60e9415d9ad..d46788eb35182 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -208,7 +208,7 @@ def __init__( self._init_temp() # SANG-TODO - # ray._private.services.record_node_ip(self._node_ip_address) + ray._private.services.record_node_ip(self._node_ip_address) # Validate and initialize the persistent storage API. if head: diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 15e4a8fda89bf..ee85fb241e637 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -15,8 +15,10 @@ import sys import time from collections import defaultdict +from contextlib import contextmanager from pathlib import Path from typing import List, Optional, IO, AnyStr, Dict +from dataclasses import dataclass # Import psutil after ray so the packaged version is used. import psutil @@ -649,36 +651,127 @@ def node_ip_address_from_perspective(address: str): return node_ip_address -# def record_node_ip(node_ip_address: str, session_dir_path: str, unique_id: str): -# file_path = os.path.join(session_dir_path, "node_ip_address.json") - -# # Maps a Node.unique_id to a dict that maps port names to port numbers. -# ip_by_node: Dict[str, str] = defaultdict(str) - -# with FileLock(file_path + ".lock"): -# if not os.path.exists(file_path): -# with open(file_path, "w") as f: -# json.dump({}, f) - -# with open(file_path, "r") as f: -# ports_by_node.update(json.load(f)) - -# if ( -# self.unique_id in ports_by_node -# and port_name in ports_by_node[self.unique_id] -# ): -# # The port has already been cached at this node, so use it. -# port = int(ports_by_node[self.unique_id][port_name]) -# else: -# # Pick a new port to use and cache it at this node. -# port = default_port or self._get_unused_port( -# set(ports_by_node[self.unique_id].values()) -# ) -# ports_by_node[self.unique_id][port_name] = port -# with open(file_path, "w") as f: -# json.dump(ports_by_node, f) - -# return port +@dataclass +class NodePort: + name: str + port: int + + +@dataclass +class NodeMetadata: + # port_name -> port + ports: Dict[str, int] + # The ip address of the node. + node_ip_address: str + + +class NodeMetadataManager: + def __init__(self, session_dir_path: str): + # Create the file if it doesn't exist. + self.file_path = os.path.join(session_dir_path, "node_metadata.json") + with FileLock(self.file_path + ".lock"): + if not os.path.exists(self.file_path): + with open(self.file_path, "w") as f: + json.dump({}, f) + + def get_or_update_cached_port(self, unique_node_id: str, port_name: str, port: int): + id_to_node_metadata: Dict[str, NodeMetadata] = {} + with FileLock(self.file_path + ".lock"): + with open(self.file_path, "r") as f: + id_to_node_metadata.update(json.load(f)) + + node_metadata = id_to_node_metadata.get(unique_node_id, None) + if node_metadata: + node_metadata = NodeMetadata(**node_metadata) + ports = node_metadata.ports + ports[port_name] = port + else: + + + if ( + self.unique_id in ports_by_node + and port_name in ports_by_node[self.unique_id] + ): + # The port has already been cached at this node, so use it. + port = int(ports_by_node[self.unique_id][port_name]) + else: + # Pick a new port to use and cache it at this node. + port = default_port or self._get_unused_port( + set(ports_by_node[self.unique_id].values()) + ) + ports_by_node[self.unique_id][port_name] = port + with open(file_path, "w") as f: + json.dump(ports_by_node, f) + + def get_or_update_node_ip_address(self, unique_id: str, ip_address: str): + + + +def get_or_write_node_metadata(session_dir_path: str): + file_path = os.path.join(session_dir_path, "node_metadata.json") + + # Make sure only the ports in RAY_CACHED_PORTS are cached. + assert port_name in ray_constants.RAY_ALLOWED_CACHED_PORTS + + # Maps a Node.unique_id to a dict that maps port names to port numbers. + ports_by_node: Dict[str, Dict[str, int]] = defaultdict(dict) + + with FileLock(file_path + ".lock"): + if not os.path.exists(file_path): + with open(file_path, "w") as f: + json.dump({}, f) + + with open(file_path, "r") as f: + ports_by_node.update(json.load(f)) + + if ( + self.unique_id in ports_by_node + and port_name in ports_by_node[self.unique_id] + ): + # The port has already been cached at this node, so use it. + port = int(ports_by_node[self.unique_id][port_name]) + else: + # Pick a new port to use and cache it at this node. + port = default_port or self._get_unused_port( + set(ports_by_node[self.unique_id].values()) + ) + ports_by_node[self.unique_id][port_name] = port + with open(file_path, "w") as f: + json.dump(ports_by_node, f) + + return port + + +def record_node_ip(node_ip_address: str, session_dir_path: str, unique_id: str): + file_path = os.path.join(session_dir_path, "node_ip_address.json") + + # Maps a Node.unique_id to a dict that maps port names to port numbers. + ip_by_node: Dict[str, str] = defaultdict(str) + + with FileLock(file_path + ".lock"): + if not os.path.exists(file_path): + with open(file_path, "w") as f: + json.dump({}, f) + + with open(file_path, "r") as f: + ports_by_node.update(json.load(f)) + + if ( + self.unique_id in ports_by_node + and port_name in ports_by_node[self.unique_id] + ): + # The port has already been cached at this node, so use it. + port = int(ports_by_node[self.unique_id][port_name]) + else: + # Pick a new port to use and cache it at this node. + port = default_port or self._get_unused_port( + set(ports_by_node[self.unique_id].values()) + ) + ports_by_node[self.unique_id][port_name] = port + with open(file_path, "w") as f: + json.dump(ports_by_node, f) + + return port def get_node_ip_address(address="8.8.8.8:53"): diff --git a/t.py b/t.py new file mode 100644 index 0000000000000..57b9551e464b7 --- /dev/null +++ b/t.py @@ -0,0 +1,24 @@ +import ray +import time + +while True: + s = time.time() + ray.init() + @ray.remote + class A: + def f(self): + for _ in range(300): + 12 + 12 + + actors = [A.remote() for _ in range(8)] + for _ in range(30): + [ray.get(c.f.remote()) for c in actors] + for actor in actors: + ray.kill(actor) + for actor in actors: + try: + ray.get(c.__ray_ready__.remote()) + except Exception: + pass + ray.shutdown() + print(time.time() - s) From 3966778b1715375218d4b6c2b0d8cd89df2325e1 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 21 Jul 2023 08:06:09 -0700 Subject: [PATCH 03/41] working now. Signed-off-by: SangBin Cho --- python/ray/_private/node.py | 130 +++++++++++++++++++++++----- python/ray/_private/services.py | 123 -------------------------- python/ray/_private/worker.py | 11 --- python/ray/tests/test_ray_init_2.py | 16 ++-- t.py | 4 +- 5 files changed, 120 insertions(+), 164 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index d46788eb35182..f47053e1bb2f0 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -18,6 +18,7 @@ from typing import Dict, Optional, Tuple, IO, AnyStr from filelock import FileLock +from pathlib import Path import ray import ray._private.ray_constants as ray_constants @@ -97,26 +98,6 @@ def __init__( ray_params.external_addresses = external_redis ray_params.num_redis_shards = len(external_redis) - 1 - # Try to get node IP address with the parameters. - if ray_params.node_ip_address: - node_ip_address = ray_params.node_ip_address - elif ray_params.redis_address: - node_ip_address = ray.util.get_node_ip_address(ray_params.redis_address) - else: - node_ip_address = ray.util.get_node_ip_address() - self._node_ip_address = node_ip_address - - if ray_params.raylet_ip_address: - raylet_ip_address = ray_params.raylet_ip_address - else: - raylet_ip_address = node_ip_address - - if raylet_ip_address != node_ip_address and (not connect_only or head): - raise ValueError( - "The raylet IP address should only be different than the node " - "IP address when connecting to an existing raylet; i.e., when " - "head=False and connect_only=True." - ) if ( ray_params._system_config and len(ray_params._system_config) > 0 @@ -126,8 +107,6 @@ def __init__( "System config parameters can only be set on the head node." ) - self._raylet_ip_address = raylet_ip_address - ray_params.update_if_absent( include_log_monitor=True, resources={}, @@ -206,9 +185,45 @@ def __init__( f"{ray_params.dashboard_host}:{ray_params.dashboard_port}" ) + # It creates a session_dir. self._init_temp() - # SANG-TODO - ray._private.services.record_node_ip(self._node_ip_address) + + node_ip_address = ray_params.node_ip_address + if connect_only: + self.wait_for_node_address() + else: + if node_ip_address is None: + node_ip_address = ray._private.services.resolve_ip_for_localhost( + ray_constants.NODE_DEFAULT_IP + ) + + node_ip_address = self._get_or_write_node_address(node_ip_address) + + ray_params.update_if_absent( + node_ip_address=node_ip_address, raylet_ip_address=node_ip_address + ) + + # Try to get node IP address with the parameters. + # if ray_params.node_ip_address: + # node_ip_address = ray_params.node_ip_address + # elif ray_params.redis_address: + # node_ip_address = ray.util.get_node_ip_address(ray_params.redis_address) + # else: + # node_ip_address = ray.util.get_node_ip_address() + self._node_ip_address = node_ip_address + + if ray_params.raylet_ip_address: + raylet_ip_address = ray_params.raylet_ip_address + else: + raylet_ip_address = node_ip_address + + if raylet_ip_address != node_ip_address and (not connect_only or head): + raise ValueError( + "The raylet IP address should only be different than the node " + "IP address when connecting to an existing raylet; i.e., when " + "head=False and connect_only=True." + ) + self._raylet_ip_address = raylet_ip_address # Validate and initialize the persistent storage API. if head: @@ -875,6 +890,73 @@ def _get_cached_port( return port + def wait_for_node_address(self): + """Wait until the node_ip_address.json file is avialable. + + node_ip_address.json is created when a ray instance is started. + """ + assert hasattr(self, "_session_dir") + NODE_IP_FILE_NAME = "node_ip_address.json" + file_path = Path(os.path.join(self.get_session_dir_path(), NODE_IP_FILE_NAME)) + MAX_WAIT_S = 60 + for i in range(MAX_WAIT_S): + if file_path.exists(): + break + + time.sleep(1) + if i % 10 == 0: + logger.info( + f"Can't find a `{NODE_IP_FILE_NAME}` file. " + "Have you started Ray instsance using " + "`ray start` or `ray.init`?" + ) + if i == MAX_WAIT_S: + raise ValueError( + f"Can't find a `{NODE_IP_FILE_NAME}` file " + f"for {MAX_WAIT_S} seconds" + "It means the ray instance hasn't started. " + "Did you do `ray start` or `ray.init` on this host?" + ) + + def _get_or_write_node_address(self, node_ip_address: str) -> str: + """Get a node address cached on this session. + + If a ray instance is started by `ray start --node-ip-address`, + the node ip address is cached to a file node_ip_address.json. + + This API is process-safe, meaning the file access is protected by + a file lock. + + Args: + node_ip_address: The node IP address of the current node. + Returns: + node_ip_address of the current node passed to ray start if it exists. + None otherwise. + """ + assert hasattr(self, "_session_dir") + file_path = Path( + os.path.join(self.get_session_dir_path(), "node_ip_address.json") + ) + cached_node_ip_address = {} + + with FileLock(str(file_path.absolute()) + ".lock"): + if not file_path.exists(): + with file_path.open(mode="w") as f: + json.dump({}, f) + + with file_path.open() as f: + cached_node_ip_address.update(json.load(f)) + + if "node_ip_address" in cached_node_ip_address: + # The port has already been cached at this node, so use it. + node_ip_address = cached_node_ip_address["node_ip_address"] + else: + cached_node_ip_address["node_ip_address"] = node_ip_address + with file_path.open(mode="w") as f: + json.dump(cached_node_ip_address, f) + + return node_ip_address + def start_reaper_process(self): """ Start the reaper process. diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index ee85fb241e637..467d75e90e730 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -651,129 +651,6 @@ def node_ip_address_from_perspective(address: str): return node_ip_address -@dataclass -class NodePort: - name: str - port: int - - -@dataclass -class NodeMetadata: - # port_name -> port - ports: Dict[str, int] - # The ip address of the node. - node_ip_address: str - - -class NodeMetadataManager: - def __init__(self, session_dir_path: str): - # Create the file if it doesn't exist. - self.file_path = os.path.join(session_dir_path, "node_metadata.json") - with FileLock(self.file_path + ".lock"): - if not os.path.exists(self.file_path): - with open(self.file_path, "w") as f: - json.dump({}, f) - - def get_or_update_cached_port(self, unique_node_id: str, port_name: str, port: int): - id_to_node_metadata: Dict[str, NodeMetadata] = {} - with FileLock(self.file_path + ".lock"): - with open(self.file_path, "r") as f: - id_to_node_metadata.update(json.load(f)) - - node_metadata = id_to_node_metadata.get(unique_node_id, None) - if node_metadata: - node_metadata = NodeMetadata(**node_metadata) - ports = node_metadata.ports - ports[port_name] = port - else: - - - if ( - self.unique_id in ports_by_node - and port_name in ports_by_node[self.unique_id] - ): - # The port has already been cached at this node, so use it. - port = int(ports_by_node[self.unique_id][port_name]) - else: - # Pick a new port to use and cache it at this node. - port = default_port or self._get_unused_port( - set(ports_by_node[self.unique_id].values()) - ) - ports_by_node[self.unique_id][port_name] = port - with open(file_path, "w") as f: - json.dump(ports_by_node, f) - - def get_or_update_node_ip_address(self, unique_id: str, ip_address: str): - - - -def get_or_write_node_metadata(session_dir_path: str): - file_path = os.path.join(session_dir_path, "node_metadata.json") - - # Make sure only the ports in RAY_CACHED_PORTS are cached. - assert port_name in ray_constants.RAY_ALLOWED_CACHED_PORTS - - # Maps a Node.unique_id to a dict that maps port names to port numbers. - ports_by_node: Dict[str, Dict[str, int]] = defaultdict(dict) - - with FileLock(file_path + ".lock"): - if not os.path.exists(file_path): - with open(file_path, "w") as f: - json.dump({}, f) - - with open(file_path, "r") as f: - ports_by_node.update(json.load(f)) - - if ( - self.unique_id in ports_by_node - and port_name in ports_by_node[self.unique_id] - ): - # The port has already been cached at this node, so use it. - port = int(ports_by_node[self.unique_id][port_name]) - else: - # Pick a new port to use and cache it at this node. - port = default_port or self._get_unused_port( - set(ports_by_node[self.unique_id].values()) - ) - ports_by_node[self.unique_id][port_name] = port - with open(file_path, "w") as f: - json.dump(ports_by_node, f) - - return port - - -def record_node_ip(node_ip_address: str, session_dir_path: str, unique_id: str): - file_path = os.path.join(session_dir_path, "node_ip_address.json") - - # Maps a Node.unique_id to a dict that maps port names to port numbers. - ip_by_node: Dict[str, str] = defaultdict(str) - - with FileLock(file_path + ".lock"): - if not os.path.exists(file_path): - with open(file_path, "w") as f: - json.dump({}, f) - - with open(file_path, "r") as f: - ports_by_node.update(json.load(f)) - - if ( - self.unique_id in ports_by_node - and port_name in ports_by_node[self.unique_id] - ): - # The port has already been cached at this node, so use it. - port = int(ports_by_node[self.unique_id][port_name]) - else: - # Pick a new port to use and cache it at this node. - port = default_port or self._get_unused_port( - set(ports_by_node[self.unique_id].values()) - ) - ports_by_node[self.unique_id][port_name] = port - with open(file_path, "w") as f: - json.dump(ports_by_node, f) - - return port - - def get_node_ip_address(address="8.8.8.8:53"): if ray._private.worker._global_node is not None: return ray._private.worker._global_node.node_ip_address diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 5c83bd148f84c..431451508f9e5 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1284,9 +1284,6 @@ def init( ) _redis_max_memory: Optional[int] = kwargs.pop("_redis_max_memory", None) _plasma_directory: Optional[str] = kwargs.pop("_plasma_directory", None) - _node_ip_address: str = kwargs.pop( - "_node_ip_address", ray_constants.NODE_DEFAULT_IP - ) _driver_object_store_memory: Optional[int] = kwargs.pop( "_driver_object_store_memory", None ) @@ -1430,10 +1427,6 @@ def init( gcs_address = bootstrap_address logger.info("Connecting to existing Ray cluster at address: %s...", gcs_address) - assert _node_ip_address is not None, "_node_ip_address cannot be None" - node_ip_address = services.resolve_ip_for_localhost(_node_ip_address) - raylet_ip_address = node_ip_address - if local_mode: driver_mode = LOCAL_MODE warnings.warn( @@ -1478,8 +1471,6 @@ def init( # Use a random port by not specifying Redis port / GCS server port. ray_params = ray._private.parameter.RayParams( - node_ip_address=node_ip_address, - raylet_ip_address=raylet_ip_address, object_ref_seed=None, driver_mode=driver_mode, redirect_output=None, @@ -1562,8 +1553,6 @@ def init( # In this case, we only need to connect the node. ray_params = ray._private.parameter.RayParams( - node_ip_address=node_ip_address, - raylet_ip_address=raylet_ip_address, gcs_address=gcs_address, redis_address=redis_address, redis_password=_redis_password, diff --git a/python/ray/tests/test_ray_init_2.py b/python/ray/tests/test_ray_init_2.py index ae3a14ffd56e7..b51db6d2c5aee 100644 --- a/python/ray/tests/test_ray_init_2.py +++ b/python/ray/tests/test_ray_init_2.py @@ -338,16 +338,20 @@ def test_temp_dir_must_be_absolute(shutdown_only): def test_driver_node_ip_address_auto_configuration(monkeypatch, ray_start_cluster): """Simulate the ray is started with node-ip-address (privately assigned IP). - + At this time, the driver should automatically use the node-ip-address given to ray start. """ - with patch("ray._private.ray_constants.ENABLE_RAY_CLUSTER") as enable_cluster_constant: + with patch( + "ray._private.ray_constants.ENABLE_RAY_CLUSTER" + ) as enable_cluster_constant: # Without this, it will always use localhost (for MacOS and Windows). enable_cluster_constant.return_value = True ray_start_ip = get_node_ip_address() - with patch("ray._private.services.node_ip_address_from_perspective") as mocked_node_ip_address: # noqa + with patch( + "ray._private.services.node_ip_address_from_perspective" + ) as mocked_node_ip_address: # noqa # Mock the node_ip_address_from_perspective will return the # IP that's not assigned to ray start. mocked_node_ip_address.return_value = "134.31.31.31" @@ -358,8 +362,10 @@ def test_driver_node_ip_address_auto_configuration(monkeypatch, ray_start_cluste # If the IP is not correctly configured, it will hang. ray.init(address=cluster.address) - assert (_get_node_id_from_node_ip(get_node_ip_address()).hex() - == ray.get_runtime_context().node_id) + assert ( + _get_node_id_from_node_ip(get_node_ip_address()) + == ray.get_runtime_context().get_node_id() + ) if __name__ == "__main__": diff --git a/t.py b/t.py index 57b9551e464b7..72fc6b4d0b536 100644 --- a/t.py +++ b/t.py @@ -1,9 +1,11 @@ -import ray import time +import ray + while True: s = time.time() ray.init() + @ray.remote class A: def f(self): From 992d7ab7c5105fb83ecef28dd57c2e9e99508297 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 21 Jul 2023 08:06:50 -0700 Subject: [PATCH 04/41] working + lint Signed-off-by: SangBin Cho --- python/ray/_private/services.py | 6 +----- python/ray/tests/test_ray_init_2.py | 1 - t.py | 26 -------------------------- 3 files changed, 1 insertion(+), 32 deletions(-) delete mode 100644 t.py diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 467d75e90e730..19039132d0b56 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -14,15 +14,11 @@ import subprocess import sys import time -from collections import defaultdict -from contextlib import contextmanager from pathlib import Path -from typing import List, Optional, IO, AnyStr, Dict -from dataclasses import dataclass +from typing import List, Optional, IO, AnyStr # Import psutil after ray so the packaged version is used. import psutil -from filelock import FileLock # Ray modules import ray diff --git a/python/ray/tests/test_ray_init_2.py b/python/ray/tests/test_ray_init_2.py index b51db6d2c5aee..37e8780351456 100644 --- a/python/ray/tests/test_ray_init_2.py +++ b/python/ray/tests/test_ray_init_2.py @@ -18,7 +18,6 @@ wait_for_condition, ) from ray.util.client.ray_client_helpers import ray_start_client_server -from ray.util.state import list_nodes def test_ray_init_context(shutdown_only): diff --git a/t.py b/t.py deleted file mode 100644 index 72fc6b4d0b536..0000000000000 --- a/t.py +++ /dev/null @@ -1,26 +0,0 @@ -import time - -import ray - -while True: - s = time.time() - ray.init() - - @ray.remote - class A: - def f(self): - for _ in range(300): - 12 + 12 - - actors = [A.remote() for _ in range(8)] - for _ in range(30): - [ray.get(c.f.remote()) for c in actors] - for actor in actors: - ray.kill(actor) - for actor in actors: - try: - ray.get(c.__ray_ready__.remote()) - except Exception: - pass - ray.shutdown() - print(time.time() - s) From 8083748424d1d62b10d8a0230e3d04facd43f0e1 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 21 Jul 2023 08:10:08 -0700 Subject: [PATCH 05/41] Fix Signed-off-by: SangBin Cho --- python/ray/_private/node.py | 7 ------- python/ray/_private/worker.py | 3 +++ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index f47053e1bb2f0..19182cc49affb 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -203,13 +203,6 @@ def __init__( node_ip_address=node_ip_address, raylet_ip_address=node_ip_address ) - # Try to get node IP address with the parameters. - # if ray_params.node_ip_address: - # node_ip_address = ray_params.node_ip_address - # elif ray_params.redis_address: - # node_ip_address = ray.util.get_node_ip_address(ray_params.redis_address) - # else: - # node_ip_address = ray.util.get_node_ip_address() self._node_ip_address = node_ip_address if ray_params.raylet_ip_address: diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 431451508f9e5..aa90f1cb1d41d 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1284,6 +1284,7 @@ def init( ) _redis_max_memory: Optional[int] = kwargs.pop("_redis_max_memory", None) _plasma_directory: Optional[str] = kwargs.pop("_plasma_directory", None) + _node_ip_address: str = kwargs.pop("_node_ip_address", None) _driver_object_store_memory: Optional[int] = kwargs.pop( "_driver_object_store_memory", None ) @@ -1471,6 +1472,7 @@ def init( # Use a random port by not specifying Redis port / GCS server port. ray_params = ray._private.parameter.RayParams( + node_ip_address=_node_ip_address, object_ref_seed=None, driver_mode=driver_mode, redirect_output=None, @@ -1553,6 +1555,7 @@ def init( # In this case, we only need to connect the node. ray_params = ray._private.parameter.RayParams( + node_ip_address=_node_ip_address, gcs_address=gcs_address, redis_address=redis_address, redis_password=_redis_password, From a25aa34dfb43f0dc31bcbe721994414cb4531649 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Thu, 10 Aug 2023 10:21:27 +0900 Subject: [PATCH 06/41] ip --- python/ray/_private/node.py | 12 ++++++------ python/ray/_private/ray_perf.py | 1 - 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index cf49a755494cf..3a5171d1965c1 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -190,7 +190,7 @@ def __init__( node_ip_address = ray_params.node_ip_address if connect_only: - self.wait_for_node_address() + self._wait_for_node_address() else: if node_ip_address is None: node_ip_address = ray._private.services.resolve_ip_for_localhost( @@ -933,7 +933,7 @@ def _get_cached_port( return port - def wait_for_node_address(self): + def _wait_for_node_address(self, timeout_s=60): """Wait until the node_ip_address.json file is avialable. node_ip_address.json is created when a ray instance is started. @@ -941,21 +941,21 @@ def wait_for_node_address(self): assert hasattr(self, "_session_dir") NODE_IP_FILE_NAME = "node_ip_address.json" file_path = Path(os.path.join(self.get_session_dir_path(), NODE_IP_FILE_NAME)) - MAX_WAIT_S = 60 - for i in range(MAX_WAIT_S): + for i in range(timeout_s): if file_path.exists(): break time.sleep(1) if i % 10 == 0: logger.info( - f"Can't find a `{NODE_IP_FILE_NAME}` file. " + f"Can't find a `{NODE_IP_FILE_NAME}` file from " + f"{file_path}." "Have you started Ray instsance using " "`ray start` or `ray.init`?" ) if i == MAX_WAIT_S: raise ValueError( - f"Can't find a `{NODE_IP_FILE_NAME}` file " + f"Can't find a `{NODE_IP_FILE_NAME}` file from {file_path}" f"for {MAX_WAIT_S} seconds" "It means the ray instance hasn't started. " "Did you do `ray start` or `ray.init` on this host?" diff --git a/python/ray/_private/ray_perf.py b/python/ray/_private/ray_perf.py index 316f3baeca846..db2106259b1fd 100644 --- a/python/ray/_private/ray_perf.py +++ b/python/ray/_private/ray_perf.py @@ -319,6 +319,5 @@ def placement_group_create_removal(num_pgs): return results - if __name__ == "__main__": main() From db2af38d28b593900f1667077786e3ae967df699 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 22 Aug 2023 19:01:37 +0900 Subject: [PATCH 07/41] ip --- python/ray/_private/node.py | 84 +++++++++++++++++++++-------- python/ray/tests/test_ray_init_2.py | 8 +++ 2 files changed, 70 insertions(+), 22 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 402c7dfb70ce8..812248eccd7df 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -190,15 +190,14 @@ def __init__( node_ip_address = ray_params.node_ip_address if connect_only: - self._wait_for_node_address() + node_ip_address = self._wait_for_node_address() else: if node_ip_address is None: node_ip_address = ray._private.services.resolve_ip_for_localhost( ray_constants.NODE_DEFAULT_IP ) - - node_ip_address = self._get_or_write_node_address(node_ip_address) - + + assert node_ip_address is not None ray_params.update_if_absent( node_ip_address=node_ip_address, raylet_ip_address=node_ip_address ) @@ -277,6 +276,13 @@ def __init__( default_port=ray_params.runtime_env_agent_port, ) + # Write a node_ip_address to a file so that + # ray.init can pick up. + # This has to be done here because it requires + # self.unique_id to exist. + if not connect_only: + self._write_node_ip_address(node_ip_address) + ray_params.update_if_absent( metrics_agent_port=self.metrics_agent_port, metrics_export_port=self._metrics_export_port, @@ -944,35 +950,44 @@ def _get_cached_port( return port - def _wait_for_node_address(self, timeout_s=60): + def _wait_for_node_address(self, timeout_s: int = 60) -> str: """Wait until the node_ip_address.json file is avialable. node_ip_address.json is created when a ray instance is started. + + Args: + timeout_s: If the ip address is not found within this + timeout, it will raise ValueError. + Returns: + The node_ip_address of the current session if it finds it + within timeout_s. """ - assert hasattr(self, "_session_dir") - NODE_IP_FILE_NAME = "node_ip_address.json" - file_path = Path(os.path.join(self.get_session_dir_path(), NODE_IP_FILE_NAME)) for i in range(timeout_s): - if file_path.exists(): + node_ip_address = self._get_cached_node_ip_address() + + if node_ip_address is not None: break time.sleep(1) if i % 10 == 0: logger.info( f"Can't find a `{NODE_IP_FILE_NAME}` file from " - f"{file_path}." + f"{file_path} or can't the unique id " + f"{self.unique_id} from the file. " "Have you started Ray instsance using " "`ray start` or `ray.init`?" ) if i == MAX_WAIT_S: raise ValueError( - f"Can't find a `{NODE_IP_FILE_NAME}` file from {file_path}" + f"Can't find a `{NODE_IP_FILE_NAME}` file from " + f"{file_path} or can't the unique id " + f"{self.unique_id} from the file " f"for {MAX_WAIT_S} seconds" "It means the ray instance hasn't started. " "Did you do `ray start` or `ray.init` on this host?" ) - def _get_or_write_node_address(self, node_ip_address: str) -> str: + def _get_cached_node_ip_address(self) -> str: """Get a node address cached on this session. If a ray instance is started by `ray start --node-ip-address`, @@ -981,11 +996,41 @@ def _get_or_write_node_address(self, node_ip_address: str) -> str: This API is process-safe, meaning the file access is protected by a file lock. + Returns: + node_ip_address cached on the current node. None if the node + ip addrss is not written to a file or the file doesn't exist. + """ + assert hasattr(self, "_session_dir") + file_path = Path( + os.path.join(self.get_session_dir_path(), "node_ip_address.json") + ) + cached_node_ip_address = {} + + if not file_path.exists(): + return None + + with FileLock(str(file_path.absolute()) + ".lock"): + with file_path.open() as f: + cached_node_ip_address.update(json.load(f)) + + if self.unique_id in cached_node_ip_address: + return cached_node_ip_address[self.unique_id] + else: + return None + + + def _write_node_ip_address(self, node_ip_address: str) -> None: + """Write a node ip address of the current session to + node_ip_address.json. + + If a ray instance is started by `ray start --node-ip-address`, + the node ip address is cached to a file node_ip_address.json. + + This API is process-safe, meaning the file access is protected by + a file lock. + Args: node_ip_address: The node IP address of the current node. - Returns: - node_ip_address of the current node passed to ray start if it exists. - None otherwise. """ assert hasattr(self, "_session_dir") file_path = Path( @@ -1001,16 +1046,11 @@ def _get_or_write_node_address(self, node_ip_address: str) -> str: with file_path.open() as f: cached_node_ip_address.update(json.load(f)) - if "node_ip_address" in cached_node_ip_address: - # The port has already been cached at this node, so use it. - node_ip_address = cached_node_ip_address["node_ip_address"] - else: - cached_node_ip_address["node_ip_address"] = node_ip_address + if self.unique_id not in cached_node_ip_address: + cached_node_ip_address[self.unique_id] = node_ip_address with file_path.open(mode="w") as f: json.dump(cached_node_ip_address, f) - return node_ip_address - def start_reaper_process(self): """ Start the reaper process. diff --git a/python/ray/tests/test_ray_init_2.py b/python/ray/tests/test_ray_init_2.py index 0bde848aef1a2..d511f4c470c2e 100644 --- a/python/ray/tests/test_ray_init_2.py +++ b/python/ray/tests/test_ray_init_2.py @@ -281,6 +281,14 @@ def verify(): subprocess.check_output("ray stop --force", shell=True) +def test_get_and_write_node_ip_address(shutdown_only): + ray.init() + node = ray._private.worker.global_worker.node + node_ip = ray._private.services.get_node_ip_address() + cached_node_ip_address = node._get_cached_node_ip_address() + assert cached_node_ip_address.get(node.unique_id) == node_ip + + @pytest.mark.skipif(sys.platform != "linux", reason="skip except linux") def test_ray_init_from_workers(ray_start_cluster): cluster = ray_start_cluster From 8476e4f0bcd9fac5b7dadf1f56abd7d8639de3db Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 22 Aug 2023 19:03:23 +0900 Subject: [PATCH 08/41] ip --- python/ray/_private/node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 812248eccd7df..4559e853a87f1 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -281,7 +281,7 @@ def __init__( # This has to be done here because it requires # self.unique_id to exist. if not connect_only: - self._write_node_ip_address(node_ip_address) + self._write_node_ip_address(ray_params.node_ip_address) ray_params.update_if_absent( metrics_agent_port=self.metrics_agent_port, From ba69b1bfd6b8870f70804cc9517f44b5d45e891f Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 22 Aug 2023 19:53:11 +0900 Subject: [PATCH 09/41] Made it work. --- python/ray/_private/node.py | 59 +++++++++++++++++++-------------- python/ray/_private/ray_perf.py | 1 + 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 4559e853a87f1..8621a13c3fd9b 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -189,14 +189,14 @@ def __init__( self._init_temp() node_ip_address = ray_params.node_ip_address - if connect_only: - node_ip_address = self._wait_for_node_address() - else: - if node_ip_address is None: + if node_ip_address is None: + if connect_only: + node_ip_address = self._wait_for_node_address() + else: node_ip_address = ray._private.services.resolve_ip_for_localhost( ray_constants.NODE_DEFAULT_IP ) - + assert node_ip_address is not None ray_params.update_if_absent( node_ip_address=node_ip_address, raylet_ip_address=node_ip_address @@ -260,6 +260,7 @@ def __init__( self._raylet_socket_name = self._prepare_socket_file( self._ray_params.raylet_socket_name, default_prefix="raylet" ) + self._write_node_ip_address(ray_params.node_ip_address) self.metrics_agent_port = self._get_cached_port( "metrics_agent_port", default_port=ray_params.metrics_agent_port @@ -276,13 +277,6 @@ def __init__( default_port=ray_params.runtime_env_agent_port, ) - # Write a node_ip_address to a file so that - # ray.init can pick up. - # This has to be done here because it requires - # self.unique_id to exist. - if not connect_only: - self._write_node_ip_address(ray_params.node_ip_address) - ray_params.update_if_absent( metrics_agent_port=self.metrics_agent_port, metrics_export_port=self._metrics_export_port, @@ -992,34 +986,39 @@ def _get_cached_node_ip_address(self) -> str: If a ray instance is started by `ray start --node-ip-address`, the node ip address is cached to a file node_ip_address.json. + Otherwise, the file exists, but it is emptyl. This API is process-safe, meaning the file access is protected by a file lock. Returns: node_ip_address cached on the current node. None if the node - ip addrss is not written to a file or the file doesn't exist. + the file doesn't exist, meaning ray instance hasn't been + started on a current node. If node_ip_address is not written + to a file, it means --node-ip-address is not given, and in this + case, we find the IP address ourselves. """ assert hasattr(self, "_session_dir") file_path = Path( os.path.join(self.get_session_dir_path(), "node_ip_address.json") ) cached_node_ip_address = {} - - if not file_path.exists(): - return None with FileLock(str(file_path.absolute()) + ".lock"): + if not file_path.exists(): + return None + with file_path.open() as f: cached_node_ip_address.update(json.load(f)) - - if self.unique_id in cached_node_ip_address: - return cached_node_ip_address[self.unique_id] - else: - return None + if "node_ip_address" in cached_node_ip_address: + return cached_node_ip_address["node_ip_address"] + else: + return ray._private.services.resolve_ip_for_localhost( + ray_constants.NODE_DEFAULT_IP + ) - def _write_node_ip_address(self, node_ip_address: str) -> None: + def _write_node_ip_address(self, node_ip_address: Optional[str]) -> None: """Write a node ip address of the current session to node_ip_address.json. @@ -1029,10 +1028,22 @@ def _write_node_ip_address(self, node_ip_address: str) -> None: This API is process-safe, meaning the file access is protected by a file lock. + The file contains a single string node_ip_address. It nothing + is written, --node-ip-address is not given. In this case, Ray + resolves the IP address on its own. It assumes in a single node, + you can have only 1 IP address (which is the assumption ray + has in general). + + node_ip_address is the ip address of the current node. + Args: node_ip_address: The node IP address of the current node. + If None, it means the node ip address is not given + by --node-ip-address. In this case, we don't write + anything to a file. """ assert hasattr(self, "_session_dir") + file_path = Path( os.path.join(self.get_session_dir_path(), "node_ip_address.json") ) @@ -1046,8 +1057,8 @@ def _write_node_ip_address(self, node_ip_address: str) -> None: with file_path.open() as f: cached_node_ip_address.update(json.load(f)) - if self.unique_id not in cached_node_ip_address: - cached_node_ip_address[self.unique_id] = node_ip_address + if node_ip_address is not None: + cached_node_ip_address["node_ip_address"] = node_ip_address with file_path.open(mode="w") as f: json.dump(cached_node_ip_address, f) diff --git a/python/ray/_private/ray_perf.py b/python/ray/_private/ray_perf.py index db2106259b1fd..316f3baeca846 100644 --- a/python/ray/_private/ray_perf.py +++ b/python/ray/_private/ray_perf.py @@ -319,5 +319,6 @@ def placement_group_create_removal(num_pgs): return results + if __name__ == "__main__": main() From 4a0dd442115f6eff70ff53187f51ee715ede3b88 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 22 Aug 2023 20:03:21 +0900 Subject: [PATCH 10/41] working --- python/ray/_private/node.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 8621a13c3fd9b..aab2ff6269efa 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -965,18 +965,16 @@ def _wait_for_node_address(self, timeout_s: int = 60) -> str: time.sleep(1) if i % 10 == 0: logger.info( - f"Can't find a `{NODE_IP_FILE_NAME}` file from " - f"{file_path} or can't the unique id " - f"{self.unique_id} from the file. " + "Can't find a `node_ip_address.json` file from " + f"{self.get_session_dir_path()}. " "Have you started Ray instsance using " "`ray start` or `ray.init`?" ) - if i == MAX_WAIT_S: + if i == timeout_s: raise ValueError( - f"Can't find a `{NODE_IP_FILE_NAME}` file from " - f"{file_path} or can't the unique id " - f"{self.unique_id} from the file " - f"for {MAX_WAIT_S} seconds" + "Can't find a `node_ip_address.json` file from " + f"{self.get_session_dir_path()}. " + f"for {timeout_s} seconds" "It means the ray instance hasn't started. " "Did you do `ray start` or `ray.init` on this host?" ) From f6ee80e43c3c4ff289383544c5eae1e4d71789c4 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 22 Aug 2023 20:16:34 +0900 Subject: [PATCH 11/41] Fixed a broken test. --- python/ray/_private/node.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index aab2ff6269efa..ce09281498291 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -193,16 +193,14 @@ def __init__( if connect_only: node_ip_address = self._wait_for_node_address() else: - node_ip_address = ray._private.services.resolve_ip_for_localhost( - ray_constants.NODE_DEFAULT_IP - ) + node_ip_address = ray.util.get_node_ip_address() assert node_ip_address is not None ray_params.update_if_absent( node_ip_address=node_ip_address, raylet_ip_address=node_ip_address ) - self._node_ip_address = node_ip_address + self._write_node_ip_address(node_ip_address) if ray_params.raylet_ip_address: raylet_ip_address = ray_params.raylet_ip_address @@ -260,7 +258,6 @@ def __init__( self._raylet_socket_name = self._prepare_socket_file( self._ray_params.raylet_socket_name, default_prefix="raylet" ) - self._write_node_ip_address(ray_params.node_ip_address) self.metrics_agent_port = self._get_cached_port( "metrics_agent_port", default_port=ray_params.metrics_agent_port @@ -960,7 +957,7 @@ def _wait_for_node_address(self, timeout_s: int = 60) -> str: node_ip_address = self._get_cached_node_ip_address() if node_ip_address is not None: - break + return node_ip_address time.sleep(1) if i % 10 == 0: @@ -1012,9 +1009,7 @@ def _get_cached_node_ip_address(self) -> str: if "node_ip_address" in cached_node_ip_address: return cached_node_ip_address["node_ip_address"] else: - return ray._private.services.resolve_ip_for_localhost( - ray_constants.NODE_DEFAULT_IP - ) + return ray.util.get_node_ip_address() def _write_node_ip_address(self, node_ip_address: Optional[str]) -> None: """Write a node ip address of the current session to From dba0008ddd1194e1dcbb1a4c356d6eb1e740a04e Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 23 Aug 2023 08:36:57 +0900 Subject: [PATCH 12/41] Fixed the test failure. --- python/ray/tests/test_ray_init_2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/test_ray_init_2.py b/python/ray/tests/test_ray_init_2.py index d511f4c470c2e..ae98ab4f21853 100644 --- a/python/ray/tests/test_ray_init_2.py +++ b/python/ray/tests/test_ray_init_2.py @@ -284,9 +284,9 @@ def verify(): def test_get_and_write_node_ip_address(shutdown_only): ray.init() node = ray._private.worker.global_worker.node - node_ip = ray._private.services.get_node_ip_address() + node_ip = ray.util.get_node_ip_address() cached_node_ip_address = node._get_cached_node_ip_address() - assert cached_node_ip_address.get(node.unique_id) == node_ip + assert cached_node_ip_address == node_ip @pytest.mark.skipif(sys.platform != "linux", reason="skip except linux") From 77b933c3d94ba79545ad502ed066331036811b71 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 23 Aug 2023 09:43:21 +0900 Subject: [PATCH 13/41] print error messages before assertion --- python/ray/tests/test_gcs_ha_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/test_gcs_ha_e2e.py b/python/ray/tests/test_gcs_ha_e2e.py index e9e64599c15f0..74bd254ed8ad6 100644 --- a/python/ray/tests/test_gcs_ha_e2e.py +++ b/python/ray/tests/test_gcs_ha_e2e.py @@ -16,9 +16,9 @@ def test_ray_nodes_liveness(docker_cluster): def check_alive(n): output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") - assert output.exit_code == 0 text = output.output.decode().strip().split("\n")[-1] print("Alive nodes: ", text) + assert output.exit_code == 0 return n == int(text) # Make sure two nodes are alive From 302ecd8b84440be27af547f82da57c7a082ad65a Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 23 Aug 2023 19:49:52 +0900 Subject: [PATCH 14/41] ip --- python/ray/tests/test_gcs_ha_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/test_gcs_ha_e2e.py b/python/ray/tests/test_gcs_ha_e2e.py index 74bd254ed8ad6..0f665d9165e58 100644 --- a/python/ray/tests/test_gcs_ha_e2e.py +++ b/python/ray/tests/test_gcs_ha_e2e.py @@ -17,7 +17,7 @@ def test_ray_nodes_liveness(docker_cluster): def check_alive(n): output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") text = output.output.decode().strip().split("\n")[-1] - print("Alive nodes: ", text) + print("Output: ", output.output.decode().strip().split("\n")) assert output.exit_code == 0 return n == int(text) From cdbf08429fcf2039b28c9bf2d19e2eacb1e76961 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Thu, 24 Aug 2023 03:42:59 +0900 Subject: [PATCH 15/41] more info for debugging. --- python/ray/_private/node.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index ce09281498291..21977a367c7fd 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -953,6 +953,11 @@ def _wait_for_node_address(self, timeout_s: int = 60) -> str: The node_ip_address of the current session if it finds it within timeout_s. """ + logger.error(f"Read file from {self.get_session_dir_path()}") + path = Path(self.get_session_dir_path()) + file_names = [f.name for f in path.iterdir() if f.is_file()] + logger.error(file_names) + for i in range(timeout_s): node_ip_address = self._get_cached_node_ip_address() @@ -967,14 +972,14 @@ def _wait_for_node_address(self, timeout_s: int = 60) -> str: "Have you started Ray instsance using " "`ray start` or `ray.init`?" ) - if i == timeout_s: - raise ValueError( - "Can't find a `node_ip_address.json` file from " - f"{self.get_session_dir_path()}. " - f"for {timeout_s} seconds" - "It means the ray instance hasn't started. " - "Did you do `ray start` or `ray.init` on this host?" - ) + + raise ValueError( + "Can't find a `node_ip_address.json` file from " + f"{self.get_session_dir_path()}. " + f"for {timeout_s} seconds" + "It means the ray instance hasn't started. " + "Did you do `ray start` or `ray.init` on this host?" + ) def _get_cached_node_ip_address(self) -> str: """Get a node address cached on this session. From 55979f0177ad6476ae3be9f8433498cf0999d955 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Thu, 31 Aug 2023 21:30:51 +0900 Subject: [PATCH 16/41] ip --- python/ray/_private/node.py | 7 ++++--- python/ray/tests/conftest_docker.py | 2 ++ python/ray/tests/test_gcs_ha_e2e.py | 4 +++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 21977a367c7fd..e3243f40f2b55 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -191,7 +191,7 @@ def __init__( node_ip_address = ray_params.node_ip_address if node_ip_address is None: if connect_only: - node_ip_address = self._wait_for_node_address() + node_ip_address = self._wait_and_get_for_node_address() else: node_ip_address = ray.util.get_node_ip_address() @@ -200,7 +200,8 @@ def __init__( node_ip_address=node_ip_address, raylet_ip_address=node_ip_address ) self._node_ip_address = node_ip_address - self._write_node_ip_address(node_ip_address) + if not connect_only: + self._write_node_ip_address(node_ip_address) if ray_params.raylet_ip_address: raylet_ip_address = ray_params.raylet_ip_address @@ -941,7 +942,7 @@ def _get_cached_port( return port - def _wait_for_node_address(self, timeout_s: int = 60) -> str: + def _wait_and_get_for_node_address(self, timeout_s: int = 60) -> str: """Wait until the node_ip_address.json file is avialable. node_ip_address.json is created when a ray instance is started. diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index 9120022dbe5b9..e9fbad6b0e3e4 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -98,6 +98,7 @@ def print_logs(self): ports={ "8000/tcp": None, }, + timeout=120, # volumes={ # "/tmp/ray/": {"bind": "/tmp/ray/", "mode": "rw"} # }, @@ -123,6 +124,7 @@ def print_logs(self): ports={ "8000/tcp": None, }, + timeout=120, # volumes={ # "/tmp/ray/": {"bind": "/tmp/ray/", "mode": "rw"} # }, diff --git a/python/ray/tests/test_gcs_ha_e2e.py b/python/ray/tests/test_gcs_ha_e2e.py index 0f665d9165e58..10a71e55704ec 100644 --- a/python/ray/tests/test_gcs_ha_e2e.py +++ b/python/ray/tests/test_gcs_ha_e2e.py @@ -5,7 +5,7 @@ from ray.tests.conftest_docker import * # noqa -@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") +# @pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") def test_ray_nodes_liveness(docker_cluster): get_nodes_script = """ import ray @@ -18,6 +18,8 @@ def check_alive(n): output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") text = output.output.decode().strip().split("\n")[-1] print("Output: ", output.output.decode().strip().split("\n")) + print(worker.logs()) + print(head.logs()) assert output.exit_code == 0 return n == int(text) From 4dbb1af65b51ee505bcbc345b0ea88251530a7a9 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Thu, 31 Aug 2023 21:34:45 +0900 Subject: [PATCH 17/41] ip --- python/ray/_private/node.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index e3243f40f2b55..2ad8787dcfde9 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -201,6 +201,7 @@ def __init__( ) self._node_ip_address = node_ip_address if not connect_only: + print("Writing node ip address, ", node_ip_address) self._write_node_ip_address(node_ip_address) if ray_params.raylet_ip_address: From 32de899609459b13004639cb3573e218b256ae8d Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Thu, 31 Aug 2023 21:39:47 +0900 Subject: [PATCH 18/41] ip --- python/ray/_private/node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 2ad8787dcfde9..e18b8ca089578 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -201,7 +201,7 @@ def __init__( ) self._node_ip_address = node_ip_address if not connect_only: - print("Writing node ip address, ", node_ip_address) + print("Writing node ip address to a session, ", node_ip_address, self._session_dir) self._write_node_ip_address(node_ip_address) if ray_params.raylet_ip_address: From 632442a1506e70dfe8dcb72f1332c3238bf95906 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Thu, 31 Aug 2023 21:47:10 +0900 Subject: [PATCH 19/41] ip --- python/ray/_private/node.py | 4 ++-- python/ray/tests/conftest_docker.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index e18b8ca089578..387f62bc71816 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -978,8 +978,8 @@ def _wait_and_get_for_node_address(self, timeout_s: int = 60) -> str: raise ValueError( "Can't find a `node_ip_address.json` file from " f"{self.get_session_dir_path()}. " - f"for {timeout_s} seconds" - "It means the ray instance hasn't started. " + f"for {timeout_s} seconds. " + "A ray instance hasn't started. " "Did you do `ray start` or `ray.init` on this host?" ) diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index e9fbad6b0e3e4..62f4fe3a57f73 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -92,7 +92,7 @@ def print_logs(self): "--node-manager-port", "9379", ], - volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, + volumes={"/tmp": {"bind": "/tmp", "mode": "rw"}}, environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, wrapper_class=Container, ports={ @@ -118,7 +118,7 @@ def print_logs(self): "--node-manager-port", "9379", ], - volumes={"{worker_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, + volumes={"/tmp": {"bind": "/tmp", "mode": "rw"}}, environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, wrapper_class=Container, ports={ From 1705ec43fc2098db7eb415a4215714a324210104 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Thu, 31 Aug 2023 23:58:56 +0900 Subject: [PATCH 20/41] remove bind --- python/ray/tests/conftest_docker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index 62f4fe3a57f73..e9fbad6b0e3e4 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -92,7 +92,7 @@ def print_logs(self): "--node-manager-port", "9379", ], - volumes={"/tmp": {"bind": "/tmp", "mode": "rw"}}, + volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, wrapper_class=Container, ports={ @@ -118,7 +118,7 @@ def print_logs(self): "--node-manager-port", "9379", ], - volumes={"/tmp": {"bind": "/tmp", "mode": "rw"}}, + volumes={"{worker_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, wrapper_class=Container, ports={ From cbff14ff2d626df03833e93d3dc991fccc26686c Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 1 Sep 2023 08:31:08 +0900 Subject: [PATCH 21/41] try fixing it. --- python/ray/_private/node.py | 10 +++++++--- python/ray/tests/test_gcs_ha_e2e.py | 6 +++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 387f62bc71816..4dc18dea2923c 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -201,7 +201,11 @@ def __init__( ) self._node_ip_address = node_ip_address if not connect_only: - print("Writing node ip address to a session, ", node_ip_address, self._session_dir) + print( + "Writing node ip address to a session, ", + node_ip_address, + self._session_dir, + ) self._write_node_ip_address(node_ip_address) if ray_params.raylet_ip_address: @@ -955,10 +959,10 @@ def _wait_and_get_for_node_address(self, timeout_s: int = 60) -> str: The node_ip_address of the current session if it finds it within timeout_s. """ - logger.error(f"Read file from {self.get_session_dir_path()}") + # logger.error(f"Read file from {self.get_session_dir_path()}") path = Path(self.get_session_dir_path()) file_names = [f.name for f in path.iterdir() if f.is_file()] - logger.error(file_names) + # logger.error(file_names) for i in range(timeout_s): node_ip_address = self._get_cached_node_ip_address() diff --git a/python/ray/tests/test_gcs_ha_e2e.py b/python/ray/tests/test_gcs_ha_e2e.py index 10a71e55704ec..fc1368144aa8b 100644 --- a/python/ray/tests/test_gcs_ha_e2e.py +++ b/python/ray/tests/test_gcs_ha_e2e.py @@ -15,11 +15,11 @@ def test_ray_nodes_liveness(docker_cluster): head, worker = docker_cluster def check_alive(n): - output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") + output = head.exec_run(cmd=f"python -c '{get_nodes_script}'") text = output.output.decode().strip().split("\n")[-1] print("Output: ", output.output.decode().strip().split("\n")) - print(worker.logs()) - print(head.logs()) + # print(worker.logs()) + # print(head.logs()) assert output.exit_code == 0 return n == int(text) From 310761934fbdaec83e5e1089ac088de903458c8a Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 1 Sep 2023 08:48:05 +0900 Subject: [PATCH 22/41] remove print --- python/ray/_private/node.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 4dc18dea2923c..b0db00a3de4db 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -201,11 +201,6 @@ def __init__( ) self._node_ip_address = node_ip_address if not connect_only: - print( - "Writing node ip address to a session, ", - node_ip_address, - self._session_dir, - ) self._write_node_ip_address(node_ip_address) if ray_params.raylet_ip_address: From bb8e1f616fe28e95295b95cfdb71d2527f5d618d Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 1 Sep 2023 08:46:31 +0900 Subject: [PATCH 23/41] Work around. --- python/ray/_private/node.py | 17 ++++++++--------- python/ray/scripts/scripts.py | 14 ++++++++++++++ python/ray/tests/conftest_docker.py | 5 +++++ python/ray/tests/test_gcs_ha_e2e.py | 2 +- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index b0db00a3de4db..6a1d5492d4d4b 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -155,12 +155,14 @@ def __init__( self._init_gcs_client() # Register the temp dir. - if head: - # date including microsecond - date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f") - self._session_name = f"session_{date_str}_{os.getpid()}" - else: - if ray_params.session_name is None: + self._session_name = ray_params.session_name + + if self._session_name is None: + if head: + # date including microsecond + date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f") + self._session_name = f"session_{date_str}_{os.getpid()}" + else: assert not self._default_worker session_name = ray._private.utils.internal_kv_get_with_retry( self.get_gcs_client(), @@ -169,9 +171,6 @@ def __init__( num_retries=ray_constants.NUM_REDIS_GET_RETRIES, ) self._session_name = ray._private.utils.decode(session_name) - else: - # worker mode - self._session_name = ray_params.session_name # Initialize webui url if head: diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 8b40537684d2c..feeb26e1d5413 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -537,6 +537,14 @@ def debug(address): type=str, help="a JSON serialized dictionary mapping label name to label value.", ) +@click.option( + "--session-name", + required=False, + hidden=False, + default=None, + type=str, + help="The session name of the cluster.", +) @add_click_logging_options @PublicAPI def start( @@ -582,6 +590,7 @@ def start( ray_debugger_external, disable_usage_stats, labels, + session_name, ): """Start Ray processes manually on the local machine.""" @@ -634,6 +643,9 @@ def start( if has_ray_client and ray_client_server_port is None: ray_client_server_port = 10001 + if session_name: + session_name = f"session_{session_name}" + ray_params = ray._private.parameter.RayParams( node_ip_address=node_ip_address, node_name=node_name if node_name else node_ip_address, @@ -670,7 +682,9 @@ def start( no_monitor=no_monitor, tracing_startup_hook=tracing_startup_hook, ray_debugger_external=ray_debugger_external, + session_name=session_name, ) + print("SANG-TODO 1", ray_params.session_name) if ray_constants.RAY_START_HOOK in os.environ: _load_class(os.environ[ray_constants.RAY_START_HOOK])(ray_params, head) diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index e9fbad6b0e3e4..78089fb71a5f1 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -76,6 +76,9 @@ def print_logs(self): head_node_vol = volume() worker_node_vol = volume() head_node_container_name = "gcs" + str(int(time.time())) +date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f") +session_name = f"session_{date_str}_{os.getpid()}" + head_node = container( image="ray_ci:v1", name=head_node_container_name, @@ -91,6 +94,8 @@ def print_logs(self): # ip:port is treated as a different raylet. "--node-manager-port", "9379", + "--session-name", + session_name, ], volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, diff --git a/python/ray/tests/test_gcs_ha_e2e.py b/python/ray/tests/test_gcs_ha_e2e.py index fc1368144aa8b..96e24eebbd043 100644 --- a/python/ray/tests/test_gcs_ha_e2e.py +++ b/python/ray/tests/test_gcs_ha_e2e.py @@ -15,7 +15,7 @@ def test_ray_nodes_liveness(docker_cluster): head, worker = docker_cluster def check_alive(n): - output = head.exec_run(cmd=f"python -c '{get_nodes_script}'") + output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") text = output.output.decode().strip().split("\n")[-1] print("Output: ", output.output.decode().strip().split("\n")) # print(worker.logs()) From eeb36109496467ed2ddd1961e74981e3ec84e939 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 1 Sep 2023 08:47:46 +0900 Subject: [PATCH 24/41] . --- python/ray/tests/conftest_docker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index 78089fb71a5f1..29090199b704c 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -1,4 +1,6 @@ import time +import os +import datetime import pytest from pytest_docker_tools import container, fetch, network, volume from pytest_docker_tools import wrappers From 484c68aa894d2e06a9c38b4ff2aaf034bd3210a6 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 1 Sep 2023 13:57:01 +0900 Subject: [PATCH 25/41] Revert --- python/ray/scripts/scripts.py | 14 -------------- python/ray/tests/conftest_docker.py | 2 -- 2 files changed, 16 deletions(-) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index feeb26e1d5413..8b40537684d2c 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -537,14 +537,6 @@ def debug(address): type=str, help="a JSON serialized dictionary mapping label name to label value.", ) -@click.option( - "--session-name", - required=False, - hidden=False, - default=None, - type=str, - help="The session name of the cluster.", -) @add_click_logging_options @PublicAPI def start( @@ -590,7 +582,6 @@ def start( ray_debugger_external, disable_usage_stats, labels, - session_name, ): """Start Ray processes manually on the local machine.""" @@ -643,9 +634,6 @@ def start( if has_ray_client and ray_client_server_port is None: ray_client_server_port = 10001 - if session_name: - session_name = f"session_{session_name}" - ray_params = ray._private.parameter.RayParams( node_ip_address=node_ip_address, node_name=node_name if node_name else node_ip_address, @@ -682,9 +670,7 @@ def start( no_monitor=no_monitor, tracing_startup_hook=tracing_startup_hook, ray_debugger_external=ray_debugger_external, - session_name=session_name, ) - print("SANG-TODO 1", ray_params.session_name) if ray_constants.RAY_START_HOOK in os.environ: _load_class(os.environ[ray_constants.RAY_START_HOOK])(ray_params, head) diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index 29090199b704c..293f5a711cdcf 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -96,8 +96,6 @@ def print_logs(self): # ip:port is treated as a different raylet. "--node-manager-port", "9379", - "--session-name", - session_name, ], volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, From 748ddf8791d9284209305e492716f5ea60fe0c46 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 1 Sep 2023 16:48:46 +0900 Subject: [PATCH 26/41] Wokrs not --- ...atic-ray-cluster.with-fault-tolerance.yaml | 3 + .../static-ray-cluster-without-kuberay.md | 7 ++- doc/source/ray-core/fault_tolerance/gcs.rst | 17 ++++-- python/ray/_private/node.py | 13 ++++- python/ray/tests/BUILD | 1 + python/ray/tests/conftest_docker.py | 11 ++-- python/ray/tests/test_gcs_fault_tolerance.py | 26 +++++++++ python/ray/tests/test_gcs_ha_e2e.py | 4 +- python/ray/tests/test_gcs_ha_e2e_2.py | 55 +++++++++++++++++++ python/ray/tests/test_ray_init.py | 22 ++++++++ 10 files changed, 144 insertions(+), 15 deletions(-) create mode 100644 python/ray/tests/test_gcs_ha_e2e_2.py diff --git a/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml b/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml index 84b46c36f5f5f..619fea139e68e 100644 --- a/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml +++ b/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml @@ -148,6 +148,9 @@ spec: # This is used in the ray start command so that Ray can spawn the # correct number of processes. Omitting this may lead to degraded # performance. + - name: RAY_external_storage_namespace + value: # . RAY_external_storage_namespace is used to isolate + # the data stored in Redis. - name: MY_CPU_REQUEST valueFrom: resourceFieldRef: diff --git a/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md b/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md index d5530ecb6a190..db5eedb253b71 100644 --- a/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md +++ b/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md @@ -120,7 +120,12 @@ metadata. One drawback of this approach is that the head node loses the metadata Ray can also write this metadata to an external Redis for reliability and high availability. With this setup, the static Ray cluster can recover from head node crashes and tolerate GCS failures without losing connections to worker nodes. -To use this feature, we need to pass in the `RAY_REDIS_ADDRESS` env var and `--redis-password` in the Ray head node section of [the Kubernetes deployment config file](https://raw.githubusercontent.com/ray-project/ray/master/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml). +To use this feature, we need to pass in the `RAY_REDIS_ADDRESS` env var, `RAY_external_storage_namespace`, and `--redis-password` in the Ray head node section of [the Kubernetes deployment config file](https://raw.githubusercontent.com/ray-project/ray/master/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml). + +`RAY_external_storage_namespace` is used to isolate the data stored in Redis. +This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. +`RAY_external_storage_namespace` must be an unique ID, and whenever you restart a head node, +it should be the same. ## Running Applications on the static Ray Cluster diff --git a/doc/source/ray-core/fault_tolerance/gcs.rst b/doc/source/ray-core/fault_tolerance/gcs.rst index 9f995e518a803..6b6c1026cee13 100644 --- a/doc/source/ray-core/fault_tolerance/gcs.rst +++ b/doc/source/ray-core/fault_tolerance/gcs.rst @@ -34,19 +34,29 @@ Setting up Redis set the OS environment ``RAY_REDIS_ADDRESS`` to the Redis address, and supply the ``--redis-password`` flag with the password when calling ``ray start``: + You should also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis. + This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. + ``RAY_external_storage_namespace`` must be an unique ID, and whenever you restart a head node, + it should be the same. + .. code-block:: shell - RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD + RAY_external_storage_namespace= RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD .. tab-item:: ray up If you are using :ref:`ray up ` to start the Ray cluster, change :ref:`head_start_ray_commands ` field to add ``RAY_REDIS_ADDRESS`` and ``--redis-password`` to the ``ray start`` command: + You should also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis. + This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. + ``RAY_external_storage_namespace`` must be an unique ID, and whenever you restart a head node, + it should be the same. + .. code-block:: yaml head_start_ray_commands: - ray stop - - ulimit -n 65536; RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0 + - ulimit -n 65536; RAY_external_storage_namespace= RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0 .. tab-item:: Kubernetes @@ -60,9 +70,6 @@ If the raylet fails to reconnect to the GCS for more than 60 seconds, the raylet will exit and the corresponding node fails. This timeout threshold can be tuned by the OS environment variable ``RAY_gcs_rpc_server_reconnect_timeout_s``. -You can also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis. -This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. - If the IP address of GCS will change after restarts, it's better to use a qualified domain name and pass it to all raylets at start time. Raylet will resolve the domain name and connect to the correct GCS. You need to ensure that at any time, only one GCS is alive. diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 6a1d5492d4d4b..1974a8665826d 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -97,6 +97,16 @@ def __init__( [primary_redis_ip, port] = external_redis[0].rsplit(":", 1) ray_params.external_addresses = external_redis ray_params.num_redis_shards = len(external_redis) - 1 + storage_namespace = os.environ.get("RAY_external_storage_namespace") + if storage_namespace is None: + raise ValueError( + "RAY_external_storage_namespace must be provided " + "when using Ray with external Redis for the fault tolerance. " + "RAY_external_storage_namespace must be an unique ID " + "and has to be the same across the same head node." + ) + if ray_params.session_name is None: + ray_params.update_if_absent(session_name=f"session_{storage_namespace}") if ( ray_params._system_config @@ -1304,9 +1314,10 @@ def _write_cluster_info_to_kv(self): self.get_gcs_client().internal_kv_put( b"session_name", self._session_name.encode(), - True, + False, ray_constants.KV_NAMESPACE_SESSION, ) + self.get_gcs_client().internal_kv_put( b"session_dir", self._session_dir.encode(), diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 3f589089f83eb..e25ddee607c52 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -224,6 +224,7 @@ py_test_module_list( py_test_module_list( files = [ "test_gcs_ha_e2e.py", + "test_gcs_ha_e2e_2.py", "test_memory_pressure.py", "test_node_labels.py", ], diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index 293f5a711cdcf..b1ef48111bd7c 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -1,6 +1,5 @@ import time -import os -import datetime +import uuid import pytest from pytest_docker_tools import container, fetch, network, volume from pytest_docker_tools import wrappers @@ -78,8 +77,7 @@ def print_logs(self): head_node_vol = volume() worker_node_vol = volume() head_node_container_name = "gcs" + str(int(time.time())) -date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f") -session_name = f"session_{date_str}_{os.getpid()}" +external_storage_namespace = str(uuid.uuid4()) head_node = container( image="ray_ci:v1", @@ -98,7 +96,10 @@ def print_logs(self): "9379", ], volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, - environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, + environment={ + "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", + "RAY_external_storage_namespace": external_storage_namespace, + }, wrapper_class=Container, ports={ "8000/tcp": None, diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index dea003d5631ef..99abe129cf35b 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -917,6 +917,32 @@ def check_raylet_healthy(): wait_for_condition(lambda: not check_raylet_healthy()) +@pytest.mark.parametrize( + "ray_start_regular_with_external_redis", + [ + generate_system_config_map( + gcs_failover_worker_reconnect_timeout=20, + gcs_rpc_server_reconnect_timeout_s=60, + gcs_server_request_timeout_seconds=10, + raylet_liveness_self_check_interval_ms=3000, + ) + ], + indirect=True, +) +def test_session_dir_preserved(ray_start_regular_with_external_redis): + storage_namespace = os.getenv("RAY_external_storage_namespace") + session_name = ray._private.worker._global_node.session_name + assert session_name == f"session_{storage_namespace}" + ray._private.worker._global_node.kill_gcs_server() + # Start GCS + ray._private.worker._global_node.start_gcs_server() + ray.shutdown() + ray.init() + storage_namespace = os.getenv("RAY_external_storage_namespace") + session_name = ray._private.worker._global_node.session_name + assert session_name == f"session_{storage_namespace}" + + if __name__ == "__main__": import pytest diff --git a/python/ray/tests/test_gcs_ha_e2e.py b/python/ray/tests/test_gcs_ha_e2e.py index 96e24eebbd043..0f665d9165e58 100644 --- a/python/ray/tests/test_gcs_ha_e2e.py +++ b/python/ray/tests/test_gcs_ha_e2e.py @@ -5,7 +5,7 @@ from ray.tests.conftest_docker import * # noqa -# @pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") +@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") def test_ray_nodes_liveness(docker_cluster): get_nodes_script = """ import ray @@ -18,8 +18,6 @@ def check_alive(n): output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") text = output.output.decode().strip().split("\n")[-1] print("Output: ", output.output.decode().strip().split("\n")) - # print(worker.logs()) - # print(head.logs()) assert output.exit_code == 0 return n == int(text) diff --git a/python/ray/tests/test_gcs_ha_e2e_2.py b/python/ray/tests/test_gcs_ha_e2e_2.py new file mode 100644 index 0000000000000..380f527be1719 --- /dev/null +++ b/python/ray/tests/test_gcs_ha_e2e_2.py @@ -0,0 +1,55 @@ +import pytest +import sys +from time import sleep +from ray._private.test_utils import wait_for_condition +from ray.tests.conftest_docker import * # noqa + + +@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") +def test_ray_session_name_preserved(docker_cluster): + get_nodes_script = """ +import ray +ray.init("auto") +print(ray._private.worker._global_node.session_name) +""" + head, worker = docker_cluster + + def get_session_name(to_head=True): + if to_head: + output = head.exec_run(cmd=f"python -c '{get_nodes_script}'") + else: + output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") + session_name = output.output.decode().strip().split("\n")[-1] + print("Output: ", output.output.decode().strip().split("\n")) + assert output.exit_code == 0 + return session_name + + # Make sure two nodes are alive + wait_for_condition(get_session_name, to_head=True) + session_name_head = get_session_name(to_head=True) + wait_for_condition(get_session_name, to_head=False) + session_name_worker = get_session_name(to_head=False) + assert session_name_head == session_name_worker + print("head killed") + head.kill() + + sleep(2) + + head.restart() + + wait_for_condition(get_session_name, to_head=True) + session_name_head_after_restart = get_session_name(to_head=True) + wait_for_condition(get_session_name, to_head=False) + session_name_worker_after_restart = get_session_name(to_head=False) + assert session_name_worker_after_restart == session_name_head_after_restart + assert session_name_head == session_name_head_after_restart + assert session_name_worker_after_restart == session_name_worker + + +if __name__ == "__main__": + import os + + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index 38e351a8a025b..be73a6e76f4f4 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -234,6 +234,28 @@ def test_ray_init_using_hostname(ray_start_cluster): assert node_table[0].get("NodeManagerHostname", "") == hostname +def test_new_ray_instance_new_session_dir(shutdown_only): + ray.init() + session_dir = ray._private.worker._global_node.get_session_dir_path() + ray.shutdown() + ray.init() + assert ray._private.worker._global_node.get_session_dir_path() != session_dir + + +def test_new_cluster_new_session_dir(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node() + ray.init(address=cluster.address) + session_dir = ray._private.worker._global_node.get_session_dir_path() + ray.shutdown() + cluster.shutdown() + cluster.add_node() + ray.init(address=cluster.address) + assert ray._private.worker._global_node.get_session_dir_path() != session_dir + ray.shutdown() + cluster.shutdown() + + if __name__ == "__main__": import sys From f35a16531caa0c7275ce5fb6451a9f4f39607692 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 1 Sep 2023 17:20:46 +0900 Subject: [PATCH 27/41] Fix failed ha tests. --- python/ray/_private/node.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 1974a8665826d..560faf960e309 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -98,7 +98,7 @@ def __init__( ray_params.external_addresses = external_redis ray_params.num_redis_shards = len(external_redis) - 1 storage_namespace = os.environ.get("RAY_external_storage_namespace") - if storage_namespace is None: + if head and storage_namespace is None: raise ValueError( "RAY_external_storage_namespace must be provided " "when using Ray with external Redis for the fault tolerance. " @@ -963,11 +963,6 @@ def _wait_and_get_for_node_address(self, timeout_s: int = 60) -> str: The node_ip_address of the current session if it finds it within timeout_s. """ - # logger.error(f"Read file from {self.get_session_dir_path()}") - path = Path(self.get_session_dir_path()) - file_names = [f.name for f in path.iterdir() if f.is_file()] - # logger.error(file_names) - for i in range(timeout_s): node_ip_address = self._get_cached_node_ip_address() From 01e492a3205320b8ae9598f553cdebf9062294bb Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 1 Sep 2023 19:29:57 +0900 Subject: [PATCH 28/41] fix some tests. --- python/ray/_private/node.py | 3 ++- python/ray/tests/test_advanced_9.py | 2 ++ python/ray/tests/test_multi_node_3.py | 3 +++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 560faf960e309..c54f9523c9730 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -101,7 +101,8 @@ def __init__( if head and storage_namespace is None: raise ValueError( "RAY_external_storage_namespace must be provided " - "when using Ray with external Redis for the fault tolerance. " + "when using Ray with external Redis via `RAY_REDIS_ADDRESS` " + " for the fault tolerance. " "RAY_external_storage_namespace must be an unique ID " "and has to be the same across the same head node." ) diff --git a/python/ray/tests/test_advanced_9.py b/python/ray/tests/test_advanced_9.py index a4ba35d1756d9..2b06d039fe6b7 100644 --- a/python/ray/tests/test_advanced_9.py +++ b/python/ray/tests/test_advanced_9.py @@ -1,5 +1,6 @@ import sys import time +import uuid import pytest @@ -357,6 +358,7 @@ def check_demands(n): def test_redis_not_available(monkeypatch, call_ray_stop_only): monkeypatch.setenv("RAY_NUM_REDIS_GET_RETRIES", "2") monkeypatch.setenv("RAY_REDIS_ADDRESS", "localhost:12345") + monkeypatch.setenv("RAY_external_storage_namespace", str(uuid.uuid4())) p = subprocess.run( "ray start --head", shell=True, diff --git a/python/ray/tests/test_multi_node_3.py b/python/ray/tests/test_multi_node_3.py index 7f602ed863dc4..7dec623e0373e 100644 --- a/python/ray/tests/test_multi_node_3.py +++ b/python/ray/tests/test_multi_node_3.py @@ -2,6 +2,7 @@ import os import subprocess import sys +import uuid from pathlib import Path import psutil @@ -135,10 +136,12 @@ def test_calling_start_ray_head(call_ray_stop_only): temp_dir, 8888, password=ray_constants.REDIS_DEFAULT_PASSWORD ) os.environ["RAY_REDIS_ADDRESS"] = "127.0.0.1:8888" + os.environ["RAY_external_storage_namespace"] = str(uuid.uuid4()) check_call_ray(["start", "--head"]) check_call_ray(["stop"]) proc.process.terminate() del os.environ["RAY_REDIS_ADDRESS"] + del os.environ["RAY_external_storage_namespace"] # Test --block. Killing a child process should cause the command to exit. blocked = subprocess.Popen(["ray", "start", "--head", "--block", "--port", "0"]) From 8a14bccf397673ffdeace3cc8632747fac4f8d23 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 1 Sep 2023 19:32:13 +0900 Subject: [PATCH 29/41] done --- python/ray/_private/node.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index c54f9523c9730..eafa4e38975fc 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -98,16 +98,19 @@ def __init__( ray_params.external_addresses = external_redis ray_params.num_redis_shards = len(external_redis) - 1 storage_namespace = os.environ.get("RAY_external_storage_namespace") - if head and storage_namespace is None: - raise ValueError( - "RAY_external_storage_namespace must be provided " - "when using Ray with external Redis via `RAY_REDIS_ADDRESS` " - " for the fault tolerance. " - "RAY_external_storage_namespace must be an unique ID " - "and has to be the same across the same head node." - ) - if ray_params.session_name is None: - ray_params.update_if_absent(session_name=f"session_{storage_namespace}") + if head: + if storage_namespace is None: + raise ValueError( + "RAY_external_storage_namespace must be provided " + "when using Ray with external Redis via `RAY_REDIS_ADDRESS` " + " for the fault tolerance. " + "RAY_external_storage_namespace must be an unique ID " + "and has to be the same across the same head node." + ) + # Update the session name to be RAY_external_storage_namespace. + if ray_params.session_name is None: + ray_params.update_if_absent( + session_name=f"session_{storage_namespace}") if ( ray_params._system_config From cd2b44d0dc0290190ba9d4757b7a2ca01107074a Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 1 Sep 2023 21:44:31 +0900 Subject: [PATCH 30/41] maybe working? --- python/ray/tests/conftest_docker.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index b1ef48111bd7c..29b39c0f6b290 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -99,6 +99,8 @@ def print_logs(self): environment={ "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", "RAY_external_storage_namespace": external_storage_namespace, + "RAY_raylet_client_num_connect_attempts": "10", + "RAY_raylet_client_connect_timeout_milliseconds": "100", }, wrapper_class=Container, ports={ @@ -125,7 +127,11 @@ def print_logs(self): "9379", ], volumes={"{worker_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, - environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, + environment={ + "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", + "RAY_raylet_client_num_connect_attempts": "10", + "RAY_raylet_client_connect_timeout_milliseconds": "100", + }, wrapper_class=Container, ports={ "8000/tcp": None, From 9330cc61b657065d1c5113643b0f682e95089ed7 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Sat, 2 Sep 2023 01:14:48 +0900 Subject: [PATCH 31/41] Revert "maybe working?" This reverts commit cd2b44d0dc0290190ba9d4757b7a2ca01107074a. --- python/ray/tests/conftest_docker.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index 29b39c0f6b290..b1ef48111bd7c 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -99,8 +99,6 @@ def print_logs(self): environment={ "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", "RAY_external_storage_namespace": external_storage_namespace, - "RAY_raylet_client_num_connect_attempts": "10", - "RAY_raylet_client_connect_timeout_milliseconds": "100", }, wrapper_class=Container, ports={ @@ -127,11 +125,7 @@ def print_logs(self): "9379", ], volumes={"{worker_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, - environment={ - "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", - "RAY_raylet_client_num_connect_attempts": "10", - "RAY_raylet_client_connect_timeout_milliseconds": "100", - }, + environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, wrapper_class=Container, ports={ "8000/tcp": None, From e32d53820e6614018fc740ac6d0f865ceee37ad3 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Sat, 2 Sep 2023 01:14:49 +0900 Subject: [PATCH 32/41] Revert "done" This reverts commit 8a14bccf397673ffdeace3cc8632747fac4f8d23. --- python/ray/_private/node.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index eafa4e38975fc..c54f9523c9730 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -98,19 +98,16 @@ def __init__( ray_params.external_addresses = external_redis ray_params.num_redis_shards = len(external_redis) - 1 storage_namespace = os.environ.get("RAY_external_storage_namespace") - if head: - if storage_namespace is None: - raise ValueError( - "RAY_external_storage_namespace must be provided " - "when using Ray with external Redis via `RAY_REDIS_ADDRESS` " - " for the fault tolerance. " - "RAY_external_storage_namespace must be an unique ID " - "and has to be the same across the same head node." - ) - # Update the session name to be RAY_external_storage_namespace. - if ray_params.session_name is None: - ray_params.update_if_absent( - session_name=f"session_{storage_namespace}") + if head and storage_namespace is None: + raise ValueError( + "RAY_external_storage_namespace must be provided " + "when using Ray with external Redis via `RAY_REDIS_ADDRESS` " + " for the fault tolerance. " + "RAY_external_storage_namespace must be an unique ID " + "and has to be the same across the same head node." + ) + if ray_params.session_name is None: + ray_params.update_if_absent(session_name=f"session_{storage_namespace}") if ( ray_params._system_config From 0e73a7e6b8e8ee1b325677f84f4bc99f3fb0b42e Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Sat, 2 Sep 2023 01:14:51 +0900 Subject: [PATCH 33/41] Revert "fix some tests." This reverts commit 01e492a3205320b8ae9598f553cdebf9062294bb. --- python/ray/_private/node.py | 3 +-- python/ray/tests/test_advanced_9.py | 2 -- python/ray/tests/test_multi_node_3.py | 3 --- 3 files changed, 1 insertion(+), 7 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index c54f9523c9730..560faf960e309 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -101,8 +101,7 @@ def __init__( if head and storage_namespace is None: raise ValueError( "RAY_external_storage_namespace must be provided " - "when using Ray with external Redis via `RAY_REDIS_ADDRESS` " - " for the fault tolerance. " + "when using Ray with external Redis for the fault tolerance. " "RAY_external_storage_namespace must be an unique ID " "and has to be the same across the same head node." ) diff --git a/python/ray/tests/test_advanced_9.py b/python/ray/tests/test_advanced_9.py index 2b06d039fe6b7..a4ba35d1756d9 100644 --- a/python/ray/tests/test_advanced_9.py +++ b/python/ray/tests/test_advanced_9.py @@ -1,6 +1,5 @@ import sys import time -import uuid import pytest @@ -358,7 +357,6 @@ def check_demands(n): def test_redis_not_available(monkeypatch, call_ray_stop_only): monkeypatch.setenv("RAY_NUM_REDIS_GET_RETRIES", "2") monkeypatch.setenv("RAY_REDIS_ADDRESS", "localhost:12345") - monkeypatch.setenv("RAY_external_storage_namespace", str(uuid.uuid4())) p = subprocess.run( "ray start --head", shell=True, diff --git a/python/ray/tests/test_multi_node_3.py b/python/ray/tests/test_multi_node_3.py index 7dec623e0373e..7f602ed863dc4 100644 --- a/python/ray/tests/test_multi_node_3.py +++ b/python/ray/tests/test_multi_node_3.py @@ -2,7 +2,6 @@ import os import subprocess import sys -import uuid from pathlib import Path import psutil @@ -136,12 +135,10 @@ def test_calling_start_ray_head(call_ray_stop_only): temp_dir, 8888, password=ray_constants.REDIS_DEFAULT_PASSWORD ) os.environ["RAY_REDIS_ADDRESS"] = "127.0.0.1:8888" - os.environ["RAY_external_storage_namespace"] = str(uuid.uuid4()) check_call_ray(["start", "--head"]) check_call_ray(["stop"]) proc.process.terminate() del os.environ["RAY_REDIS_ADDRESS"] - del os.environ["RAY_external_storage_namespace"] # Test --block. Killing a child process should cause the command to exit. blocked = subprocess.Popen(["ray", "start", "--head", "--block", "--port", "0"]) From b3d590ab60e4487cfda87ce5a1295c8122e6663b Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Sat, 2 Sep 2023 01:14:51 +0900 Subject: [PATCH 34/41] Revert "Fix failed ha tests." This reverts commit f35a16531caa0c7275ce5fb6451a9f4f39607692. --- python/ray/_private/node.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 560faf960e309..1974a8665826d 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -98,7 +98,7 @@ def __init__( ray_params.external_addresses = external_redis ray_params.num_redis_shards = len(external_redis) - 1 storage_namespace = os.environ.get("RAY_external_storage_namespace") - if head and storage_namespace is None: + if storage_namespace is None: raise ValueError( "RAY_external_storage_namespace must be provided " "when using Ray with external Redis for the fault tolerance. " @@ -963,6 +963,11 @@ def _wait_and_get_for_node_address(self, timeout_s: int = 60) -> str: The node_ip_address of the current session if it finds it within timeout_s. """ + # logger.error(f"Read file from {self.get_session_dir_path()}") + path = Path(self.get_session_dir_path()) + file_names = [f.name for f in path.iterdir() if f.is_file()] + # logger.error(file_names) + for i in range(timeout_s): node_ip_address = self._get_cached_node_ip_address() From 5a4123777d803bf9787ec526dbae7610ef8e8fe0 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Sat, 2 Sep 2023 01:14:51 +0900 Subject: [PATCH 35/41] Revert "Wokrs not" This reverts commit 748ddf8791d9284209305e492716f5ea60fe0c46. --- ...atic-ray-cluster.with-fault-tolerance.yaml | 3 - .../static-ray-cluster-without-kuberay.md | 7 +-- doc/source/ray-core/fault_tolerance/gcs.rst | 17 ++---- python/ray/_private/node.py | 13 +---- python/ray/tests/BUILD | 1 - python/ray/tests/conftest_docker.py | 11 ++-- python/ray/tests/test_gcs_fault_tolerance.py | 26 --------- python/ray/tests/test_gcs_ha_e2e.py | 4 +- python/ray/tests/test_gcs_ha_e2e_2.py | 55 ------------------- python/ray/tests/test_ray_init.py | 22 -------- 10 files changed, 15 insertions(+), 144 deletions(-) delete mode 100644 python/ray/tests/test_gcs_ha_e2e_2.py diff --git a/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml b/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml index 619fea139e68e..84b46c36f5f5f 100644 --- a/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml +++ b/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml @@ -148,9 +148,6 @@ spec: # This is used in the ray start command so that Ray can spawn the # correct number of processes. Omitting this may lead to degraded # performance. - - name: RAY_external_storage_namespace - value: # . RAY_external_storage_namespace is used to isolate - # the data stored in Redis. - name: MY_CPU_REQUEST valueFrom: resourceFieldRef: diff --git a/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md b/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md index db5eedb253b71..d5530ecb6a190 100644 --- a/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md +++ b/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md @@ -120,12 +120,7 @@ metadata. One drawback of this approach is that the head node loses the metadata Ray can also write this metadata to an external Redis for reliability and high availability. With this setup, the static Ray cluster can recover from head node crashes and tolerate GCS failures without losing connections to worker nodes. -To use this feature, we need to pass in the `RAY_REDIS_ADDRESS` env var, `RAY_external_storage_namespace`, and `--redis-password` in the Ray head node section of [the Kubernetes deployment config file](https://raw.githubusercontent.com/ray-project/ray/master/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml). - -`RAY_external_storage_namespace` is used to isolate the data stored in Redis. -This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. -`RAY_external_storage_namespace` must be an unique ID, and whenever you restart a head node, -it should be the same. +To use this feature, we need to pass in the `RAY_REDIS_ADDRESS` env var and `--redis-password` in the Ray head node section of [the Kubernetes deployment config file](https://raw.githubusercontent.com/ray-project/ray/master/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml). ## Running Applications on the static Ray Cluster diff --git a/doc/source/ray-core/fault_tolerance/gcs.rst b/doc/source/ray-core/fault_tolerance/gcs.rst index 6b6c1026cee13..9f995e518a803 100644 --- a/doc/source/ray-core/fault_tolerance/gcs.rst +++ b/doc/source/ray-core/fault_tolerance/gcs.rst @@ -34,29 +34,19 @@ Setting up Redis set the OS environment ``RAY_REDIS_ADDRESS`` to the Redis address, and supply the ``--redis-password`` flag with the password when calling ``ray start``: - You should also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis. - This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. - ``RAY_external_storage_namespace`` must be an unique ID, and whenever you restart a head node, - it should be the same. - .. code-block:: shell - RAY_external_storage_namespace= RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD + RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD .. tab-item:: ray up If you are using :ref:`ray up ` to start the Ray cluster, change :ref:`head_start_ray_commands ` field to add ``RAY_REDIS_ADDRESS`` and ``--redis-password`` to the ``ray start`` command: - You should also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis. - This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. - ``RAY_external_storage_namespace`` must be an unique ID, and whenever you restart a head node, - it should be the same. - .. code-block:: yaml head_start_ray_commands: - ray stop - - ulimit -n 65536; RAY_external_storage_namespace= RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0 + - ulimit -n 65536; RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0 .. tab-item:: Kubernetes @@ -70,6 +60,9 @@ If the raylet fails to reconnect to the GCS for more than 60 seconds, the raylet will exit and the corresponding node fails. This timeout threshold can be tuned by the OS environment variable ``RAY_gcs_rpc_server_reconnect_timeout_s``. +You can also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis. +This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. + If the IP address of GCS will change after restarts, it's better to use a qualified domain name and pass it to all raylets at start time. Raylet will resolve the domain name and connect to the correct GCS. You need to ensure that at any time, only one GCS is alive. diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 1974a8665826d..6a1d5492d4d4b 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -97,16 +97,6 @@ def __init__( [primary_redis_ip, port] = external_redis[0].rsplit(":", 1) ray_params.external_addresses = external_redis ray_params.num_redis_shards = len(external_redis) - 1 - storage_namespace = os.environ.get("RAY_external_storage_namespace") - if storage_namespace is None: - raise ValueError( - "RAY_external_storage_namespace must be provided " - "when using Ray with external Redis for the fault tolerance. " - "RAY_external_storage_namespace must be an unique ID " - "and has to be the same across the same head node." - ) - if ray_params.session_name is None: - ray_params.update_if_absent(session_name=f"session_{storage_namespace}") if ( ray_params._system_config @@ -1314,10 +1304,9 @@ def _write_cluster_info_to_kv(self): self.get_gcs_client().internal_kv_put( b"session_name", self._session_name.encode(), - False, + True, ray_constants.KV_NAMESPACE_SESSION, ) - self.get_gcs_client().internal_kv_put( b"session_dir", self._session_dir.encode(), diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index e25ddee607c52..3f589089f83eb 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -224,7 +224,6 @@ py_test_module_list( py_test_module_list( files = [ "test_gcs_ha_e2e.py", - "test_gcs_ha_e2e_2.py", "test_memory_pressure.py", "test_node_labels.py", ], diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index b1ef48111bd7c..293f5a711cdcf 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -1,5 +1,6 @@ import time -import uuid +import os +import datetime import pytest from pytest_docker_tools import container, fetch, network, volume from pytest_docker_tools import wrappers @@ -77,7 +78,8 @@ def print_logs(self): head_node_vol = volume() worker_node_vol = volume() head_node_container_name = "gcs" + str(int(time.time())) -external_storage_namespace = str(uuid.uuid4()) +date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f") +session_name = f"session_{date_str}_{os.getpid()}" head_node = container( image="ray_ci:v1", @@ -96,10 +98,7 @@ def print_logs(self): "9379", ], volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, - environment={ - "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", - "RAY_external_storage_namespace": external_storage_namespace, - }, + environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, wrapper_class=Container, ports={ "8000/tcp": None, diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index 99abe129cf35b..dea003d5631ef 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -917,32 +917,6 @@ def check_raylet_healthy(): wait_for_condition(lambda: not check_raylet_healthy()) -@pytest.mark.parametrize( - "ray_start_regular_with_external_redis", - [ - generate_system_config_map( - gcs_failover_worker_reconnect_timeout=20, - gcs_rpc_server_reconnect_timeout_s=60, - gcs_server_request_timeout_seconds=10, - raylet_liveness_self_check_interval_ms=3000, - ) - ], - indirect=True, -) -def test_session_dir_preserved(ray_start_regular_with_external_redis): - storage_namespace = os.getenv("RAY_external_storage_namespace") - session_name = ray._private.worker._global_node.session_name - assert session_name == f"session_{storage_namespace}" - ray._private.worker._global_node.kill_gcs_server() - # Start GCS - ray._private.worker._global_node.start_gcs_server() - ray.shutdown() - ray.init() - storage_namespace = os.getenv("RAY_external_storage_namespace") - session_name = ray._private.worker._global_node.session_name - assert session_name == f"session_{storage_namespace}" - - if __name__ == "__main__": import pytest diff --git a/python/ray/tests/test_gcs_ha_e2e.py b/python/ray/tests/test_gcs_ha_e2e.py index 0f665d9165e58..96e24eebbd043 100644 --- a/python/ray/tests/test_gcs_ha_e2e.py +++ b/python/ray/tests/test_gcs_ha_e2e.py @@ -5,7 +5,7 @@ from ray.tests.conftest_docker import * # noqa -@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") +# @pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") def test_ray_nodes_liveness(docker_cluster): get_nodes_script = """ import ray @@ -18,6 +18,8 @@ def check_alive(n): output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") text = output.output.decode().strip().split("\n")[-1] print("Output: ", output.output.decode().strip().split("\n")) + # print(worker.logs()) + # print(head.logs()) assert output.exit_code == 0 return n == int(text) diff --git a/python/ray/tests/test_gcs_ha_e2e_2.py b/python/ray/tests/test_gcs_ha_e2e_2.py deleted file mode 100644 index 380f527be1719..0000000000000 --- a/python/ray/tests/test_gcs_ha_e2e_2.py +++ /dev/null @@ -1,55 +0,0 @@ -import pytest -import sys -from time import sleep -from ray._private.test_utils import wait_for_condition -from ray.tests.conftest_docker import * # noqa - - -@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") -def test_ray_session_name_preserved(docker_cluster): - get_nodes_script = """ -import ray -ray.init("auto") -print(ray._private.worker._global_node.session_name) -""" - head, worker = docker_cluster - - def get_session_name(to_head=True): - if to_head: - output = head.exec_run(cmd=f"python -c '{get_nodes_script}'") - else: - output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") - session_name = output.output.decode().strip().split("\n")[-1] - print("Output: ", output.output.decode().strip().split("\n")) - assert output.exit_code == 0 - return session_name - - # Make sure two nodes are alive - wait_for_condition(get_session_name, to_head=True) - session_name_head = get_session_name(to_head=True) - wait_for_condition(get_session_name, to_head=False) - session_name_worker = get_session_name(to_head=False) - assert session_name_head == session_name_worker - print("head killed") - head.kill() - - sleep(2) - - head.restart() - - wait_for_condition(get_session_name, to_head=True) - session_name_head_after_restart = get_session_name(to_head=True) - wait_for_condition(get_session_name, to_head=False) - session_name_worker_after_restart = get_session_name(to_head=False) - assert session_name_worker_after_restart == session_name_head_after_restart - assert session_name_head == session_name_head_after_restart - assert session_name_worker_after_restart == session_name_worker - - -if __name__ == "__main__": - import os - - if os.environ.get("PARALLEL_CI"): - sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) - else: - sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index be73a6e76f4f4..38e351a8a025b 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -234,28 +234,6 @@ def test_ray_init_using_hostname(ray_start_cluster): assert node_table[0].get("NodeManagerHostname", "") == hostname -def test_new_ray_instance_new_session_dir(shutdown_only): - ray.init() - session_dir = ray._private.worker._global_node.get_session_dir_path() - ray.shutdown() - ray.init() - assert ray._private.worker._global_node.get_session_dir_path() != session_dir - - -def test_new_cluster_new_session_dir(ray_start_cluster): - cluster = ray_start_cluster - cluster.add_node() - ray.init(address=cluster.address) - session_dir = ray._private.worker._global_node.get_session_dir_path() - ray.shutdown() - cluster.shutdown() - cluster.add_node() - ray.init(address=cluster.address) - assert ray._private.worker._global_node.get_session_dir_path() != session_dir - ray.shutdown() - cluster.shutdown() - - if __name__ == "__main__": import sys From 5df758fb50c609fe3c430f8be9e6ace89d27d05e Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Sat, 2 Sep 2023 01:19:55 +0900 Subject: [PATCH 36/41] clean up --- python/ray/_private/node.py | 5 ----- python/ray/tests/conftest_docker.py | 6 ------ python/ray/tests/test_gcs_ha_e2e.py | 4 +--- 3 files changed, 1 insertion(+), 14 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 6a1d5492d4d4b..f3aace1da587a 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -953,11 +953,6 @@ def _wait_and_get_for_node_address(self, timeout_s: int = 60) -> str: The node_ip_address of the current session if it finds it within timeout_s. """ - # logger.error(f"Read file from {self.get_session_dir_path()}") - path = Path(self.get_session_dir_path()) - file_names = [f.name for f in path.iterdir() if f.is_file()] - # logger.error(file_names) - for i in range(timeout_s): node_ip_address = self._get_cached_node_ip_address() diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index 293f5a711cdcf..abdef87cbc2ab 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -1,6 +1,4 @@ import time -import os -import datetime import pytest from pytest_docker_tools import container, fetch, network, volume from pytest_docker_tools import wrappers @@ -78,8 +76,6 @@ def print_logs(self): head_node_vol = volume() worker_node_vol = volume() head_node_container_name = "gcs" + str(int(time.time())) -date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f") -session_name = f"session_{date_str}_{os.getpid()}" head_node = container( image="ray_ci:v1", @@ -103,7 +99,6 @@ def print_logs(self): ports={ "8000/tcp": None, }, - timeout=120, # volumes={ # "/tmp/ray/": {"bind": "/tmp/ray/", "mode": "rw"} # }, @@ -129,7 +124,6 @@ def print_logs(self): ports={ "8000/tcp": None, }, - timeout=120, # volumes={ # "/tmp/ray/": {"bind": "/tmp/ray/", "mode": "rw"} # }, diff --git a/python/ray/tests/test_gcs_ha_e2e.py b/python/ray/tests/test_gcs_ha_e2e.py index 96e24eebbd043..0f665d9165e58 100644 --- a/python/ray/tests/test_gcs_ha_e2e.py +++ b/python/ray/tests/test_gcs_ha_e2e.py @@ -5,7 +5,7 @@ from ray.tests.conftest_docker import * # noqa -# @pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") +@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") def test_ray_nodes_liveness(docker_cluster): get_nodes_script = """ import ray @@ -18,8 +18,6 @@ def check_alive(n): output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") text = output.output.decode().strip().split("\n")[-1] print("Output: ", output.output.decode().strip().split("\n")) - # print(worker.logs()) - # print(head.logs()) assert output.exit_code == 0 return n == int(text) From 951e54c87894006f19204444a04c4e2533a25911 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Sat, 2 Sep 2023 01:26:04 +0900 Subject: [PATCH 37/41] Revert "Revert "Wokrs not"" This reverts commit 5a4123777d803bf9787ec526dbae7610ef8e8fe0. --- ...atic-ray-cluster.with-fault-tolerance.yaml | 3 + .../static-ray-cluster-without-kuberay.md | 7 ++- doc/source/ray-core/fault_tolerance/gcs.rst | 17 ++++-- python/ray/_private/node.py | 13 ++++- python/ray/tests/BUILD | 1 + python/ray/tests/conftest_docker.py | 7 ++- python/ray/tests/test_gcs_fault_tolerance.py | 26 +++++++++ python/ray/tests/test_gcs_ha_e2e_2.py | 55 +++++++++++++++++++ python/ray/tests/test_ray_init.py | 22 ++++++++ 9 files changed, 143 insertions(+), 8 deletions(-) create mode 100644 python/ray/tests/test_gcs_ha_e2e_2.py diff --git a/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml b/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml index 84b46c36f5f5f..619fea139e68e 100644 --- a/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml +++ b/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml @@ -148,6 +148,9 @@ spec: # This is used in the ray start command so that Ray can spawn the # correct number of processes. Omitting this may lead to degraded # performance. + - name: RAY_external_storage_namespace + value: # . RAY_external_storage_namespace is used to isolate + # the data stored in Redis. - name: MY_CPU_REQUEST valueFrom: resourceFieldRef: diff --git a/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md b/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md index d5530ecb6a190..db5eedb253b71 100644 --- a/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md +++ b/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md @@ -120,7 +120,12 @@ metadata. One drawback of this approach is that the head node loses the metadata Ray can also write this metadata to an external Redis for reliability and high availability. With this setup, the static Ray cluster can recover from head node crashes and tolerate GCS failures without losing connections to worker nodes. -To use this feature, we need to pass in the `RAY_REDIS_ADDRESS` env var and `--redis-password` in the Ray head node section of [the Kubernetes deployment config file](https://raw.githubusercontent.com/ray-project/ray/master/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml). +To use this feature, we need to pass in the `RAY_REDIS_ADDRESS` env var, `RAY_external_storage_namespace`, and `--redis-password` in the Ray head node section of [the Kubernetes deployment config file](https://raw.githubusercontent.com/ray-project/ray/master/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml). + +`RAY_external_storage_namespace` is used to isolate the data stored in Redis. +This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. +`RAY_external_storage_namespace` must be an unique ID, and whenever you restart a head node, +it should be the same. ## Running Applications on the static Ray Cluster diff --git a/doc/source/ray-core/fault_tolerance/gcs.rst b/doc/source/ray-core/fault_tolerance/gcs.rst index 9f995e518a803..6b6c1026cee13 100644 --- a/doc/source/ray-core/fault_tolerance/gcs.rst +++ b/doc/source/ray-core/fault_tolerance/gcs.rst @@ -34,19 +34,29 @@ Setting up Redis set the OS environment ``RAY_REDIS_ADDRESS`` to the Redis address, and supply the ``--redis-password`` flag with the password when calling ``ray start``: + You should also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis. + This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. + ``RAY_external_storage_namespace`` must be an unique ID, and whenever you restart a head node, + it should be the same. + .. code-block:: shell - RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD + RAY_external_storage_namespace= RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD .. tab-item:: ray up If you are using :ref:`ray up ` to start the Ray cluster, change :ref:`head_start_ray_commands ` field to add ``RAY_REDIS_ADDRESS`` and ``--redis-password`` to the ``ray start`` command: + You should also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis. + This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. + ``RAY_external_storage_namespace`` must be an unique ID, and whenever you restart a head node, + it should be the same. + .. code-block:: yaml head_start_ray_commands: - ray stop - - ulimit -n 65536; RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0 + - ulimit -n 65536; RAY_external_storage_namespace= RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0 .. tab-item:: Kubernetes @@ -60,9 +70,6 @@ If the raylet fails to reconnect to the GCS for more than 60 seconds, the raylet will exit and the corresponding node fails. This timeout threshold can be tuned by the OS environment variable ``RAY_gcs_rpc_server_reconnect_timeout_s``. -You can also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis. -This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. - If the IP address of GCS will change after restarts, it's better to use a qualified domain name and pass it to all raylets at start time. Raylet will resolve the domain name and connect to the correct GCS. You need to ensure that at any time, only one GCS is alive. diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index f3aace1da587a..20c21d34aec50 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -97,6 +97,16 @@ def __init__( [primary_redis_ip, port] = external_redis[0].rsplit(":", 1) ray_params.external_addresses = external_redis ray_params.num_redis_shards = len(external_redis) - 1 + storage_namespace = os.environ.get("RAY_external_storage_namespace") + if storage_namespace is None: + raise ValueError( + "RAY_external_storage_namespace must be provided " + "when using Ray with external Redis for the fault tolerance. " + "RAY_external_storage_namespace must be an unique ID " + "and has to be the same across the same head node." + ) + if ray_params.session_name is None: + ray_params.update_if_absent(session_name=f"session_{storage_namespace}") if ( ray_params._system_config @@ -1299,9 +1309,10 @@ def _write_cluster_info_to_kv(self): self.get_gcs_client().internal_kv_put( b"session_name", self._session_name.encode(), - True, + False, ray_constants.KV_NAMESPACE_SESSION, ) + self.get_gcs_client().internal_kv_put( b"session_dir", self._session_dir.encode(), diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 3f589089f83eb..e25ddee607c52 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -224,6 +224,7 @@ py_test_module_list( py_test_module_list( files = [ "test_gcs_ha_e2e.py", + "test_gcs_ha_e2e_2.py", "test_memory_pressure.py", "test_node_labels.py", ], diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index abdef87cbc2ab..d9baa91d8f9b9 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -1,4 +1,5 @@ import time +import uuid import pytest from pytest_docker_tools import container, fetch, network, volume from pytest_docker_tools import wrappers @@ -76,6 +77,7 @@ def print_logs(self): head_node_vol = volume() worker_node_vol = volume() head_node_container_name = "gcs" + str(int(time.time())) +external_storage_namespace = str(uuid.uuid4()) head_node = container( image="ray_ci:v1", @@ -94,7 +96,10 @@ def print_logs(self): "9379", ], volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, - environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, + environment={ + "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", + "RAY_external_storage_namespace": external_storage_namespace, + }, wrapper_class=Container, ports={ "8000/tcp": None, diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index dea003d5631ef..99abe129cf35b 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -917,6 +917,32 @@ def check_raylet_healthy(): wait_for_condition(lambda: not check_raylet_healthy()) +@pytest.mark.parametrize( + "ray_start_regular_with_external_redis", + [ + generate_system_config_map( + gcs_failover_worker_reconnect_timeout=20, + gcs_rpc_server_reconnect_timeout_s=60, + gcs_server_request_timeout_seconds=10, + raylet_liveness_self_check_interval_ms=3000, + ) + ], + indirect=True, +) +def test_session_dir_preserved(ray_start_regular_with_external_redis): + storage_namespace = os.getenv("RAY_external_storage_namespace") + session_name = ray._private.worker._global_node.session_name + assert session_name == f"session_{storage_namespace}" + ray._private.worker._global_node.kill_gcs_server() + # Start GCS + ray._private.worker._global_node.start_gcs_server() + ray.shutdown() + ray.init() + storage_namespace = os.getenv("RAY_external_storage_namespace") + session_name = ray._private.worker._global_node.session_name + assert session_name == f"session_{storage_namespace}" + + if __name__ == "__main__": import pytest diff --git a/python/ray/tests/test_gcs_ha_e2e_2.py b/python/ray/tests/test_gcs_ha_e2e_2.py new file mode 100644 index 0000000000000..380f527be1719 --- /dev/null +++ b/python/ray/tests/test_gcs_ha_e2e_2.py @@ -0,0 +1,55 @@ +import pytest +import sys +from time import sleep +from ray._private.test_utils import wait_for_condition +from ray.tests.conftest_docker import * # noqa + + +@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") +def test_ray_session_name_preserved(docker_cluster): + get_nodes_script = """ +import ray +ray.init("auto") +print(ray._private.worker._global_node.session_name) +""" + head, worker = docker_cluster + + def get_session_name(to_head=True): + if to_head: + output = head.exec_run(cmd=f"python -c '{get_nodes_script}'") + else: + output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") + session_name = output.output.decode().strip().split("\n")[-1] + print("Output: ", output.output.decode().strip().split("\n")) + assert output.exit_code == 0 + return session_name + + # Make sure two nodes are alive + wait_for_condition(get_session_name, to_head=True) + session_name_head = get_session_name(to_head=True) + wait_for_condition(get_session_name, to_head=False) + session_name_worker = get_session_name(to_head=False) + assert session_name_head == session_name_worker + print("head killed") + head.kill() + + sleep(2) + + head.restart() + + wait_for_condition(get_session_name, to_head=True) + session_name_head_after_restart = get_session_name(to_head=True) + wait_for_condition(get_session_name, to_head=False) + session_name_worker_after_restart = get_session_name(to_head=False) + assert session_name_worker_after_restart == session_name_head_after_restart + assert session_name_head == session_name_head_after_restart + assert session_name_worker_after_restart == session_name_worker + + +if __name__ == "__main__": + import os + + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index 38e351a8a025b..be73a6e76f4f4 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -234,6 +234,28 @@ def test_ray_init_using_hostname(ray_start_cluster): assert node_table[0].get("NodeManagerHostname", "") == hostname +def test_new_ray_instance_new_session_dir(shutdown_only): + ray.init() + session_dir = ray._private.worker._global_node.get_session_dir_path() + ray.shutdown() + ray.init() + assert ray._private.worker._global_node.get_session_dir_path() != session_dir + + +def test_new_cluster_new_session_dir(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node() + ray.init(address=cluster.address) + session_dir = ray._private.worker._global_node.get_session_dir_path() + ray.shutdown() + cluster.shutdown() + cluster.add_node() + ray.init(address=cluster.address) + assert ray._private.worker._global_node.get_session_dir_path() != session_dir + ray.shutdown() + cluster.shutdown() + + if __name__ == "__main__": import sys From 32e65fcaf7a79fab38c484e17b04bda54a1aae7f Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Sat, 2 Sep 2023 01:26:08 +0900 Subject: [PATCH 38/41] Revert "Revert "Fix failed ha tests."" This reverts commit b3d590ab60e4487cfda87ce5a1295c8122e6663b. --- python/ray/_private/node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 20c21d34aec50..560faf960e309 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -98,7 +98,7 @@ def __init__( ray_params.external_addresses = external_redis ray_params.num_redis_shards = len(external_redis) - 1 storage_namespace = os.environ.get("RAY_external_storage_namespace") - if storage_namespace is None: + if head and storage_namespace is None: raise ValueError( "RAY_external_storage_namespace must be provided " "when using Ray with external Redis for the fault tolerance. " From ec0591f028eaf74a3d24a4bfa37f60df1a47a125 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Sat, 2 Sep 2023 01:26:09 +0900 Subject: [PATCH 39/41] Revert "Revert "fix some tests."" This reverts commit 0e73a7e6b8e8ee1b325677f84f4bc99f3fb0b42e. --- python/ray/_private/node.py | 3 ++- python/ray/tests/test_advanced_9.py | 2 ++ python/ray/tests/test_multi_node_3.py | 3 +++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 560faf960e309..c54f9523c9730 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -101,7 +101,8 @@ def __init__( if head and storage_namespace is None: raise ValueError( "RAY_external_storage_namespace must be provided " - "when using Ray with external Redis for the fault tolerance. " + "when using Ray with external Redis via `RAY_REDIS_ADDRESS` " + " for the fault tolerance. " "RAY_external_storage_namespace must be an unique ID " "and has to be the same across the same head node." ) diff --git a/python/ray/tests/test_advanced_9.py b/python/ray/tests/test_advanced_9.py index a4ba35d1756d9..2b06d039fe6b7 100644 --- a/python/ray/tests/test_advanced_9.py +++ b/python/ray/tests/test_advanced_9.py @@ -1,5 +1,6 @@ import sys import time +import uuid import pytest @@ -357,6 +358,7 @@ def check_demands(n): def test_redis_not_available(monkeypatch, call_ray_stop_only): monkeypatch.setenv("RAY_NUM_REDIS_GET_RETRIES", "2") monkeypatch.setenv("RAY_REDIS_ADDRESS", "localhost:12345") + monkeypatch.setenv("RAY_external_storage_namespace", str(uuid.uuid4())) p = subprocess.run( "ray start --head", shell=True, diff --git a/python/ray/tests/test_multi_node_3.py b/python/ray/tests/test_multi_node_3.py index 7f602ed863dc4..7dec623e0373e 100644 --- a/python/ray/tests/test_multi_node_3.py +++ b/python/ray/tests/test_multi_node_3.py @@ -2,6 +2,7 @@ import os import subprocess import sys +import uuid from pathlib import Path import psutil @@ -135,10 +136,12 @@ def test_calling_start_ray_head(call_ray_stop_only): temp_dir, 8888, password=ray_constants.REDIS_DEFAULT_PASSWORD ) os.environ["RAY_REDIS_ADDRESS"] = "127.0.0.1:8888" + os.environ["RAY_external_storage_namespace"] = str(uuid.uuid4()) check_call_ray(["start", "--head"]) check_call_ray(["stop"]) proc.process.terminate() del os.environ["RAY_REDIS_ADDRESS"] + del os.environ["RAY_external_storage_namespace"] # Test --block. Killing a child process should cause the command to exit. blocked = subprocess.Popen(["ray", "start", "--head", "--block", "--port", "0"]) From 6734082899c6c115139d55282faaa88a076cec4b Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Sat, 2 Sep 2023 01:26:10 +0900 Subject: [PATCH 40/41] Revert "Revert "done"" This reverts commit e32d53820e6614018fc740ac6d0f865ceee37ad3. --- python/ray/_private/node.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index c54f9523c9730..eafa4e38975fc 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -98,16 +98,19 @@ def __init__( ray_params.external_addresses = external_redis ray_params.num_redis_shards = len(external_redis) - 1 storage_namespace = os.environ.get("RAY_external_storage_namespace") - if head and storage_namespace is None: - raise ValueError( - "RAY_external_storage_namespace must be provided " - "when using Ray with external Redis via `RAY_REDIS_ADDRESS` " - " for the fault tolerance. " - "RAY_external_storage_namespace must be an unique ID " - "and has to be the same across the same head node." - ) - if ray_params.session_name is None: - ray_params.update_if_absent(session_name=f"session_{storage_namespace}") + if head: + if storage_namespace is None: + raise ValueError( + "RAY_external_storage_namespace must be provided " + "when using Ray with external Redis via `RAY_REDIS_ADDRESS` " + " for the fault tolerance. " + "RAY_external_storage_namespace must be an unique ID " + "and has to be the same across the same head node." + ) + # Update the session name to be RAY_external_storage_namespace. + if ray_params.session_name is None: + ray_params.update_if_absent( + session_name=f"session_{storage_namespace}") if ( ray_params._system_config From 672d996c4fdca2aaf9532a91b3b3807a8fd780e5 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Sat, 2 Sep 2023 01:26:10 +0900 Subject: [PATCH 41/41] Revert "Revert "maybe working?"" This reverts commit 9330cc61b657065d1c5113643b0f682e95089ed7. --- python/ray/tests/conftest_docker.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index d9baa91d8f9b9..aefb39c39fa7c 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -99,6 +99,8 @@ def print_logs(self): environment={ "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", "RAY_external_storage_namespace": external_storage_namespace, + "RAY_raylet_client_num_connect_attempts": "10", + "RAY_raylet_client_connect_timeout_milliseconds": "100", }, wrapper_class=Container, ports={ @@ -124,7 +126,11 @@ def print_logs(self): "9379", ], volumes={"{worker_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, - environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, + environment={ + "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", + "RAY_raylet_client_num_connect_attempts": "10", + "RAY_raylet_client_connect_timeout_milliseconds": "100", + }, wrapper_class=Container, ports={ "8000/tcp": None,