Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Expose Redis getter to Python and use to retrieve session name #39194

Merged
merged 17 commits into from
Sep 5, 2023
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2531,6 +2531,7 @@ pyx_library(
deps = [
"//:core_worker_lib",
"//:global_state_accessor_lib",
"//:gcs_server_lib",
"//:raylet_lib",
"//:redis_client",
"//:src/ray/ray_exported_symbols.lds",
Expand Down
68 changes: 58 additions & 10 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
import ray._private.services
import ray._private.utils
from ray._private import storage
from ray._raylet import GcsClient
from ray._raylet import GcsClient, get_session_key_from_storage
from ray._private.resource_spec import ResourceSpec
from ray._private.services import serialize_config
from ray._private.utils import open_log, try_to_create_directory, try_to_symlink

# Logger for this module. It should be configured at the entry point
Expand Down Expand Up @@ -177,9 +178,15 @@ def __init__(

# Register the temp dir.
if head:
# date including microsecond
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f")
self._session_name = f"session_{date_str}_{os.getpid()}"
# We expect this the first time we initialize a cluster, but not during
# subsequent restarts of the head node.
maybe_key = self.check_persisted_session_name()
if maybe_key is None:
vitsai marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iycheng do we need to handle edge cases like there are 2 head nodes started with the same redis (without storage namespace)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. It's not working right now if the namespace is not set. So not a regression IMO.
Application needs to set it up.

# date including microsecond
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f")
self._session_name = f"session_{date_str}_{os.getpid()}"
else:
self._session_name = ray._private.utils.decode(maybe_key)
else:
if ray_params.session_name is None:
assert not self._default_worker
Expand Down Expand Up @@ -317,6 +324,41 @@ def __init__(
self.validate_ip_port(self.gcs_address)
self._record_stats()

def check_persisted_session_name(self):
if self._ray_params.external_addresses is None:
vitsai marked this conversation as resolved.
Show resolved Hide resolved
return None
self._redis_address = self._ray_params.external_addresses[0]
# Address is ip:port or redis:https://ip:port
parts = self._redis_address.split(":https://", 1)
enable_redis_ssl = False
if len(parts) == 1:
redis_ip_address, redis_port = parts[0].rsplit(":", 1)
else:
# rediss for SSL
if len(parts) != 2 or parts[0] not in ("redis", "rediss"):
vitsai marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError(
f"Invalid redis address {self._redis_address}."
"Expected format is ip:port or redis:https://ip:port, "
"or rediss:https://ip:port for SSL."
)
redis_ip_address, redis_port = parts[1].rsplit(":", 1)
if parts[0] == "rediss":
enable_redis_ssl = True
if int(redis_port) < 0:
raise ValueError(
f"Invalid Redis port provided: {redis_port}."
"The port must be a non-negative integer."
)

return get_session_key_from_storage(
redis_ip_address,
int(redis_port),
self._ray_params.redis_password,
enable_redis_ssl,
serialize_config(self._config),
b"session_name",
)

@staticmethod
def validate_ip_port(ip_port):
"""Validates the address is in the ip:port format"""
Expand Down Expand Up @@ -1175,12 +1217,22 @@ def _write_cluster_info_to_kv(self):

ray_usage_lib.put_cluster_metadata(self.get_gcs_client())
# Make sure GCS is up.
self.get_gcs_client().internal_kv_put(
added = self.get_gcs_client().internal_kv_put(
b"session_name",
self._session_name.encode(),
True,
False,
ray_constants.KV_NAMESPACE_SESSION,
)
if not added:
curr_val = self.get_gcs_client().internal_kv_get(
b"session_name", ray_constants.KV_NAMESPACE_SESSION
)
assert curr_val != self._session_name, (
f"Session name {self._session_name} does not match "
f"persisted value {curr_val}. Perhaps there was an "
f"error connecting to Redis."
)

self.get_gcs_client().internal_kv_put(
b"session_dir",
self._session_dir.encode(),
Expand Down Expand Up @@ -1215,13 +1267,9 @@ def start_head_processes(self):
logger.debug(
f"Process STDOUT and STDERR is being " f"redirected to {self._logs_dir}."
)
assert self._redis_address is None
assert self._gcs_address is None
assert self._gcs_client is None

if self._ray_params.external_addresses is not None:
self._redis_address = self._ray_params.external_addresses[0]

self.start_gcs_server()
assert self.get_gcs_client() is not None
self._write_cluster_info_to_kv()
Expand Down
24 changes: 23 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ from ray.includes.libcoreworker cimport (

from ray.includes.ray_config cimport RayConfig
from ray.includes.global_state_accessor cimport CGlobalStateAccessor
from ray.includes.global_state_accessor cimport RedisDelKeySync
from ray.includes.global_state_accessor cimport RedisDelKeySync, RedisGetKeySync
from ray.includes.optional cimport (
optional, nullopt
)
Expand Down Expand Up @@ -4574,3 +4574,25 @@ cdef void async_callback(shared_ptr[CRayObject] obj,

def del_key_from_storage(host, port, password, use_ssl, key):
return RedisDelKeySync(host, port, password, use_ssl, key)


def get_session_key_from_storage(host, port, password, use_ssl, config, key):
"""
Get the session key from the storage.
Intended to be used for session_name only.
Args:
host: The address of the owner (caller) of the
generator task.
port: The task ID of the generator task.
password: The redis password.
use_ssl: Whether to use SSL.
config: The Ray config. Used to get storage namespace.
key: The key to retrieve.
"""
cdef:
c_string data
result = RedisGetKeySync(host, port, password, use_ssl, config, key, &data)
if result:
return data
else:
return None
67 changes: 67 additions & 0 deletions python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,73 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
const c_string &node_ip_address,
c_string *node_to_connect)

cdef extern from * namespace "ray::gcs" nogil:
"""
#include <thread>
#include "ray/gcs/gcs_server/store_client_kv.h"
namespace ray {
namespace gcs {

bool RedisGetKeySync(const std::string& host,
int32_t port,
const std::string& password,
bool use_ssl,
const std::string& config,
const std::string& key,
std::string* data) {
InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
vitsai marked this conversation as resolved.
Show resolved Hide resolved
ray::RayLog::ShutDownRayLog,
"ray_init",
ray::RayLogLevel::WARNING,
"" /* log_dir */);

RedisClientOptions options(host, port, password, false, use_ssl);

std::string config_list;
RAY_CHECK(absl::Base64Unescape(config, &config_list));
RayConfig::instance().initialize(config_list);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm is this safe? What's going to happen if we call ray.init with the same config later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not thread-safe, but this part is single-threaded. Currently, each process (raylet, gcs, worker), calls it once, but if it is called multiple times serially, the values will just be reset.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reset sounds like the right behavior,

can you add an unit test to set this config from redis level and set it differently from ray.init level and see if it resolves to ray.init config correctly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add comment here this will be reset when a core worker initializes the config again

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Manually tested by setting the value before and after the new Redis call, and running the new test with Redis enabled. It resolves to the correct config. Maybe we can defer the unit test to after this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I fail to get it. Why do we need setup this? Is it about redis configs?


instrumented_io_context io_service;

auto redis_client = std::make_shared<RedisClient>(options);
auto status = redis_client->Connect(io_service);

if(!status.ok()) {
RAY_LOG(ERROR) << "Failed to connect to redis: " << status.ToString();
return false;
}

auto cli = std::make_unique<StoreClientInternalKV>(
std::make_unique<RedisStoreClient>(std::move(redis_client)));

bool ret_val = false;
cli->Get("session", key, [&](std::optional<std::string> result) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's happening if the CLI fails by other error? E.g., timeout or random Redis related issues? Is it just result contains no value? Would this contain error message in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will just not contain a value. Internally, on the C++ side, we do retry in redis_context.cc with exponential backoff, but the whole thing is bounded here by the io_context.run_for(duration). In terms of error propagation, the existing code isn't great about that, unfortunately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this means we cannot distinguish redis failure vs the first time cluster start?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the redis failed, GCS will crash too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I meant GET failure. But we workaround this by checking the session name if put failed (because override=False)

if (result.has_value()) {
*data = result.value();
ret_val = true;
} else {
RAY_LOG(INFO) << "Failed to retrieve the key " << key
<< " from persistent storage.";
ret_val = false;
}
});
io_service.run_for(std::chrono::milliseconds(1000));
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved

return ret_val;
}

}
}
"""
c_bool RedisGetKeySync(const c_string& host,
c_int32_t port,
const c_string& password,
c_bool use_ssl,
const c_string& config,
const c_string& key,
c_string* data)


cdef extern from * namespace "ray::gcs" nogil:
"""
#include <thread>
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ py_test_module_list(
py_test_module_list(
files = [
"test_gcs_ha_e2e.py",
"test_gcs_ha_e2e_2.py",
"test_memory_pressure.py",
"test_node_labels.py",
],
Expand Down
5 changes: 2 additions & 3 deletions python/ray/tests/test_advanced_9.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,8 @@ def test_redis_not_available(monkeypatch, call_ray_stop_only):
capture_output=True,
)
assert "Could not establish connection to Redis" in p.stderr.decode()
assert "Please check" in p.stderr.decode()
assert "gcs_server.out for details" in p.stderr.decode()
assert "RuntimeError: Failed to start GCS" in p.stderr.decode()
assert "Please check " in p.stderr.decode()
assert "redis storage is alive or not." in p.stderr.decode()


@pytest.mark.skipif(not enable_external_redis(), reason="Only valid in redis env")
Expand Down
30 changes: 30 additions & 0 deletions python/ray/tests/test_gcs_fault_tolerance.py
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,36 @@ def check_raylet_healthy():
sleep(1)


def test_session_name(ray_start_cluster):
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
# Kill GCS and check that raylets kill themselves when not backed by Redis,
# and stay alive when backed by Redis.
# Raylets should kill themselves due to cluster ID mismatch in the
# non-persisted case.
cluster = ray_start_cluster
cluster.add_node()
cluster.wait_for_nodes()

head_node = cluster.head_node
session_dir = head_node.get_session_dir_path()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CAn you also submit a driver and check if their session name is correct?

import ray
ray.init()
# Make sure this is correct
ray._private.worker._global_node.get_session_dir_path()

We need this

  1. Start the first head node. Submit a driver to a head node and get a session dir
  2. Start the worker node. Submit a driver to "worker node" and get a session dir.
  3. Kill head node. Restart head node.
  4. Submit a driver to a head node and get a session dir
  5. Submit a driver to a worker node and get a session dir.

This could also be done in a similar way as test_gcs_ha_e2e.py (it starts head / worker in docker containers, so it is easy to test it)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add the same suite of tests in this commit; 748ddf8?


gcs_server_process = head_node.all_processes["gcs_server"][0].process
gcs_server_pid = gcs_server_process.pid
cluster.remove_node(head_node, allow_graceful=False)
# Wait to prevent the gcs server process becoming zombie.
gcs_server_process.wait()
wait_for_pid_to_exit(gcs_server_pid, 1000)

# Add head node back
cluster.add_node()
head_node = cluster.head_node
new_session_dir = head_node.get_session_dir_path()

if not enable_external_redis():
assert session_dir != new_session_dir
else:
assert session_dir == new_session_dir
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add tests inside this commit 748ddf8#diff-8e02c2ad08f47c6f22d3a04682d0c3867bffe7cf5f9f2838e6f0f999e97952d5?

The one inside test_ray_init.py and test_gcs_ha_e2e_2.py. I think test_gcs_ha_e2e_2 should just work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests inside test_ray_init only work if redis is disabled. Right now for the tests, we either disable redis or enable redis for the duration of the entire test. In the latter case, we expect the session to be the same.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it is to verify session dir is not changed when redis is disabled!



@pytest.mark.parametrize(
"ray_start_regular_with_external_redis",
[
Expand Down
4 changes: 3 additions & 1 deletion python/ray/tests/test_gcs_ha_e2e.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import pytest
import sys
from time import sleep

import pytest

from ray._private.test_utils import wait_for_condition
from ray.tests.conftest_docker import * # noqa

Expand Down
55 changes: 55 additions & 0 deletions python/ray/tests/test_gcs_ha_e2e_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import pytest
import sys
from time import sleep
from ray._private.test_utils import wait_for_condition
from ray.tests.conftest_docker import * # noqa


@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.")
def test_ray_session_name_preserved(docker_cluster):
get_nodes_script = """
import ray
ray.init("auto")
print(ray._private.worker._global_node.session_name)
"""
head, worker = docker_cluster

def get_session_name(to_head=True):
if to_head:
output = head.exec_run(cmd=f"python -c '{get_nodes_script}'")
else:
output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'")
session_name = output.output.decode().strip().split("\n")[-1]
print("Output: ", output.output.decode().strip().split("\n"))
assert output.exit_code == 0
return session_name

# Make sure two nodes are alive
wait_for_condition(get_session_name, to_head=True)
session_name_head = get_session_name(to_head=True)
wait_for_condition(get_session_name, to_head=False)
session_name_worker = get_session_name(to_head=False)
assert session_name_head == session_name_worker
print("head killed")
head.kill()

sleep(2)

head.restart()

wait_for_condition(get_session_name, to_head=True)
session_name_head_after_restart = get_session_name(to_head=True)
wait_for_condition(get_session_name, to_head=False)
session_name_worker_after_restart = get_session_name(to_head=False)
assert session_name_worker_after_restart == session_name_head_after_restart
assert session_name_head == session_name_head_after_restart
assert session_name_worker_after_restart == session_name_worker


if __name__ == "__main__":
import os

if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
else:
sys.exit(pytest.main(["-sv", __file__]))
10 changes: 10 additions & 0 deletions python/ray/tests/test_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@
)


def test_ray_init_no_redis_logs():
vitsai marked this conversation as resolved.
Show resolved Hide resolved
script = """
import ray

ray.init()
"""
out_str = run_string_as_driver(script)
assert "redis" not in out_str, out_str


def test_dedup_logs():
script = """
import ray
Expand Down
Loading
Loading