Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Fix session_name not reused when GCS restarts + node ip address not set for driver #39211

Closed
wants to merge 46 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
d2f802a
ip
rkooo567 Jul 20, 2023
a391f66
ip
rkooo567 Jul 21, 2023
3966778
working now.
rkooo567 Jul 21, 2023
992d7ab
working + lint
rkooo567 Jul 21, 2023
8083748
Fix
rkooo567 Jul 21, 2023
73bdd49
Merge branch 'master' into automatically-set-node-ip-addr
rkooo567 Aug 10, 2023
a25aa34
ip
rkooo567 Aug 10, 2023
fa2dc4c
Merge branch 'master' into automatically-set-node-ip-addr
rkooo567 Aug 22, 2023
db2af38
ip
rkooo567 Aug 22, 2023
8476e4f
ip
rkooo567 Aug 22, 2023
ba69b1b
Made it work.
rkooo567 Aug 22, 2023
4a0dd44
working
rkooo567 Aug 22, 2023
f6ee80e
Fixed a broken test.
rkooo567 Aug 22, 2023
023b3a4
Merge branch 'master' into automatically-set-node-ip-addr
rkooo567 Aug 22, 2023
dba0008
Fixed the test failure.
rkooo567 Aug 22, 2023
77b933c
print error messages before assertion
rkooo567 Aug 23, 2023
87ea063
Merge branch 'master' into automatically-set-node-ip-addr
rkooo567 Aug 23, 2023
302ecd8
ip
rkooo567 Aug 23, 2023
cdbf084
more info for debugging.
rkooo567 Aug 23, 2023
b9aedbb
Merge branch 'master' into automatically-set-node-ip-addr
rkooo567 Aug 31, 2023
55979f0
ip
rkooo567 Aug 31, 2023
4dbb1af
ip
rkooo567 Aug 31, 2023
32de899
ip
rkooo567 Aug 31, 2023
632442a
ip
rkooo567 Aug 31, 2023
1705ec4
remove bind
rkooo567 Aug 31, 2023
cbff14f
try fixing it.
rkooo567 Aug 31, 2023
3107619
remove print
rkooo567 Aug 31, 2023
bb8e1f6
Work around.
rkooo567 Aug 31, 2023
eeb3610
.
rkooo567 Aug 31, 2023
484c68a
Revert
rkooo567 Sep 1, 2023
748ddf8
Wokrs not
rkooo567 Sep 1, 2023
f35a165
Fix failed ha tests.
rkooo567 Sep 1, 2023
01e492a
fix some tests.
rkooo567 Sep 1, 2023
8a14bcc
done
rkooo567 Sep 1, 2023
cd2b44d
maybe working?
rkooo567 Sep 1, 2023
9330cc6
Revert "maybe working?"
rkooo567 Sep 1, 2023
e32d538
Revert "done"
rkooo567 Sep 1, 2023
0e73a7e
Revert "fix some tests."
rkooo567 Sep 1, 2023
b3d590a
Revert "Fix failed ha tests."
rkooo567 Sep 1, 2023
5a41237
Revert "Wokrs not"
rkooo567 Sep 1, 2023
5df758f
clean up
rkooo567 Sep 1, 2023
951e54c
Revert "Revert "Wokrs not""
rkooo567 Sep 1, 2023
32e65fc
Revert "Revert "Fix failed ha tests.""
rkooo567 Sep 1, 2023
ec0591f
Revert "Revert "fix some tests.""
rkooo567 Sep 1, 2023
6734082
Revert "Revert "done""
rkooo567 Sep 1, 2023
672d996
Revert "Revert "maybe working?""
rkooo567 Sep 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
ip
  • Loading branch information
rkooo567 committed Aug 22, 2023
commit db2af38d28b593900f1667077786e3ae967df699
84 changes: 62 additions & 22 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,14 @@ def __init__(

node_ip_address = ray_params.node_ip_address
if connect_only:
self._wait_for_node_address()
node_ip_address = self._wait_for_node_address()
else:
if node_ip_address is None:
node_ip_address = ray._private.services.resolve_ip_for_localhost(
ray_constants.NODE_DEFAULT_IP
)

node_ip_address = self._get_or_write_node_address(node_ip_address)


assert node_ip_address is not None
ray_params.update_if_absent(
node_ip_address=node_ip_address, raylet_ip_address=node_ip_address
)
Expand Down Expand Up @@ -277,6 +276,13 @@ def __init__(
default_port=ray_params.runtime_env_agent_port,
)

# Write a node_ip_address to a file so that
# ray.init can pick up.
# This has to be done here because it requires
# self.unique_id to exist.
if not connect_only:
self._write_node_ip_address(node_ip_address)

ray_params.update_if_absent(
metrics_agent_port=self.metrics_agent_port,
metrics_export_port=self._metrics_export_port,
Expand Down Expand Up @@ -944,35 +950,44 @@ def _get_cached_port(

return port

def _wait_for_node_address(self, timeout_s=60):
def _wait_for_node_address(self, timeout_s: int = 60) -> str:
"""Wait until the node_ip_address.json file is avialable.

node_ip_address.json is created when a ray instance is started.

Args:
timeout_s: If the ip address is not found within this
timeout, it will raise ValueError.
Returns:
The node_ip_address of the current session if it finds it
within timeout_s.
"""
assert hasattr(self, "_session_dir")
NODE_IP_FILE_NAME = "node_ip_address.json"
file_path = Path(os.path.join(self.get_session_dir_path(), NODE_IP_FILE_NAME))
for i in range(timeout_s):
if file_path.exists():
node_ip_address = self._get_cached_node_ip_address()

if node_ip_address is not None:
break

time.sleep(1)
if i % 10 == 0:
logger.info(
f"Can't find a `{NODE_IP_FILE_NAME}` file from "
f"{file_path}."
f"{file_path} or can't the unique id "
f"{self.unique_id} from the file. "
"Have you started Ray instsance using "
"`ray start` or `ray.init`?"
)
if i == MAX_WAIT_S:
raise ValueError(
f"Can't find a `{NODE_IP_FILE_NAME}` file from {file_path}"
f"Can't find a `{NODE_IP_FILE_NAME}` file from "
f"{file_path} or can't the unique id "
f"{self.unique_id} from the file "
f"for {MAX_WAIT_S} seconds"
"It means the ray instance hasn't started. "
"Did you do `ray start` or `ray.init` on this host?"
)

def _get_or_write_node_address(self, node_ip_address: str) -> str:
def _get_cached_node_ip_address(self) -> str:
"""Get a node address cached on this session.

If a ray instance is started by `ray start --node-ip-address`,
Expand All @@ -981,11 +996,41 @@ def _get_or_write_node_address(self, node_ip_address: str) -> str:
This API is process-safe, meaning the file access is protected by
a file lock.

Returns:
node_ip_address cached on the current node. None if the node
ip addrss is not written to a file or the file doesn't exist.
"""
assert hasattr(self, "_session_dir")
file_path = Path(
os.path.join(self.get_session_dir_path(), "node_ip_address.json")
)
cached_node_ip_address = {}

if not file_path.exists():
return None

with FileLock(str(file_path.absolute()) + ".lock"):
with file_path.open() as f:
cached_node_ip_address.update(json.load(f))

if self.unique_id in cached_node_ip_address:
return cached_node_ip_address[self.unique_id]
else:
return None


def _write_node_ip_address(self, node_ip_address: str) -> None:
"""Write a node ip address of the current session to
node_ip_address.json.

If a ray instance is started by `ray start --node-ip-address`,
the node ip address is cached to a file node_ip_address.json.

This API is process-safe, meaning the file access is protected by
a file lock.

Args:
node_ip_address: The node IP address of the current node.
Returns:
node_ip_address of the current node passed to ray start if it exists.
None otherwise.
"""
assert hasattr(self, "_session_dir")
file_path = Path(
Expand All @@ -1001,16 +1046,11 @@ def _get_or_write_node_address(self, node_ip_address: str) -> str:
with file_path.open() as f:
cached_node_ip_address.update(json.load(f))

if "node_ip_address" in cached_node_ip_address:
# The port has already been cached at this node, so use it.
node_ip_address = cached_node_ip_address["node_ip_address"]
else:
cached_node_ip_address["node_ip_address"] = node_ip_address
if self.unique_id not in cached_node_ip_address:
cached_node_ip_address[self.unique_id] = node_ip_address
with file_path.open(mode="w") as f:
json.dump(cached_node_ip_address, f)

return node_ip_address

def start_reaper_process(self):
"""
Start the reaper process.
Expand Down
8 changes: 8 additions & 0 deletions python/ray/tests/test_ray_init_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@ def verify():
subprocess.check_output("ray stop --force", shell=True)


def test_get_and_write_node_ip_address(shutdown_only):
ray.init()
node = ray._private.worker.global_worker.node
node_ip = ray._private.services.get_node_ip_address()
cached_node_ip_address = node._get_cached_node_ip_address()
assert cached_node_ip_address.get(node.unique_id) == node_ip


@pytest.mark.skipif(sys.platform != "linux", reason="skip except linux")
def test_ray_init_from_workers(ray_start_cluster):
cluster = ray_start_cluster
Expand Down