-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core] Fix session_name not reused when GCS restarts + node ip address not set for driver #39211
Changes from all commits
d2f802a
a391f66
3966778
992d7ab
8083748
73bdd49
a25aa34
fa2dc4c
db2af38
8476e4f
ba69b1b
4a0dd44
f6ee80e
023b3a4
dba0008
77b933c
87ea063
302ecd8
cdbf084
b9aedbb
55979f0
4dbb1af
32de899
632442a
1705ec4
cbff14f
3107619
bb8e1f6
eeb3610
484c68a
748ddf8
f35a165
01e492a
8a14bcc
cd2b44d
9330cc6
e32d538
0e73a7e
b3d590a
5a41237
5df758f
951e54c
32e65fc
ec0591f
6734082
672d996
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
from typing import Dict, Optional, Tuple, IO, AnyStr | ||
|
||
from filelock import FileLock | ||
from pathlib import Path | ||
|
||
import ray | ||
import ray._private.ray_constants as ray_constants | ||
|
@@ -96,27 +97,21 @@ def __init__( | |
[primary_redis_ip, port] = external_redis[0].rsplit(":", 1) | ||
ray_params.external_addresses = external_redis | ||
ray_params.num_redis_shards = len(external_redis) - 1 | ||
storage_namespace = os.environ.get("RAY_external_storage_namespace") | ||
if head: | ||
if storage_namespace is None: | ||
raise ValueError( | ||
"RAY_external_storage_namespace must be provided " | ||
"when using Ray with external Redis via `RAY_REDIS_ADDRESS` " | ||
" for the fault tolerance. " | ||
"RAY_external_storage_namespace must be an unique ID " | ||
"and has to be the same across the same head node." | ||
) | ||
# Update the session name to be RAY_external_storage_namespace. | ||
if ray_params.session_name is None: | ||
ray_params.update_if_absent( | ||
session_name=f"session_{storage_namespace}") | ||
|
||
# Try to get node IP address with the parameters. | ||
if ray_params.node_ip_address: | ||
node_ip_address = ray_params.node_ip_address | ||
elif ray_params.redis_address: | ||
node_ip_address = ray.util.get_node_ip_address(ray_params.redis_address) | ||
else: | ||
node_ip_address = ray.util.get_node_ip_address() | ||
self._node_ip_address = node_ip_address | ||
|
||
if ray_params.raylet_ip_address: | ||
raylet_ip_address = ray_params.raylet_ip_address | ||
else: | ||
raylet_ip_address = node_ip_address | ||
|
||
if raylet_ip_address != node_ip_address and (not connect_only or head): | ||
raise ValueError( | ||
"The raylet IP address should only be different than the node " | ||
"IP address when connecting to an existing raylet; i.e., when " | ||
"head=False and connect_only=True." | ||
) | ||
if ( | ||
ray_params._system_config | ||
and len(ray_params._system_config) > 0 | ||
|
@@ -126,8 +121,6 @@ def __init__( | |
"System config parameters can only be set on the head node." | ||
) | ||
|
||
self._raylet_ip_address = raylet_ip_address | ||
|
||
ray_params.update_if_absent( | ||
include_log_monitor=True, | ||
resources={}, | ||
|
@@ -176,12 +169,14 @@ def __init__( | |
self._init_gcs_client() | ||
|
||
# Register the temp dir. | ||
if head: | ||
# date including microsecond | ||
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f") | ||
self._session_name = f"session_{date_str}_{os.getpid()}" | ||
else: | ||
if ray_params.session_name is None: | ||
self._session_name = ray_params.session_name | ||
|
||
if self._session_name is None: | ||
if head: | ||
# date including microsecond | ||
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f") | ||
self._session_name = f"session_{date_str}_{os.getpid()}" | ||
else: | ||
assert not self._default_worker | ||
session_name = ray._private.utils.internal_kv_get_with_retry( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (This not a broker) . the behavior of worker node will be changed if worker node also set Now: If the RAY_external_storage_namespace is set improperly by the user, it can potentially cause issues with the worker nodes. |
||
self.get_gcs_client(), | ||
|
@@ -190,9 +185,6 @@ def __init__( | |
num_retries=ray_constants.NUM_REDIS_GET_RETRIES, | ||
) | ||
self._session_name = ray._private.utils.decode(session_name) | ||
else: | ||
# worker mode | ||
self._session_name = ray_params.session_name | ||
|
||
# Initialize webui url | ||
if head: | ||
|
@@ -206,8 +198,37 @@ def __init__( | |
f"{ray_params.dashboard_host}:{ray_params.dashboard_port}" | ||
) | ||
|
||
# It creates a session_dir. | ||
self._init_temp() | ||
|
||
node_ip_address = ray_params.node_ip_address | ||
if node_ip_address is None: | ||
if connect_only: | ||
node_ip_address = self._wait_and_get_for_node_address() | ||
else: | ||
node_ip_address = ray.util.get_node_ip_address() | ||
|
||
assert node_ip_address is not None | ||
ray_params.update_if_absent( | ||
node_ip_address=node_ip_address, raylet_ip_address=node_ip_address | ||
) | ||
self._node_ip_address = node_ip_address | ||
if not connect_only: | ||
self._write_node_ip_address(node_ip_address) | ||
|
||
if ray_params.raylet_ip_address: | ||
raylet_ip_address = ray_params.raylet_ip_address | ||
else: | ||
raylet_ip_address = node_ip_address | ||
|
||
if raylet_ip_address != node_ip_address and (not connect_only or head): | ||
raise ValueError( | ||
"The raylet IP address should only be different than the node " | ||
"IP address when connecting to an existing raylet; i.e., when " | ||
"head=False and connect_only=True." | ||
) | ||
self._raylet_ip_address = raylet_ip_address | ||
|
||
# Validate and initialize the persistent storage API. | ||
if head: | ||
storage._init_storage(ray_params.storage, is_head=True) | ||
|
@@ -934,6 +955,120 @@ def _get_cached_port( | |
|
||
return port | ||
|
||
def _wait_and_get_for_node_address(self, timeout_s: int = 60) -> str: | ||
"""Wait until the node_ip_address.json file is avialable. | ||
|
||
node_ip_address.json is created when a ray instance is started. | ||
|
||
Args: | ||
timeout_s: If the ip address is not found within this | ||
timeout, it will raise ValueError. | ||
Returns: | ||
The node_ip_address of the current session if it finds it | ||
within timeout_s. | ||
""" | ||
for i in range(timeout_s): | ||
node_ip_address = self._get_cached_node_ip_address() | ||
|
||
if node_ip_address is not None: | ||
return node_ip_address | ||
|
||
time.sleep(1) | ||
if i % 10 == 0: | ||
logger.info( | ||
"Can't find a `node_ip_address.json` file from " | ||
f"{self.get_session_dir_path()}. " | ||
"Have you started Ray instsance using " | ||
"`ray start` or `ray.init`?" | ||
) | ||
|
||
raise ValueError( | ||
"Can't find a `node_ip_address.json` file from " | ||
f"{self.get_session_dir_path()}. " | ||
f"for {timeout_s} seconds. " | ||
"A ray instance hasn't started. " | ||
"Did you do `ray start` or `ray.init` on this host?" | ||
) | ||
|
||
def _get_cached_node_ip_address(self) -> str: | ||
"""Get a node address cached on this session. | ||
|
||
If a ray instance is started by `ray start --node-ip-address`, | ||
the node ip address is cached to a file node_ip_address.json. | ||
Otherwise, the file exists, but it is emptyl. | ||
|
||
This API is process-safe, meaning the file access is protected by | ||
a file lock. | ||
|
||
Returns: | ||
node_ip_address cached on the current node. None if the node | ||
the file doesn't exist, meaning ray instance hasn't been | ||
started on a current node. If node_ip_address is not written | ||
to a file, it means --node-ip-address is not given, and in this | ||
case, we find the IP address ourselves. | ||
""" | ||
assert hasattr(self, "_session_dir") | ||
file_path = Path( | ||
os.path.join(self.get_session_dir_path(), "node_ip_address.json") | ||
) | ||
cached_node_ip_address = {} | ||
|
||
with FileLock(str(file_path.absolute()) + ".lock"): | ||
if not file_path.exists(): | ||
return None | ||
|
||
with file_path.open() as f: | ||
cached_node_ip_address.update(json.load(f)) | ||
|
||
if "node_ip_address" in cached_node_ip_address: | ||
return cached_node_ip_address["node_ip_address"] | ||
else: | ||
return ray.util.get_node_ip_address() | ||
|
||
def _write_node_ip_address(self, node_ip_address: Optional[str]) -> None: | ||
"""Write a node ip address of the current session to | ||
node_ip_address.json. | ||
|
||
If a ray instance is started by `ray start --node-ip-address`, | ||
the node ip address is cached to a file node_ip_address.json. | ||
|
||
This API is process-safe, meaning the file access is protected by | ||
a file lock. | ||
|
||
The file contains a single string node_ip_address. It nothing | ||
is written, --node-ip-address is not given. In this case, Ray | ||
resolves the IP address on its own. It assumes in a single node, | ||
you can have only 1 IP address (which is the assumption ray | ||
has in general). | ||
|
||
node_ip_address is the ip address of the current node. | ||
|
||
Args: | ||
node_ip_address: The node IP address of the current node. | ||
If None, it means the node ip address is not given | ||
by --node-ip-address. In this case, we don't write | ||
anything to a file. | ||
""" | ||
assert hasattr(self, "_session_dir") | ||
|
||
file_path = Path( | ||
os.path.join(self.get_session_dir_path(), "node_ip_address.json") | ||
) | ||
cached_node_ip_address = {} | ||
|
||
with FileLock(str(file_path.absolute()) + ".lock"): | ||
if not file_path.exists(): | ||
with file_path.open(mode="w") as f: | ||
json.dump({}, f) | ||
|
||
with file_path.open() as f: | ||
cached_node_ip_address.update(json.load(f)) | ||
|
||
if node_ip_address is not None: | ||
cached_node_ip_address["node_ip_address"] = node_ip_address | ||
with file_path.open(mode="w") as f: | ||
json.dump(cached_node_ip_address, f) | ||
|
||
def start_reaper_process(self): | ||
""" | ||
Start the reaper process. | ||
|
@@ -1178,9 +1313,10 @@ def _write_cluster_info_to_kv(self): | |
self.get_gcs_client().internal_kv_put( | ||
b"session_name", | ||
self._session_name.encode(), | ||
True, | ||
False, | ||
ray_constants.KV_NAMESPACE_SESSION, | ||
) | ||
|
||
self.get_gcs_client().internal_kv_put( | ||
b"session_dir", | ||
self._session_dir.encode(), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we give users an example of unique id or generator code like
uuid.uuid4()
?