Skip to content

Commit

Permalink
[core] Expose Redis getter to Python and use to retrieve session name (
Browse files Browse the repository at this point in the history
…ray-project#39194)

If only GCS communicates with Redis, there is no way to initialize the log directories using persisted values. However, these directories are required to exist before starting GCS. Expose a function to the Python layer specifically to retrieve these keys from Redis and set them. Follow-ups will ensure that these keys are only set and retrieved through this interface.
  • Loading branch information
vitsai authored and LeonLuttenberger committed Sep 5, 2023
1 parent 489b070 commit 8d63078
Show file tree
Hide file tree
Showing 13 changed files with 320 additions and 35 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2352,6 +2352,7 @@ pyx_library(
deps = [
"//:core_worker_lib",
"//:global_state_accessor_lib",
"//:gcs_server_lib",
"//:raylet_lib",
"//:redis_client",
"//:src/ray/ray_exported_symbols.lds",
Expand Down
56 changes: 46 additions & 10 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
import ray._private.services
import ray._private.utils
from ray._private import storage
from ray._raylet import GcsClient
from ray._raylet import GcsClient, get_session_key_from_storage
from ray._private.resource_spec import ResourceSpec
from ray._private.services import serialize_config, get_address
from ray._private.utils import open_log, try_to_create_directory, try_to_symlink

# Logger for this module. It should be configured at the entry point
Expand Down Expand Up @@ -177,9 +178,15 @@ def __init__(

# Register the temp dir.
if head:
# date including microsecond
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f")
self._session_name = f"session_{date_str}_{os.getpid()}"
# We expect this the first time we initialize a cluster, but not during
# subsequent restarts of the head node.
maybe_key = self.check_persisted_session_name()
if maybe_key is None:
# date including microsecond
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f")
self._session_name = f"session_{date_str}_{os.getpid()}"
else:
self._session_name = ray._private.utils.decode(maybe_key)
else:
if ray_params.session_name is None:
assert not self._default_worker
Expand Down Expand Up @@ -317,6 +324,29 @@ def __init__(
self.validate_ip_port(self.gcs_address)
self._record_stats()

def check_persisted_session_name(self):
if self._ray_params.external_addresses is None:
return None
self._redis_address = self._ray_params.external_addresses[0]
redis_ip_address, redis_port, enable_redis_ssl = get_address(
self._redis_address,
)
# Address is ip:port or redis:https://ip:port
if int(redis_port) < 0:
raise ValueError(
f"Invalid Redis port provided: {redis_port}."
"The port must be a non-negative integer."
)

return get_session_key_from_storage(
redis_ip_address,
int(redis_port),
self._ray_params.redis_password,
enable_redis_ssl,
serialize_config(self._config),
b"session_name",
)

@staticmethod
def validate_ip_port(ip_port):
"""Validates the address is in the ip:port format"""
Expand Down Expand Up @@ -1173,12 +1203,22 @@ def _write_cluster_info_to_kv(self):

ray_usage_lib.put_cluster_metadata(self.get_gcs_client())
# Make sure GCS is up.
self.get_gcs_client().internal_kv_put(
added = self.get_gcs_client().internal_kv_put(
b"session_name",
self._session_name.encode(),
True,
False,
ray_constants.KV_NAMESPACE_SESSION,
)
if not added:
curr_val = self.get_gcs_client().internal_kv_get(
b"session_name", ray_constants.KV_NAMESPACE_SESSION
)
assert curr_val != self._session_name, (
f"Session name {self._session_name} does not match "
f"persisted value {curr_val}. Perhaps there was an "
f"error connecting to Redis."
)

self.get_gcs_client().internal_kv_put(
b"session_dir",
self._session_dir.encode(),
Expand Down Expand Up @@ -1213,13 +1253,9 @@ def start_head_processes(self):
logger.debug(
f"Process STDOUT and STDERR is being " f"redirected to {self._logs_dir}."
)
assert self._redis_address is None
assert self._gcs_address is None
assert self._gcs_client is None

if self._ray_params.external_addresses is not None:
self._redis_address = self._ray_params.external_addresses[0]

self.start_gcs_server()
assert self.get_gcs_client() is not None
self._write_cluster_info_to_kv()
Expand Down
32 changes: 21 additions & 11 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,25 @@ def read_log(filename, lines_to_read):
return None, None


def get_address(redis_address):
parts = redis_address.split(":https://", 1)
enable_redis_ssl = False
if len(parts) == 1:
redis_ip_address, redis_port = parts[0].rsplit(":", 1)
else:
# rediss for SSL
if len(parts) != 2 or parts[0] not in ("redis", "rediss"):
raise ValueError(
f"Invalid redis address {redis_address}."
"Expected format is ip:port or redis:https://ip:port, "
"or rediss:https://ip:port for SSL."
)
redis_ip_address, redis_port = parts[1].rsplit(":", 1)
if parts[0] == "rediss":
enable_redis_ssl = True
return redis_ip_address, redis_port, enable_redis_ssl


def start_gcs_server(
redis_address: str,
log_dir: str,
Expand Down Expand Up @@ -1339,21 +1358,12 @@ def start_gcs_server(
f"--session-name={session_name}",
]
if redis_address:
parts = redis_address.split(":https://", 1)
enable_redis_ssl = "false"
if len(parts) == 1:
redis_ip_address, redis_port = parts[0].rsplit(":", 1)
else:
if len(parts) != 2 or parts[0] not in ("redis", "rediss"):
raise ValueError(f"Invalid redis address {redis_address}")
redis_ip_address, redis_port = parts[1].rsplit(":", 1)
if parts[0] == "rediss":
enable_redis_ssl = "true"
redis_ip_address, redis_port, enable_redis_ssl = get_address(redis_address)

command += [
f"--redis_address={redis_ip_address}",
f"--redis_port={redis_port}",
f"--redis_enable_ssl={enable_redis_ssl}",
f"--redis_enable_ssl={'true' if enable_redis_ssl else 'false'}",
]
if redis_password:
command += [f"--redis_password={redis_password}"]
Expand Down
24 changes: 23 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ from ray.includes.libcoreworker cimport (

from ray.includes.ray_config cimport RayConfig
from ray.includes.global_state_accessor cimport CGlobalStateAccessor
from ray.includes.global_state_accessor cimport RedisDelKeySync
from ray.includes.global_state_accessor cimport RedisDelKeySync, RedisGetKeySync
from ray.includes.optional cimport (
optional, nullopt
)
Expand Down Expand Up @@ -4579,3 +4579,25 @@ cdef void async_callback(shared_ptr[CRayObject] obj,

def del_key_from_storage(host, port, password, use_ssl, key):
return RedisDelKeySync(host, port, password, use_ssl, key)


def get_session_key_from_storage(host, port, password, use_ssl, config, key):
"""
Get the session key from the storage.
Intended to be used for session_name only.
Args:
host: The address of the owner (caller) of the
generator task.
port: The task ID of the generator task.
password: The redis password.
use_ssl: Whether to use SSL.
config: The Ray config. Used to get storage namespace.
key: The key to retrieve.
"""
cdef:
c_string data
result = RedisGetKeySync(host, port, password, use_ssl, config, key, &data)
if result:
return data
else:
return None
66 changes: 66 additions & 0 deletions python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,72 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
const c_string &node_ip_address,
c_string *node_to_connect)

cdef extern from * namespace "ray::gcs" nogil:
"""
#include <thread>
#include "ray/gcs/gcs_server/store_client_kv.h"
namespace ray {
namespace gcs {
bool RedisGetKeySync(const std::string& host,
int32_t port,
const std::string& password,
bool use_ssl,
const std::string& config,
const std::string& key,
std::string* data) {
InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
ray::RayLog::ShutDownRayLog,
"ray_init",
ray::RayLogLevel::WARNING,
"" /* log_dir */);
RedisClientOptions options(host, port, password, false, use_ssl);
std::string config_list;
RAY_CHECK(absl::Base64Unescape(config, &config_list));
RayConfig::instance().initialize(config_list);
instrumented_io_context io_service;
auto redis_client = std::make_shared<RedisClient>(options);
auto status = redis_client->Connect(io_service);
if(!status.ok()) {
RAY_LOG(ERROR) << "Failed to connect to redis: " << status.ToString();
return false;
}
auto cli = std::make_unique<StoreClientInternalKV>(
std::make_unique<RedisStoreClient>(std::move(redis_client)));
bool ret_val = false;
cli->Get("session", key, [&](std::optional<std::string> result) {
if (result.has_value()) {
*data = result.value();
ret_val = true;
} else {
RAY_LOG(INFO) << "Failed to retrieve the key " << key
<< " from persistent storage.";
ret_val = false;
}
});
io_service.run_for(std::chrono::milliseconds(1000));
return ret_val;
}
}
}
"""
c_bool RedisGetKeySync(const c_string& host,
c_int32_t port,
const c_string& password,
c_bool use_ssl,
const c_string& config,
const c_string& key,
c_string* data)


cdef extern from * namespace "ray::gcs" nogil:
"""
#include <thread>
Expand Down
3 changes: 2 additions & 1 deletion python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ py_test_module_list(
"test_multinode_failures_2.py",
"test_node_manager.py",
"test_object_assign_owner.py",
"test_placement_group.py",
"test_placement_group_2.py",
"test_placement_group_4.py",
"test_placement_group_failover.py",
Expand Down Expand Up @@ -224,6 +223,7 @@ py_test_module_list(
py_test_module_list(
files = [
"test_gcs_ha_e2e.py",
"test_gcs_ha_e2e_2.py",
"test_memory_pressure.py",
"test_node_labels.py",
],
Expand Down Expand Up @@ -317,6 +317,7 @@ py_test_module_list(
"test_reference_counting_2.py",
"test_exit_observability.py",
"test_usage_stats.py",
"test_placement_group.py",
"test_placement_group_3.py",
"test_placement_group_5.py",
"test_cancel.py",
Expand Down
12 changes: 10 additions & 2 deletions python/ray/tests/conftest_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ def print_logs(self):
"9379",
],
volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}},
environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"},
environment={
"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379",
"RAY_raylet_client_num_connect_attempts": "10",
"RAY_raylet_client_connect_timeout_milliseconds": "100",
},
wrapper_class=Container,
ports={
"8000/tcp": None,
Expand All @@ -118,7 +122,11 @@ def print_logs(self):
"9379",
],
volumes={"{worker_node_vol.name}": {"bind": "/tmp", "mode": "rw"}},
environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"},
environment={
"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379",
"RAY_raylet_client_num_connect_attempts": "10",
"RAY_raylet_client_connect_timeout_milliseconds": "100",
},
wrapper_class=Container,
ports={
"8000/tcp": None,
Expand Down
12 changes: 5 additions & 7 deletions python/ray/tests/test_advanced_9.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,31 +355,29 @@ def check_demands(n):

@pytest.mark.skipif(enable_external_redis(), reason="Only valid in non redis env")
def test_redis_not_available(monkeypatch, call_ray_stop_only):
monkeypatch.setenv("RAY_NUM_REDIS_GET_RETRIES", "2")
monkeypatch.setenv("RAY_redis_db_connect_retries", "5")
monkeypatch.setenv("RAY_REDIS_ADDRESS", "localhost:12345")

p = subprocess.run(
"ray start --head",
shell=True,
capture_output=True,
)
assert "Could not establish connection to Redis" in p.stderr.decode()
assert "Please check" in p.stderr.decode()
assert "gcs_server.out for details" in p.stderr.decode()
assert "RuntimeError: Failed to start GCS" in p.stderr.decode()
assert "Please check " in p.stderr.decode()
assert "redis storage is alive or not." in p.stderr.decode()


@pytest.mark.skipif(not enable_external_redis(), reason="Only valid in redis env")
def test_redis_wrong_password(monkeypatch, external_redis, call_ray_stop_only):
monkeypatch.setenv("RAY_NUM_REDIS_GET_RETRIES", "2")
monkeypatch.setenv("RAY_redis_db_connect_retries", "5")
p = subprocess.run(
"ray start --head --redis-password=1234",
shell=True,
capture_output=True,
)

assert "RedisError: ERR AUTH <password> called" in p.stderr.decode()
assert "Please check /tmp/ray/session" in p.stderr.decode()
assert "RuntimeError: Failed to start GCS" in p.stderr.decode()


@pytest.mark.skipif(not enable_external_redis(), reason="Only valid in redis env")
Expand Down
Loading

0 comments on commit 8d63078

Please sign in to comment.