Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Expose Redis getter to Python and use to retrieve session name #39194

Merged
merged 17 commits into from
Sep 5, 2023
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2352,6 +2352,7 @@ pyx_library(
deps = [
"//:core_worker_lib",
"//:global_state_accessor_lib",
"//:gcs_server_lib",
"//:raylet_lib",
"//:redis_client",
"//:src/ray/ray_exported_symbols.lds",
Expand Down
56 changes: 46 additions & 10 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
import ray._private.services
import ray._private.utils
from ray._private import storage
from ray._raylet import GcsClient
from ray._raylet import GcsClient, get_session_key_from_storage
from ray._private.resource_spec import ResourceSpec
from ray._private.services import serialize_config, get_address
from ray._private.utils import open_log, try_to_create_directory, try_to_symlink

# Logger for this module. It should be configured at the entry point
Expand Down Expand Up @@ -177,9 +178,15 @@ def __init__(

# Register the temp dir.
if head:
# date including microsecond
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f")
self._session_name = f"session_{date_str}_{os.getpid()}"
# We expect this the first time we initialize a cluster, but not during
# subsequent restarts of the head node.
maybe_key = self.check_persisted_session_name()
if maybe_key is None:
vitsai marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iycheng do we need to handle edge cases like there are 2 head nodes started with the same redis (without storage namespace)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. It's not working right now if the namespace is not set. So not a regression IMO.
Application needs to set it up.

# date including microsecond
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f")
self._session_name = f"session_{date_str}_{os.getpid()}"
else:
self._session_name = ray._private.utils.decode(maybe_key)
else:
if ray_params.session_name is None:
assert not self._default_worker
Expand Down Expand Up @@ -317,6 +324,29 @@ def __init__(
self.validate_ip_port(self.gcs_address)
self._record_stats()

def check_persisted_session_name(self):
if self._ray_params.external_addresses is None:
vitsai marked this conversation as resolved.
Show resolved Hide resolved
return None
self._redis_address = self._ray_params.external_addresses[0]
redis_ip_address, redis_port, enable_redis_ssl = get_address(
self._redis_address,
)
# Address is ip:port or redis:https://ip:port
if int(redis_port) < 0:
raise ValueError(
f"Invalid Redis port provided: {redis_port}."
"The port must be a non-negative integer."
)

return get_session_key_from_storage(
redis_ip_address,
int(redis_port),
self._ray_params.redis_password,
enable_redis_ssl,
serialize_config(self._config),
b"session_name",
)

@staticmethod
def validate_ip_port(ip_port):
"""Validates the address is in the ip:port format"""
Expand Down Expand Up @@ -1173,12 +1203,22 @@ def _write_cluster_info_to_kv(self):

ray_usage_lib.put_cluster_metadata(self.get_gcs_client())
# Make sure GCS is up.
self.get_gcs_client().internal_kv_put(
added = self.get_gcs_client().internal_kv_put(
b"session_name",
self._session_name.encode(),
True,
False,
ray_constants.KV_NAMESPACE_SESSION,
)
if not added:
curr_val = self.get_gcs_client().internal_kv_get(
b"session_name", ray_constants.KV_NAMESPACE_SESSION
)
assert curr_val != self._session_name, (
f"Session name {self._session_name} does not match "
f"persisted value {curr_val}. Perhaps there was an "
f"error connecting to Redis."
)

self.get_gcs_client().internal_kv_put(
b"session_dir",
self._session_dir.encode(),
Expand Down Expand Up @@ -1213,13 +1253,9 @@ def start_head_processes(self):
logger.debug(
f"Process STDOUT and STDERR is being " f"redirected to {self._logs_dir}."
)
assert self._redis_address is None
assert self._gcs_address is None
assert self._gcs_client is None

if self._ray_params.external_addresses is not None:
self._redis_address = self._ray_params.external_addresses[0]

self.start_gcs_server()
assert self.get_gcs_client() is not None
self._write_cluster_info_to_kv()
Expand Down
32 changes: 21 additions & 11 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,25 @@ def read_log(filename, lines_to_read):
return None, None


def get_address(redis_address):
parts = redis_address.split(":https://", 1)
enable_redis_ssl = False
if len(parts) == 1:
redis_ip_address, redis_port = parts[0].rsplit(":", 1)
else:
# rediss for SSL
if len(parts) != 2 or parts[0] not in ("redis", "rediss"):
raise ValueError(
f"Invalid redis address {redis_address}."
"Expected format is ip:port or redis:https://ip:port, "
"or rediss:https://ip:port for SSL."
)
redis_ip_address, redis_port = parts[1].rsplit(":", 1)
if parts[0] == "rediss":
enable_redis_ssl = True
return redis_ip_address, redis_port, enable_redis_ssl


def start_gcs_server(
redis_address: str,
log_dir: str,
Expand Down Expand Up @@ -1339,21 +1358,12 @@ def start_gcs_server(
f"--session-name={session_name}",
]
if redis_address:
parts = redis_address.split(":https://", 1)
enable_redis_ssl = "false"
if len(parts) == 1:
redis_ip_address, redis_port = parts[0].rsplit(":", 1)
else:
if len(parts) != 2 or parts[0] not in ("redis", "rediss"):
raise ValueError(f"Invalid redis address {redis_address}")
redis_ip_address, redis_port = parts[1].rsplit(":", 1)
if parts[0] == "rediss":
enable_redis_ssl = "true"
redis_ip_address, redis_port, enable_redis_ssl = get_address(redis_address)

command += [
f"--redis_address={redis_ip_address}",
f"--redis_port={redis_port}",
f"--redis_enable_ssl={enable_redis_ssl}",
f"--redis_enable_ssl={'true' if enable_redis_ssl else 'false'}",
]
if redis_password:
command += [f"--redis_password={redis_password}"]
Expand Down
24 changes: 23 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ from ray.includes.libcoreworker cimport (

from ray.includes.ray_config cimport RayConfig
from ray.includes.global_state_accessor cimport CGlobalStateAccessor
from ray.includes.global_state_accessor cimport RedisDelKeySync
from ray.includes.global_state_accessor cimport RedisDelKeySync, RedisGetKeySync
from ray.includes.optional cimport (
optional, nullopt
)
Expand Down Expand Up @@ -4579,3 +4579,25 @@ cdef void async_callback(shared_ptr[CRayObject] obj,

def del_key_from_storage(host, port, password, use_ssl, key):
return RedisDelKeySync(host, port, password, use_ssl, key)


def get_session_key_from_storage(host, port, password, use_ssl, config, key):
"""
Get the session key from the storage.
Intended to be used for session_name only.
Args:
host: The address of the owner (caller) of the
generator task.
port: The task ID of the generator task.
password: The redis password.
use_ssl: Whether to use SSL.
config: The Ray config. Used to get storage namespace.
key: The key to retrieve.
"""
cdef:
c_string data
result = RedisGetKeySync(host, port, password, use_ssl, config, key, &data)
if result:
return data
else:
return None
66 changes: 66 additions & 0 deletions python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,72 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
const c_string &node_ip_address,
c_string *node_to_connect)

cdef extern from * namespace "ray::gcs" nogil:
"""
#include <thread>
#include "ray/gcs/gcs_server/store_client_kv.h"
namespace ray {
namespace gcs {

bool RedisGetKeySync(const std::string& host,
int32_t port,
const std::string& password,
bool use_ssl,
const std::string& config,
const std::string& key,
std::string* data) {
InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
vitsai marked this conversation as resolved.
Show resolved Hide resolved
ray::RayLog::ShutDownRayLog,
"ray_init",
ray::RayLogLevel::WARNING,
"" /* log_dir */);

RedisClientOptions options(host, port, password, false, use_ssl);

std::string config_list;
RAY_CHECK(absl::Base64Unescape(config, &config_list));
RayConfig::instance().initialize(config_list);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm is this safe? What's going to happen if we call ray.init with the same config later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not thread-safe, but this part is single-threaded. Currently, each process (raylet, gcs, worker), calls it once, but if it is called multiple times serially, the values will just be reset.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reset sounds like the right behavior,

can you add an unit test to set this config from redis level and set it differently from ray.init level and see if it resolves to ray.init config correctly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add comment here this will be reset when a core worker initializes the config again

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Manually tested by setting the value before and after the new Redis call, and running the new test with Redis enabled. It resolves to the correct config. Maybe we can defer the unit test to after this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I fail to get it. Why do we need setup this? Is it about redis configs?


instrumented_io_context io_service;

auto redis_client = std::make_shared<RedisClient>(options);
auto status = redis_client->Connect(io_service);
if(!status.ok()) {
RAY_LOG(ERROR) << "Failed to connect to redis: " << status.ToString();
return false;
}

auto cli = std::make_unique<StoreClientInternalKV>(
std::make_unique<RedisStoreClient>(std::move(redis_client)));

bool ret_val = false;
cli->Get("session", key, [&](std::optional<std::string> result) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's happening if the CLI fails by other error? E.g., timeout or random Redis related issues? Is it just result contains no value? Would this contain error message in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will just not contain a value. Internally, on the C++ side, we do retry in redis_context.cc with exponential backoff, but the whole thing is bounded here by the io_context.run_for(duration). In terms of error propagation, the existing code isn't great about that, unfortunately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this means we cannot distinguish redis failure vs the first time cluster start?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the redis failed, GCS will crash too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I meant GET failure. But we workaround this by checking the session name if put failed (because override=False)

if (result.has_value()) {
*data = result.value();
ret_val = true;
} else {
RAY_LOG(INFO) << "Failed to retrieve the key " << key
<< " from persistent storage.";
ret_val = false;
}
});
io_service.run_for(std::chrono::milliseconds(1000));
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved

return ret_val;
}

}
}
"""
c_bool RedisGetKeySync(const c_string& host,
c_int32_t port,
const c_string& password,
c_bool use_ssl,
const c_string& config,
const c_string& key,
c_string* data)


cdef extern from * namespace "ray::gcs" nogil:
"""
#include <thread>
Expand Down
3 changes: 2 additions & 1 deletion python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ py_test_module_list(
"test_multinode_failures_2.py",
"test_node_manager.py",
"test_object_assign_owner.py",
"test_placement_group.py",
"test_placement_group_2.py",
"test_placement_group_4.py",
"test_placement_group_failover.py",
Expand Down Expand Up @@ -224,6 +223,7 @@ py_test_module_list(
py_test_module_list(
files = [
"test_gcs_ha_e2e.py",
"test_gcs_ha_e2e_2.py",
"test_memory_pressure.py",
"test_node_labels.py",
],
Expand Down Expand Up @@ -317,6 +317,7 @@ py_test_module_list(
"test_reference_counting_2.py",
"test_exit_observability.py",
"test_usage_stats.py",
"test_placement_group.py",
"test_placement_group_3.py",
"test_placement_group_5.py",
"test_cancel.py",
Expand Down
12 changes: 10 additions & 2 deletions python/ray/tests/conftest_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ def print_logs(self):
"9379",
],
volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}},
environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"},
environment={
"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379",
"RAY_raylet_client_num_connect_attempts": "10",
"RAY_raylet_client_connect_timeout_milliseconds": "100",
},
wrapper_class=Container,
ports={
"8000/tcp": None,
Expand All @@ -118,7 +122,11 @@ def print_logs(self):
"9379",
],
volumes={"{worker_node_vol.name}": {"bind": "/tmp", "mode": "rw"}},
environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"},
environment={
"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379",
"RAY_raylet_client_num_connect_attempts": "10",
"RAY_raylet_client_connect_timeout_milliseconds": "100",
},
wrapper_class=Container,
ports={
"8000/tcp": None,
Expand Down
12 changes: 5 additions & 7 deletions python/ray/tests/test_advanced_9.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,31 +355,29 @@ def check_demands(n):

@pytest.mark.skipif(enable_external_redis(), reason="Only valid in non redis env")
def test_redis_not_available(monkeypatch, call_ray_stop_only):
monkeypatch.setenv("RAY_NUM_REDIS_GET_RETRIES", "2")
monkeypatch.setenv("RAY_redis_db_connect_retries", "5")
monkeypatch.setenv("RAY_REDIS_ADDRESS", "localhost:12345")

p = subprocess.run(
"ray start --head",
shell=True,
capture_output=True,
)
assert "Could not establish connection to Redis" in p.stderr.decode()
assert "Please check" in p.stderr.decode()
assert "gcs_server.out for details" in p.stderr.decode()
assert "RuntimeError: Failed to start GCS" in p.stderr.decode()
assert "Please check " in p.stderr.decode()
assert "redis storage is alive or not." in p.stderr.decode()


@pytest.mark.skipif(not enable_external_redis(), reason="Only valid in redis env")
def test_redis_wrong_password(monkeypatch, external_redis, call_ray_stop_only):
monkeypatch.setenv("RAY_NUM_REDIS_GET_RETRIES", "2")
monkeypatch.setenv("RAY_redis_db_connect_retries", "5")
p = subprocess.run(
"ray start --head --redis-password=1234",
shell=True,
capture_output=True,
)

assert "RedisError: ERR AUTH <password> called" in p.stderr.decode()
assert "Please check /tmp/ray/session" in p.stderr.decode()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the error message now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just the authentication error

assert "RuntimeError: Failed to start GCS" in p.stderr.decode()


@pytest.mark.skipif(not enable_external_redis(), reason="Only valid in redis env")
Expand Down
Loading
Loading