Skip to content

Commit

Permalink
Wokrs not
Browse files Browse the repository at this point in the history
  • Loading branch information
rkooo567 committed Sep 1, 2023
1 parent 484c68a commit 748ddf8
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ spec:
# This is used in the ray start command so that Ray can spawn the
# correct number of processes. Omitting this may lead to degraded
# performance.
- name: RAY_external_storage_namespace
value: # <Unique ID>. RAY_external_storage_namespace is used to isolate
# the data stored in Redis.
- name: MY_CPU_REQUEST
valueFrom:
resourceFieldRef:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ metadata. One drawback of this approach is that the head node loses the metadata
Ray can also write this metadata to an external Redis for reliability and high availability.
With this setup, the static Ray cluster can recover from head node crashes and tolerate GCS failures without losing connections to worker nodes.

To use this feature, we need to pass in the `RAY_REDIS_ADDRESS` env var and `--redis-password` in the Ray head node section of [the Kubernetes deployment config file](https://raw.githubusercontent.com/ray-project/ray/master/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml).
To use this feature, we need to pass in the `RAY_REDIS_ADDRESS` env var, `RAY_external_storage_namespace`, and `--redis-password` in the Ray head node section of [the Kubernetes deployment config file](https://raw.githubusercontent.com/ray-project/ray/master/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml).

`RAY_external_storage_namespace` is used to isolate the data stored in Redis.
This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance.
`RAY_external_storage_namespace` must be an unique ID, and whenever you restart a head node,
it should be the same.

## Running Applications on the static Ray Cluster

Expand Down
17 changes: 12 additions & 5 deletions doc/source/ray-core/fault_tolerance/gcs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,29 @@ Setting up Redis
set the OS environment ``RAY_REDIS_ADDRESS`` to
the Redis address, and supply the ``--redis-password`` flag with the password when calling ``ray start``:

You should also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis.
This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance.
``RAY_external_storage_namespace`` must be an unique ID, and whenever you restart a head node,
it should be the same.

.. code-block:: shell
RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD
RAY_external_storage_namespace=<unique_id> RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD
.. tab-item:: ray up

If you are using :ref:`ray up <ray-up-doc>` to start the Ray cluster, change :ref:`head_start_ray_commands <cluster-configuration-head-start-ray-commands>` field to add ``RAY_REDIS_ADDRESS`` and ``--redis-password`` to the ``ray start`` command:

You should also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis.
This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance.
``RAY_external_storage_namespace`` must be an unique ID, and whenever you restart a head node,
it should be the same.

.. code-block:: yaml
head_start_ray_commands:
- ray stop
- ulimit -n 65536; RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0
- ulimit -n 65536; RAY_external_storage_namespace=<unique_id> RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0
.. tab-item:: Kubernetes

Expand All @@ -60,9 +70,6 @@ If the raylet fails to reconnect to the GCS for more than 60 seconds,
the raylet will exit and the corresponding node fails.
This timeout threshold can be tuned by the OS environment variable ``RAY_gcs_rpc_server_reconnect_timeout_s``.

You can also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis.
This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance.

If the IP address of GCS will change after restarts, it's better to use a qualified domain name
and pass it to all raylets at start time. Raylet will resolve the domain name and connect to
the correct GCS. You need to ensure that at any time, only one GCS is alive.
Expand Down
13 changes: 12 additions & 1 deletion python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ def __init__(
[primary_redis_ip, port] = external_redis[0].rsplit(":", 1)
ray_params.external_addresses = external_redis
ray_params.num_redis_shards = len(external_redis) - 1
storage_namespace = os.environ.get("RAY_external_storage_namespace")
if storage_namespace is None:
raise ValueError(
"RAY_external_storage_namespace must be provided "
"when using Ray with external Redis for the fault tolerance. "
"RAY_external_storage_namespace must be an unique ID "
"and has to be the same across the same head node."
)
if ray_params.session_name is None:
ray_params.update_if_absent(session_name=f"session_{storage_namespace}")

if (
ray_params._system_config
Expand Down Expand Up @@ -1304,9 +1314,10 @@ def _write_cluster_info_to_kv(self):
self.get_gcs_client().internal_kv_put(
b"session_name",
self._session_name.encode(),
True,
False,
ray_constants.KV_NAMESPACE_SESSION,
)

self.get_gcs_client().internal_kv_put(
b"session_dir",
self._session_dir.encode(),
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ py_test_module_list(
py_test_module_list(
files = [
"test_gcs_ha_e2e.py",
"test_gcs_ha_e2e_2.py",
"test_memory_pressure.py",
"test_node_labels.py",
],
Expand Down
11 changes: 6 additions & 5 deletions python/ray/tests/conftest_docker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import time
import os
import datetime
import uuid
import pytest
from pytest_docker_tools import container, fetch, network, volume
from pytest_docker_tools import wrappers
Expand Down Expand Up @@ -78,8 +77,7 @@ def print_logs(self):
head_node_vol = volume()
worker_node_vol = volume()
head_node_container_name = "gcs" + str(int(time.time()))
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f")
session_name = f"session_{date_str}_{os.getpid()}"
external_storage_namespace = str(uuid.uuid4())

head_node = container(
image="ray_ci:v1",
Expand All @@ -98,7 +96,10 @@ def print_logs(self):
"9379",
],
volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}},
environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"},
environment={
"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379",
"RAY_external_storage_namespace": external_storage_namespace,
},
wrapper_class=Container,
ports={
"8000/tcp": None,
Expand Down
26 changes: 26 additions & 0 deletions python/ray/tests/test_gcs_fault_tolerance.py
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,32 @@ def check_raylet_healthy():
wait_for_condition(lambda: not check_raylet_healthy())


@pytest.mark.parametrize(
"ray_start_regular_with_external_redis",
[
generate_system_config_map(
gcs_failover_worker_reconnect_timeout=20,
gcs_rpc_server_reconnect_timeout_s=60,
gcs_server_request_timeout_seconds=10,
raylet_liveness_self_check_interval_ms=3000,
)
],
indirect=True,
)
def test_session_dir_preserved(ray_start_regular_with_external_redis):
storage_namespace = os.getenv("RAY_external_storage_namespace")
session_name = ray._private.worker._global_node.session_name
assert session_name == f"session_{storage_namespace}"
ray._private.worker._global_node.kill_gcs_server()
# Start GCS
ray._private.worker._global_node.start_gcs_server()
ray.shutdown()
ray.init()
storage_namespace = os.getenv("RAY_external_storage_namespace")
session_name = ray._private.worker._global_node.session_name
assert session_name == f"session_{storage_namespace}"


if __name__ == "__main__":

import pytest
Expand Down
4 changes: 1 addition & 3 deletions python/ray/tests/test_gcs_ha_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from ray.tests.conftest_docker import * # noqa


# @pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.")
@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.")
def test_ray_nodes_liveness(docker_cluster):
get_nodes_script = """
import ray
Expand All @@ -18,8 +18,6 @@ def check_alive(n):
output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'")
text = output.output.decode().strip().split("\n")[-1]
print("Output: ", output.output.decode().strip().split("\n"))
# print(worker.logs())
# print(head.logs())
assert output.exit_code == 0
return n == int(text)

Expand Down
55 changes: 55 additions & 0 deletions python/ray/tests/test_gcs_ha_e2e_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import pytest
import sys
from time import sleep
from ray._private.test_utils import wait_for_condition
from ray.tests.conftest_docker import * # noqa


@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.")
def test_ray_session_name_preserved(docker_cluster):
get_nodes_script = """
import ray
ray.init("auto")
print(ray._private.worker._global_node.session_name)
"""
head, worker = docker_cluster

def get_session_name(to_head=True):
if to_head:
output = head.exec_run(cmd=f"python -c '{get_nodes_script}'")
else:
output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'")
session_name = output.output.decode().strip().split("\n")[-1]
print("Output: ", output.output.decode().strip().split("\n"))
assert output.exit_code == 0
return session_name

# Make sure two nodes are alive
wait_for_condition(get_session_name, to_head=True)
session_name_head = get_session_name(to_head=True)
wait_for_condition(get_session_name, to_head=False)
session_name_worker = get_session_name(to_head=False)
assert session_name_head == session_name_worker
print("head killed")
head.kill()

sleep(2)

head.restart()

wait_for_condition(get_session_name, to_head=True)
session_name_head_after_restart = get_session_name(to_head=True)
wait_for_condition(get_session_name, to_head=False)
session_name_worker_after_restart = get_session_name(to_head=False)
assert session_name_worker_after_restart == session_name_head_after_restart
assert session_name_head == session_name_head_after_restart
assert session_name_worker_after_restart == session_name_worker


if __name__ == "__main__":
import os

if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
else:
sys.exit(pytest.main(["-sv", __file__]))
22 changes: 22 additions & 0 deletions python/ray/tests/test_ray_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,28 @@ def test_ray_init_using_hostname(ray_start_cluster):
assert node_table[0].get("NodeManagerHostname", "") == hostname


def test_new_ray_instance_new_session_dir(shutdown_only):
ray.init()
session_dir = ray._private.worker._global_node.get_session_dir_path()
ray.shutdown()
ray.init()
assert ray._private.worker._global_node.get_session_dir_path() != session_dir


def test_new_cluster_new_session_dir(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node()
ray.init(address=cluster.address)
session_dir = ray._private.worker._global_node.get_session_dir_path()
ray.shutdown()
cluster.shutdown()
cluster.add_node()
ray.init(address=cluster.address)
assert ray._private.worker._global_node.get_session_dir_path() != session_dir
ray.shutdown()
cluster.shutdown()


if __name__ == "__main__":
import sys

Expand Down

0 comments on commit 748ddf8

Please sign in to comment.