From 748ddf8791d9284209305e492716f5ea60fe0c46 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 1 Sep 2023 16:48:46 +0900 Subject: [PATCH] Wokrs not --- ...atic-ray-cluster.with-fault-tolerance.yaml | 3 + .../static-ray-cluster-without-kuberay.md | 7 ++- doc/source/ray-core/fault_tolerance/gcs.rst | 17 ++++-- python/ray/_private/node.py | 13 ++++- python/ray/tests/BUILD | 1 + python/ray/tests/conftest_docker.py | 11 ++-- python/ray/tests/test_gcs_fault_tolerance.py | 26 +++++++++ python/ray/tests/test_gcs_ha_e2e.py | 4 +- python/ray/tests/test_gcs_ha_e2e_2.py | 55 +++++++++++++++++++ python/ray/tests/test_ray_init.py | 22 ++++++++ 10 files changed, 144 insertions(+), 15 deletions(-) create mode 100644 python/ray/tests/test_gcs_ha_e2e_2.py diff --git a/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml b/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml index 84b46c36f5f5f..619fea139e68e 100644 --- a/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml +++ b/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml @@ -148,6 +148,9 @@ spec: # This is used in the ray start command so that Ray can spawn the # correct number of processes. Omitting this may lead to degraded # performance. + - name: RAY_external_storage_namespace + value: # . RAY_external_storage_namespace is used to isolate + # the data stored in Redis. - name: MY_CPU_REQUEST valueFrom: resourceFieldRef: diff --git a/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md b/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md index d5530ecb6a190..db5eedb253b71 100644 --- a/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md +++ b/doc/source/cluster/kubernetes/user-guides/static-ray-cluster-without-kuberay.md @@ -120,7 +120,12 @@ metadata. One drawback of this approach is that the head node loses the metadata Ray can also write this metadata to an external Redis for reliability and high availability. With this setup, the static Ray cluster can recover from head node crashes and tolerate GCS failures without losing connections to worker nodes. -To use this feature, we need to pass in the `RAY_REDIS_ADDRESS` env var and `--redis-password` in the Ray head node section of [the Kubernetes deployment config file](https://raw.githubusercontent.com/ray-project/ray/master/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml). +To use this feature, we need to pass in the `RAY_REDIS_ADDRESS` env var, `RAY_external_storage_namespace`, and `--redis-password` in the Ray head node section of [the Kubernetes deployment config file](https://raw.githubusercontent.com/ray-project/ray/master/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml). + +`RAY_external_storage_namespace` is used to isolate the data stored in Redis. +This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. +`RAY_external_storage_namespace` must be an unique ID, and whenever you restart a head node, +it should be the same. ## Running Applications on the static Ray Cluster diff --git a/doc/source/ray-core/fault_tolerance/gcs.rst b/doc/source/ray-core/fault_tolerance/gcs.rst index 9f995e518a803..6b6c1026cee13 100644 --- a/doc/source/ray-core/fault_tolerance/gcs.rst +++ b/doc/source/ray-core/fault_tolerance/gcs.rst @@ -34,19 +34,29 @@ Setting up Redis set the OS environment ``RAY_REDIS_ADDRESS`` to the Redis address, and supply the ``--redis-password`` flag with the password when calling ``ray start``: + You should also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis. + This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. + ``RAY_external_storage_namespace`` must be an unique ID, and whenever you restart a head node, + it should be the same. + .. code-block:: shell - RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD + RAY_external_storage_namespace= RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD .. tab-item:: ray up If you are using :ref:`ray up ` to start the Ray cluster, change :ref:`head_start_ray_commands ` field to add ``RAY_REDIS_ADDRESS`` and ``--redis-password`` to the ``ray start`` command: + You should also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis. + This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. + ``RAY_external_storage_namespace`` must be an unique ID, and whenever you restart a head node, + it should be the same. + .. code-block:: yaml head_start_ray_commands: - ray stop - - ulimit -n 65536; RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0 + - ulimit -n 65536; RAY_external_storage_namespace= RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0 .. tab-item:: Kubernetes @@ -60,9 +70,6 @@ If the raylet fails to reconnect to the GCS for more than 60 seconds, the raylet will exit and the corresponding node fails. This timeout threshold can be tuned by the OS environment variable ``RAY_gcs_rpc_server_reconnect_timeout_s``. -You can also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis. -This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance. - If the IP address of GCS will change after restarts, it's better to use a qualified domain name and pass it to all raylets at start time. Raylet will resolve the domain name and connect to the correct GCS. You need to ensure that at any time, only one GCS is alive. diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 6a1d5492d4d4b..1974a8665826d 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -97,6 +97,16 @@ def __init__( [primary_redis_ip, port] = external_redis[0].rsplit(":", 1) ray_params.external_addresses = external_redis ray_params.num_redis_shards = len(external_redis) - 1 + storage_namespace = os.environ.get("RAY_external_storage_namespace") + if storage_namespace is None: + raise ValueError( + "RAY_external_storage_namespace must be provided " + "when using Ray with external Redis for the fault tolerance. " + "RAY_external_storage_namespace must be an unique ID " + "and has to be the same across the same head node." + ) + if ray_params.session_name is None: + ray_params.update_if_absent(session_name=f"session_{storage_namespace}") if ( ray_params._system_config @@ -1304,9 +1314,10 @@ def _write_cluster_info_to_kv(self): self.get_gcs_client().internal_kv_put( b"session_name", self._session_name.encode(), - True, + False, ray_constants.KV_NAMESPACE_SESSION, ) + self.get_gcs_client().internal_kv_put( b"session_dir", self._session_dir.encode(), diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 3f589089f83eb..e25ddee607c52 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -224,6 +224,7 @@ py_test_module_list( py_test_module_list( files = [ "test_gcs_ha_e2e.py", + "test_gcs_ha_e2e_2.py", "test_memory_pressure.py", "test_node_labels.py", ], diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index 293f5a711cdcf..b1ef48111bd7c 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -1,6 +1,5 @@ import time -import os -import datetime +import uuid import pytest from pytest_docker_tools import container, fetch, network, volume from pytest_docker_tools import wrappers @@ -78,8 +77,7 @@ def print_logs(self): head_node_vol = volume() worker_node_vol = volume() head_node_container_name = "gcs" + str(int(time.time())) -date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f") -session_name = f"session_{date_str}_{os.getpid()}" +external_storage_namespace = str(uuid.uuid4()) head_node = container( image="ray_ci:v1", @@ -98,7 +96,10 @@ def print_logs(self): "9379", ], volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, - environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, + environment={ + "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", + "RAY_external_storage_namespace": external_storage_namespace, + }, wrapper_class=Container, ports={ "8000/tcp": None, diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index dea003d5631ef..99abe129cf35b 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -917,6 +917,32 @@ def check_raylet_healthy(): wait_for_condition(lambda: not check_raylet_healthy()) +@pytest.mark.parametrize( + "ray_start_regular_with_external_redis", + [ + generate_system_config_map( + gcs_failover_worker_reconnect_timeout=20, + gcs_rpc_server_reconnect_timeout_s=60, + gcs_server_request_timeout_seconds=10, + raylet_liveness_self_check_interval_ms=3000, + ) + ], + indirect=True, +) +def test_session_dir_preserved(ray_start_regular_with_external_redis): + storage_namespace = os.getenv("RAY_external_storage_namespace") + session_name = ray._private.worker._global_node.session_name + assert session_name == f"session_{storage_namespace}" + ray._private.worker._global_node.kill_gcs_server() + # Start GCS + ray._private.worker._global_node.start_gcs_server() + ray.shutdown() + ray.init() + storage_namespace = os.getenv("RAY_external_storage_namespace") + session_name = ray._private.worker._global_node.session_name + assert session_name == f"session_{storage_namespace}" + + if __name__ == "__main__": import pytest diff --git a/python/ray/tests/test_gcs_ha_e2e.py b/python/ray/tests/test_gcs_ha_e2e.py index 96e24eebbd043..0f665d9165e58 100644 --- a/python/ray/tests/test_gcs_ha_e2e.py +++ b/python/ray/tests/test_gcs_ha_e2e.py @@ -5,7 +5,7 @@ from ray.tests.conftest_docker import * # noqa -# @pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") +@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") def test_ray_nodes_liveness(docker_cluster): get_nodes_script = """ import ray @@ -18,8 +18,6 @@ def check_alive(n): output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") text = output.output.decode().strip().split("\n")[-1] print("Output: ", output.output.decode().strip().split("\n")) - # print(worker.logs()) - # print(head.logs()) assert output.exit_code == 0 return n == int(text) diff --git a/python/ray/tests/test_gcs_ha_e2e_2.py b/python/ray/tests/test_gcs_ha_e2e_2.py new file mode 100644 index 0000000000000..380f527be1719 --- /dev/null +++ b/python/ray/tests/test_gcs_ha_e2e_2.py @@ -0,0 +1,55 @@ +import pytest +import sys +from time import sleep +from ray._private.test_utils import wait_for_condition +from ray.tests.conftest_docker import * # noqa + + +@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") +def test_ray_session_name_preserved(docker_cluster): + get_nodes_script = """ +import ray +ray.init("auto") +print(ray._private.worker._global_node.session_name) +""" + head, worker = docker_cluster + + def get_session_name(to_head=True): + if to_head: + output = head.exec_run(cmd=f"python -c '{get_nodes_script}'") + else: + output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") + session_name = output.output.decode().strip().split("\n")[-1] + print("Output: ", output.output.decode().strip().split("\n")) + assert output.exit_code == 0 + return session_name + + # Make sure two nodes are alive + wait_for_condition(get_session_name, to_head=True) + session_name_head = get_session_name(to_head=True) + wait_for_condition(get_session_name, to_head=False) + session_name_worker = get_session_name(to_head=False) + assert session_name_head == session_name_worker + print("head killed") + head.kill() + + sleep(2) + + head.restart() + + wait_for_condition(get_session_name, to_head=True) + session_name_head_after_restart = get_session_name(to_head=True) + wait_for_condition(get_session_name, to_head=False) + session_name_worker_after_restart = get_session_name(to_head=False) + assert session_name_worker_after_restart == session_name_head_after_restart + assert session_name_head == session_name_head_after_restart + assert session_name_worker_after_restart == session_name_worker + + +if __name__ == "__main__": + import os + + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index 38e351a8a025b..be73a6e76f4f4 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -234,6 +234,28 @@ def test_ray_init_using_hostname(ray_start_cluster): assert node_table[0].get("NodeManagerHostname", "") == hostname +def test_new_ray_instance_new_session_dir(shutdown_only): + ray.init() + session_dir = ray._private.worker._global_node.get_session_dir_path() + ray.shutdown() + ray.init() + assert ray._private.worker._global_node.get_session_dir_path() != session_dir + + +def test_new_cluster_new_session_dir(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node() + ray.init(address=cluster.address) + session_dir = ray._private.worker._global_node.get_session_dir_path() + ray.shutdown() + cluster.shutdown() + cluster.add_node() + ray.init(address=cluster.address) + assert ray._private.worker._global_node.get_session_dir_path() != session_dir + ray.shutdown() + cluster.shutdown() + + if __name__ == "__main__": import sys