[Core] Fix session_name not reused when GCS restarts + node ip address not set for driver #39211

Closed
wants to merge 46 commits into from
46 commits
d2f802a
ip
rkooo567 Jul 20, 2023
a391f66
ip
rkooo567 Jul 21, 2023
3966778
working now.
rkooo567 Jul 21, 2023
992d7ab
working + lint
rkooo567 Jul 21, 2023
8083748
Fix
rkooo567 Jul 21, 2023
73bdd49
Merge branch 'master' into automatically-set-node-ip-addr
rkooo567 Aug 10, 2023
a25aa34
ip
rkooo567 Aug 10, 2023
fa2dc4c
Merge branch 'master' into automatically-set-node-ip-addr
rkooo567 Aug 22, 2023
db2af38
ip
rkooo567 Aug 22, 2023
8476e4f
ip
rkooo567 Aug 22, 2023
ba69b1b
Made it work.
rkooo567 Aug 22, 2023
4a0dd44
working
rkooo567 Aug 22, 2023
f6ee80e
Fixed a broken test.
rkooo567 Aug 22, 2023
023b3a4
Merge branch 'master' into automatically-set-node-ip-addr
rkooo567 Aug 22, 2023
dba0008
Fixed the test failure.
rkooo567 Aug 22, 2023
77b933c
print error messages before assertion
rkooo567 Aug 23, 2023
87ea063
Merge branch 'master' into automatically-set-node-ip-addr
rkooo567 Aug 23, 2023
302ecd8
ip
rkooo567 Aug 23, 2023
cdbf084
more info for debugging.
rkooo567 Aug 23, 2023
b9aedbb
Merge branch 'master' into automatically-set-node-ip-addr
rkooo567 Aug 31, 2023
55979f0
ip
rkooo567 Aug 31, 2023
4dbb1af
ip
rkooo567 Aug 31, 2023
32de899
ip
rkooo567 Aug 31, 2023
632442a
ip
rkooo567 Aug 31, 2023
1705ec4
remove bind
rkooo567 Aug 31, 2023
cbff14f
try fixing it.
rkooo567 Aug 31, 2023
3107619
remove print
rkooo567 Aug 31, 2023
bb8e1f6
Work around.
rkooo567 Aug 31, 2023
eeb3610
.
rkooo567 Aug 31, 2023
484c68a
Revert
rkooo567 Sep 1, 2023
748ddf8
Wokrs not
rkooo567 Sep 1, 2023
f35a165
Fix failed ha tests.
rkooo567 Sep 1, 2023
01e492a
fix some tests.
rkooo567 Sep 1, 2023
8a14bcc
done
rkooo567 Sep 1, 2023
cd2b44d
maybe working?
rkooo567 Sep 1, 2023
9330cc6
Revert "maybe working?"
rkooo567 Sep 1, 2023
e32d538
Revert "done"
rkooo567 Sep 1, 2023
0e73a7e
Revert "fix some tests."
rkooo567 Sep 1, 2023
b3d590a
Revert "Fix failed ha tests."
rkooo567 Sep 1, 2023
5a41237
Revert "Wokrs not"
rkooo567 Sep 1, 2023
5df758f
clean up
rkooo567 Sep 1, 2023
951e54c
Revert "Revert "Wokrs not""
rkooo567 Sep 1, 2023
32e65fc
Revert "Revert "Fix failed ha tests.""
rkooo567 Sep 1, 2023
ec0591f
Revert "Revert "fix some tests.""
rkooo567 Sep 1, 2023
6734082
Revert "Revert "done""
rkooo567 Sep 1, 2023
672d996
Revert "Revert "maybe working?""
rkooo567 Sep 1, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ spec:
# This is used in the ray start command so that Ray can spawn the
# correct number of processes. Omitting this may lead to degraded
# performance.
- name: RAY_external_storage_namespace
value: # <Unique ID>. RAY_external_storage_namespace is used to isolate
# the data stored in Redis.
- name: MY_CPU_REQUEST
valueFrom:
resourceFieldRef:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ metadata. One drawback of this approach is that the head node loses the metadata
Ray can also write this metadata to an external Redis for reliability and high availability.
With this setup, the static Ray cluster can recover from head node crashes and tolerate GCS failures without losing connections to worker nodes.

To use this feature, we need to pass in the `RAY_REDIS_ADDRESS` env var and `--redis-password` in the Ray head node section of [the Kubernetes deployment config file](https://raw.githubusercontent.com/ray-project/ray/master/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml).
To use this feature, we need to pass in the `RAY_REDIS_ADDRESS` env var, `RAY_external_storage_namespace`, and `--redis-password` in the Ray head node section of [the Kubernetes deployment config file](https://raw.githubusercontent.com/ray-project/ray/master/doc/source/cluster/kubernetes/configs/static-ray-cluster.with-fault-tolerance.yaml).

`RAY_external_storage_namespace` is used to isolate the data stored in Redis.
This ensures that there are no data conflicts if multiple Ray clusters share the same Redis instance.
`RAY_external_storage_namespace` must be a unique ID, and it must stay the same
whenever you restart a head node.
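
A minimal sketch of one way to get an ID that is unique per cluster but stable across head node restarts: generate it once with `uuid.uuid4()` and persist it. The helper name `load_or_create_namespace` and the file location are assumptions for illustration; Ray does not read this file on its own.

```python
import uuid
from pathlib import Path


def load_or_create_namespace(path: Path) -> str:
    """Return the ID stored at `path`, creating it on first use.

    `path` is a hypothetical location on storage that survives head node
    restarts. It is not part of Ray.
    """
    if path.exists():
        stored = path.read_text().strip()
        if stored:
            # Reuse the same ID so a restarted head node sees its old data.
            return stored
    namespace = uuid.uuid4().hex
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(namespace)
    return namespace
```

The returned value would then be exported as `RAY_external_storage_namespace` in the environment of the head node before `ray start --head` runs.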

## Running Applications on the static Ray Cluster

Expand Down
17 changes: 12 additions & 5 deletions doc/source/ray-core/fault_tolerance/gcs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,29 @@ Setting up Redis
set the OS environment ``RAY_REDIS_ADDRESS`` to
the Redis address, and supply the ``--redis-password`` flag with the password when calling ``ray start``:

You should also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis.
This ensures that there are no data conflicts if multiple Ray clusters share the same Redis instance.
``RAY_external_storage_namespace`` must be a unique ID, and whenever you restart a head node,
Contributor
Can we give users an example of a unique ID, or generator code like uuid.uuid4()?

it should be the same.

.. code-block:: shell

RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD
RAY_external_storage_namespace=<unique_id> RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD

.. tab-item:: ray up

If you are using :ref:`ray up <ray-up-doc>` to start the Ray cluster, change :ref:`head_start_ray_commands <cluster-configuration-head-start-ray-commands>` field to add ``RAY_REDIS_ADDRESS`` and ``--redis-password`` to the ``ray start`` command:

You should also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis.
This ensures that there are no data conflicts if multiple Ray clusters share the same Redis instance.
``RAY_external_storage_namespace`` must be a unique ID, and whenever you restart a head node,
it should be the same.

.. code-block:: yaml

head_start_ray_commands:
- ray stop
- ulimit -n 65536; RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0
- ulimit -n 65536; RAY_external_storage_namespace=<unique_id> RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0

.. tab-item:: Kubernetes

Expand All @@ -60,9 +70,6 @@ If the raylet fails to reconnect to the GCS for more than 60 seconds,
the raylet will exit and the corresponding node fails.
This timeout threshold can be tuned by the OS environment variable ``RAY_gcs_rpc_server_reconnect_timeout_s``.

You can also set the OS environment variable ``RAY_external_storage_namespace`` to isolate the data stored in Redis.
This makes sure that there is no data conflicts if multiple Ray clusters share the same Redis instance.

If the IP address of GCS will change after restarts, it's better to use a qualified domain name
and pass it to all raylets at start time. Raylet will resolve the domain name and connect to
the correct GCS. You need to ensure that at any time, only one GCS is alive.
Expand Down
200 changes: 168 additions & 32 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import Dict, Optional, Tuple, IO, AnyStr

from filelock import FileLock
from pathlib import Path

import ray
import ray._private.ray_constants as ray_constants
Expand Down Expand Up @@ -96,27 +97,21 @@ def __init__(
[primary_redis_ip, port] = external_redis[0].rsplit(":", 1)
ray_params.external_addresses = external_redis
ray_params.num_redis_shards = len(external_redis) - 1
storage_namespace = os.environ.get("RAY_external_storage_namespace")
if head:
if storage_namespace is None:
raise ValueError(
"RAY_external_storage_namespace must be provided "
"when using Ray with external Redis via `RAY_REDIS_ADDRESS` "
"for fault tolerance. "
"RAY_external_storage_namespace must be a unique ID "
"and must stay the same across restarts of the same head node."
)
# Update the session name to be RAY_external_storage_namespace.
if ray_params.session_name is None:
ray_params.update_if_absent(
session_name=f"session_{storage_namespace}")

# Try to get node IP address with the parameters.
if ray_params.node_ip_address:
node_ip_address = ray_params.node_ip_address
elif ray_params.redis_address:
node_ip_address = ray.util.get_node_ip_address(ray_params.redis_address)
else:
node_ip_address = ray.util.get_node_ip_address()
self._node_ip_address = node_ip_address

if ray_params.raylet_ip_address:
raylet_ip_address = ray_params.raylet_ip_address
else:
raylet_ip_address = node_ip_address

if raylet_ip_address != node_ip_address and (not connect_only or head):
raise ValueError(
"The raylet IP address should only be different than the node "
"IP address when connecting to an existing raylet; i.e., when "
"head=False and connect_only=True."
)
if (
ray_params._system_config
and len(ray_params._system_config) > 0
Expand All @@ -126,8 +121,6 @@ def __init__(
"System config parameters can only be set on the head node."
)

self._raylet_ip_address = raylet_ip_address

ray_params.update_if_absent(
include_log_monitor=True,
resources={},
Expand Down Expand Up @@ -176,12 +169,14 @@ def __init__(
self._init_gcs_client()

# Register the temp dir.
if head:
# date including microsecond
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f")
self._session_name = f"session_{date_str}_{os.getpid()}"
else:
if ray_params.session_name is None:
self._session_name = ray_params.session_name

if self._session_name is None:
if head:
# date including microsecond
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f")
self._session_name = f"session_{date_str}_{os.getpid()}"
else:
assert not self._default_worker
session_name = ray._private.utils.internal_kv_get_with_retry(
Contributor
(This is not a blocker.) The behavior of a worker node changes if the worker node also sets RAY_external_storage_namespace.
Original:
A worker node's session_name is always fetched from KV storage.

Now:
If a worker node sets RAY_external_storage_namespace, its session_name uses RAY_external_storage_namespace first.

If the user sets RAY_external_storage_namespace improperly, it can cause issues with the worker nodes.

self.get_gcs_client(),
Expand All @@ -190,9 +185,6 @@ def __init__(
num_retries=ray_constants.NUM_REDIS_GET_RETRIES,
)
self._session_name = ray._private.utils.decode(session_name)
else:
# worker mode
self._session_name = ray_params.session_name

# Initialize webui url
if head:
Expand All @@ -206,8 +198,37 @@ def __init__(
f"{ray_params.dashboard_host}:{ray_params.dashboard_port}"
)

# It creates a session_dir.
self._init_temp()

node_ip_address = ray_params.node_ip_address
if node_ip_address is None:
if connect_only:
node_ip_address = self._wait_and_get_for_node_address()
else:
node_ip_address = ray.util.get_node_ip_address()

assert node_ip_address is not None
ray_params.update_if_absent(
node_ip_address=node_ip_address, raylet_ip_address=node_ip_address
)
self._node_ip_address = node_ip_address
if not connect_only:
self._write_node_ip_address(node_ip_address)

if ray_params.raylet_ip_address:
raylet_ip_address = ray_params.raylet_ip_address
else:
raylet_ip_address = node_ip_address

if raylet_ip_address != node_ip_address and (not connect_only or head):
raise ValueError(
"The raylet IP address should only be different than the node "
"IP address when connecting to an existing raylet; i.e., when "
"head=False and connect_only=True."
)
self._raylet_ip_address = raylet_ip_address

# Validate and initialize the persistent storage API.
if head:
storage._init_storage(ray_params.storage, is_head=True)
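
A minimal sketch of the node IP resolution order that the relocated block above appears to follow: an explicit address wins, a connect-only process reuses the address recorded by the running node, and otherwise the address is detected. The function name and the injected callables are hypothetical; in the diff these roles are played by `ray_params.node_ip_address`, `_wait_and_get_for_node_address()`, and `ray.util.get_node_ip_address()`.

```python
from typing import Callable, Optional


def resolve_node_ip(
    explicit_ip: Optional[str],
    connect_only: bool,
    read_cached_ip: Callable[[], str],
    detect_ip: Callable[[], str],
) -> str:
    """Pick a node IP using the precedence sketched in the lead-in."""
    if explicit_ip is not None:
        # The caller passed an address, so nothing else is consulted.
        return explicit_ip
    if connect_only:
        # A process attaching to a running node reuses what that node recorded.
        return read_cached_ip()
    # A node that is being started detects its own address.
    return detect_ip()
```

Injecting the two lookups as callables keeps the precedence testable without a running cluster.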
Expand Down Expand Up @@ -934,6 +955,120 @@ def _get_cached_port(

return port

def _wait_and_get_for_node_address(self, timeout_s: int = 60) -> str:
"""Wait until the node_ip_address.json file is available.

node_ip_address.json is created when a ray instance is started.

Args:
timeout_s: If the ip address is not found within this
timeout, it will raise ValueError.
Returns:
The node_ip_address of the current session if it finds it
within timeout_s.
"""
for i in range(timeout_s):
node_ip_address = self._get_cached_node_ip_address()

if node_ip_address is not None:
return node_ip_address

time.sleep(1)
if i % 10 == 0:
logger.info(
"Can't find a `node_ip_address.json` file from "
f"{self.get_session_dir_path()}. "
"Have you started a Ray instance using "
"`ray start` or `ray.init`?"
)

raise ValueError(
"Can't find a `node_ip_address.json` file from "
f"{self.get_session_dir_path()} "
f"for {timeout_s} seconds. "
"A ray instance hasn't started. "
"Did you do `ray start` or `ray.init` on this host?"
)
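
A minimal sketch of the poll-until-available pattern used by `_wait_and_get_for_node_address`, written as a standalone helper. The name `poll_until` and its parameters are hypothetical, and the sketch raises `TimeoutError` where the diff raises `ValueError`.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll_until(
    fetch: Callable[[], Optional[T]],
    timeout_s: float = 60.0,
    interval_s: float = 1.0,
) -> T:
    """Call `fetch` until it returns a non-None value or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    while True:
        value = fetch()
        if value is not None:
            return value
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no value after {timeout_s} seconds")
        time.sleep(interval_s)
```

Using a monotonic deadline rather than counting loop iterations keeps the timeout accurate even when `fetch` itself is slow.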

def _get_cached_node_ip_address(self) -> Optional[str]:
"""Get a node address cached on this session.

If a ray instance is started by `ray start --node-ip-address`,
the node ip address is cached to a file node_ip_address.json.
Otherwise, the file exists, but it is empty.

This API is process-safe, meaning the file access is protected by
a file lock.

Returns:
node_ip_address cached on the current node. None if
the file doesn't exist, meaning a ray instance hasn't been
started on the current node. If node_ip_address is not written
to the file, it means --node-ip-address is not given, and in this
case, we find the IP address ourselves.
"""
assert hasattr(self, "_session_dir")
file_path = Path(
os.path.join(self.get_session_dir_path(), "node_ip_address.json")
)
cached_node_ip_address = {}

with FileLock(str(file_path.absolute()) + ".lock"):
if not file_path.exists():
return None

with file_path.open() as f:
cached_node_ip_address.update(json.load(f))

if "node_ip_address" in cached_node_ip_address:
return cached_node_ip_address["node_ip_address"]
else:
return ray.util.get_node_ip_address()

def _write_node_ip_address(self, node_ip_address: Optional[str]) -> None:
"""Write a node ip address of the current session to
node_ip_address.json.

If a ray instance is started by `ray start --node-ip-address`,
the node ip address is cached to a file node_ip_address.json.

This API is process-safe, meaning the file access is protected by
a file lock.

The file contains a single string node_ip_address. If nothing
is written, --node-ip-address is not given. In this case, Ray
resolves the IP address on its own. It assumes that a single node
can have only 1 IP address (which is the assumption ray
has in general).

node_ip_address is the ip address of the current node.

Args:
node_ip_address: The node IP address of the current node.
If None, it means the node ip address is not given
by --node-ip-address. In this case, we don't write
anything to a file.
"""
assert hasattr(self, "_session_dir")

file_path = Path(
os.path.join(self.get_session_dir_path(), "node_ip_address.json")
)
cached_node_ip_address = {}

with FileLock(str(file_path.absolute()) + ".lock"):
if not file_path.exists():
with file_path.open(mode="w") as f:
json.dump({}, f)

with file_path.open() as f:
cached_node_ip_address.update(json.load(f))

if node_ip_address is not None:
cached_node_ip_address["node_ip_address"] = node_ip_address
with file_path.open(mode="w") as f:
json.dump(cached_node_ip_address, f)
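
A minimal stdlib-only sketch of the write and read round trip for `node_ip_address.json`. It differs from the diff in two labeled ways: it swaps `filelock.FileLock` for a write-to-temp-then-rename step, which protects readers from half-written files but does not serialize concurrent writers, and `read_node_ip` returns None when the key is missing instead of falling back to `ray.util.get_node_ip_address()`. The function names are hypothetical.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Optional

FILE_NAME = "node_ip_address.json"


def write_node_ip(session_dir: Path, node_ip: Optional[str]) -> None:
    """Merge `node_ip` into the cache file, creating the file if needed."""
    path = session_dir / FILE_NAME
    data = json.loads(path.read_text()) if path.exists() else {}
    if node_ip is not None:
        data["node_ip_address"] = node_ip
    # Write to a temp file in the same directory, then rename it over the
    # target so a reader never observes a partially written file.
    fd, tmp = tempfile.mkstemp(dir=session_dir)
    with os.fdopen(fd, "w") as f:
        json.dump(data, f)
    os.replace(tmp, path)


def read_node_ip(session_dir: Path) -> Optional[str]:
    """Return the cached IP, or None if the file or the key is missing."""
    path = session_dir / FILE_NAME
    if not path.exists():
        return None
    return json.loads(path.read_text()).get("node_ip_address")
```

As in the diff, writing None creates an empty JSON object and never erases an address that was recorded earlier.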

def start_reaper_process(self):
"""
Start the reaper process.
Expand Down Expand Up @@ -1178,9 +1313,10 @@ def _write_cluster_info_to_kv(self):
self.get_gcs_client().internal_kv_put(
b"session_name",
self._session_name.encode(),
True,
False,
ray_constants.KV_NAMESPACE_SESSION,
)

self.get_gcs_client().internal_kv_put(
b"session_dir",
self._session_dir.encode(),
Expand Down
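
A toy sketch of why flipping `True` to `False` in the `internal_kv_put` call for `session_name` matters, assuming that positional argument is an overwrite flag (the diff does not say so explicitly). `InMemoryKV` is a hypothetical stand-in, not the GCS client, and its return value is an illustration only.

```python
from typing import Dict


class InMemoryKV:
    """Toy key-value store with an overwrite flag on put."""

    def __init__(self) -> None:
        self._data: Dict[bytes, bytes] = {}

    def put(self, key: bytes, value: bytes, overwrite: bool) -> bool:
        """Store `value`; return True only if the key was newly added."""
        existed = key in self._data
        if overwrite or not existed:
            self._data[key] = value
        return not existed

    def get(self, key: bytes) -> bytes:
        return self._data[key]
```

With `overwrite=False`, a second put after a restart leaves the first session name in place, which matches the PR title's goal of reusing `session_name` when GCS restarts.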
14 changes: 3 additions & 11 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1301,9 +1301,7 @@ def init(
)
_redis_max_memory: Optional[int] = kwargs.pop("_redis_max_memory", None)
_plasma_directory: Optional[str] = kwargs.pop("_plasma_directory", None)
_node_ip_address: str = kwargs.pop(
"_node_ip_address", ray_constants.NODE_DEFAULT_IP
)
_node_ip_address: str = kwargs.pop("_node_ip_address", None)
_driver_object_store_memory: Optional[int] = kwargs.pop(
"_driver_object_store_memory", None
)
Expand Down Expand Up @@ -1458,10 +1456,6 @@ def init(
gcs_address = bootstrap_address
logger.info("Connecting to existing Ray cluster at address: %s...", gcs_address)

if _node_ip_address is not None:
node_ip_address = services.resolve_ip_for_localhost(_node_ip_address)
raylet_ip_address = node_ip_address

if local_mode:
driver_mode = LOCAL_MODE
warnings.warn(
Expand Down Expand Up @@ -1506,8 +1500,7 @@ def init(

# Use a random port by not specifying Redis port / GCS server port.
ray_params = ray._private.parameter.RayParams(
node_ip_address=node_ip_address,
raylet_ip_address=raylet_ip_address,
node_ip_address=_node_ip_address,
object_ref_seed=None,
driver_mode=driver_mode,
redirect_output=None,
Expand Down Expand Up @@ -1590,8 +1583,7 @@ def init(

# In this case, we only need to connect the node.
ray_params = ray._private.parameter.RayParams(
node_ip_address=node_ip_address,
raylet_ip_address=raylet_ip_address,
node_ip_address=_node_ip_address,
gcs_address=gcs_address,
redis_address=redis_address,
redis_password=_redis_password,
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ py_test_module_list(
py_test_module_list(
files = [
"test_gcs_ha_e2e.py",
"test_gcs_ha_e2e_2.py",
"test_memory_pressure.py",
"test_node_labels.py",
],
Expand Down