Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Expose Redis getter to Python and use to retrieve session name #39194

Merged
merged 17 commits into from
Sep 5, 2023
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2531,6 +2531,7 @@ pyx_library(
deps = [
"//:core_worker_lib",
"//:global_state_accessor_lib",
"//:gcs_server_lib",
"//:raylet_lib",
"//:redis_client",
"//:src/ray/ray_exported_symbols.lds",
Expand Down
38 changes: 30 additions & 8 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
import ray._private.services
import ray._private.utils
from ray._private import storage
from ray._raylet import GcsClient
from ray._raylet import GcsClient, get_key_from_storage
from ray._private.resource_spec import ResourceSpec
from ray._private.services import serialize_config
from ray._private.utils import open_log, try_to_create_directory, try_to_symlink

# Logger for this module. It should be configured at the entry point
Expand Down Expand Up @@ -177,9 +178,34 @@ def __init__(

# Register the temp dir.
if head:
# date including microsecond
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f")
self._session_name = f"session_{date_str}_{os.getpid()}"
maybe_key = None
if self._ray_params.external_addresses is not None:
vitsai marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: What happens if Redis is not started here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need Redis to be started by the time the client tries to connect to it, which it does with retries + exponential backoff and a timeout, so there is some leeway, but if it fails to connect then ray start will fail. In these lines specifically, we are only working with the parameters passed in, which are static from the start.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be clear, if redis is not started, this will fail in 1 second? what's going to be error messageS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will fail saying that it can't connect to Redis. This is unfortunately a fatal log in the C++ Redis context, but the cause should be clear.

self._redis_address = self._ray_params.external_addresses[0]
parts = self._redis_address.split(":https://", 1)
vitsai marked this conversation as resolved.
Show resolved Hide resolved
enable_redis_ssl = False
if len(parts) == 1:
redis_ip_address, redis_port = parts[0].rsplit(":", 1)
else:
if len(parts) != 2 or parts[0] not in ("redis", "rediss"):
raise ValueError(f"Invalid redis address {self._redis_address}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you improve the error message to include what's the definition of "valid redis address"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Elaborated. This part comes from the start_gcs_server logic in services.py though

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

many of our error messages are bad lol... We even had a project to "write better error message" in the past haha..

redis_ip_address, redis_port = parts[1].rsplit(":", 1)
if parts[0] == "rediss":
enable_redis_ssl = True
maybe_key = get_key_from_storage(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add the same validation logic as cleanup_redis_storage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added it for ports because the other type checks will happen during runtime and yield a very helpful error anyway (at least that was my experience with pytest on a type mismatch here).

redis_ip_address,
int(redis_port),
self._ray_params.redis_password,
enable_redis_ssl,
serialize_config(self._config),
b"session_name",
)

if maybe_key is None:
vitsai marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iycheng do we need to handle edge cases like there are 2 head nodes started with the same redis (without storage namespace)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. It's not working right now if the namespace is not set. So not a regression IMO.
Application needs to set it up.

# date including microsecond
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f")
self._session_name = f"session_{date_str}_{os.getpid()}"
else:
self._session_name = ray._private.utils.decode(maybe_key)
else:
if ray_params.session_name is None:
assert not self._default_worker
Expand Down Expand Up @@ -1215,13 +1241,9 @@ def start_head_processes(self):
logger.debug(
f"Process STDOUT and STDERR is being " f"redirected to {self._logs_dir}."
)
assert self._redis_address is None
assert self._gcs_address is None
assert self._gcs_client is None

if self._ray_params.external_addresses is not None:
self._redis_address = self._ray_params.external_addresses[0]

self.start_gcs_server()
assert self.get_gcs_client() is not None
self._write_cluster_info_to_kv()
Expand Down
12 changes: 11 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ from ray.includes.libcoreworker cimport (

from ray.includes.ray_config cimport RayConfig
from ray.includes.global_state_accessor cimport CGlobalStateAccessor
from ray.includes.global_state_accessor cimport RedisDelKeySync
from ray.includes.global_state_accessor cimport RedisDelKeySync, RedisGetKeySync
from ray.includes.optional cimport (
optional, nullopt
)
Expand Down Expand Up @@ -4574,3 +4574,13 @@ cdef void async_callback(shared_ptr[CRayObject] obj,

def del_key_from_storage(host, port, password, use_ssl, key):
return RedisDelKeySync(host, port, password, use_ssl, key)


def get_key_from_storage(host, port, password, use_ssl, config, key):
vitsai marked this conversation as resolved.
Show resolved Hide resolved
cdef:
c_string data
result = RedisGetKeySync(host, port, password, use_ssl, config, key, &data)
if result:
return data
else:
return None
59 changes: 59 additions & 0 deletions python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,65 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
const c_string &node_ip_address,
c_string *node_to_connect)

cdef extern from * namespace "ray::gcs" nogil:
"""
#include <thread>
#include "ray/gcs/gcs_server/store_client_kv.h"
namespace ray {
namespace gcs {

bool RedisGetKeySync(const std::string& host,
int32_t port,
const std::string& password,
bool use_ssl,
const std::string& config,
const std::string& key,
std::string* data) {
RedisClientOptions options(host, port, password, false, use_ssl);

std::string config_list;
RAY_CHECK(absl::Base64Unescape(config, &config_list));
RayConfig::instance().initialize(config_list);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm is this safe? What's going to happen if we call ray.init with the same config later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not thread-safe, but this part is single-threaded. Currently, each process (raylet, gcs, worker), calls it once, but if it is called multiple times serially, the values will just be reset.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reset sounds like the right behavior,

can you add an unit test to set this config from redis level and set it differently from ray.init level and see if it resolves to ray.init config correctly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add comment here this will be reset when a core worker initializes the config again

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Manually tested by setting the value before and after the new Redis call, and running the new test with Redis enabled. It resolves to the correct config. Maybe we can defer the unit test to after this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I fail to get it. Why do we need setup this? Is it about redis configs?


instrumented_io_context io_service;

auto redis_client = std::make_shared<RedisClient>(options);
auto status = redis_client->Connect(io_service);

if(!status.ok()) {
RAY_LOG(ERROR) << "Failed to connect to redis: " << status.ToString();
return false;
}
auto cli = std::make_unique<StoreClientInternalKV>(
std::make_unique<RedisStoreClient>(std::move(redis_client)));

bool ret_val = false;
cli->Get("session", key, [&](std::optional<std::string> result) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's happening if the CLI fails by other error? E.g., timeout or random Redis related issues? Is it just result contains no value? Would this contain error message in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will just not contain a value. Internally, on the C++ side, we do retry in redis_context.cc with exponential backoff, but the whole thing is bounded here by the io_context.run_for(duration). In terms of error propagation, the existing code isn't great about that, unfortunately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this means we cannot distinguish redis failure vs the first time cluster start?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the redis failed, GCS will crash too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I meant GET failure. But we workaround this by checking the session name if put failed (because override=False)

if (result.has_value()) {
*data = result.value();
ret_val = true;
} else {
RAY_LOG(ERROR) << "Failed to get " << key;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
RAY_LOG(ERROR) << "Failed to get " << key;
RAY_LOG(ERROR) << "Failed to get a key, " << key << " from Redis storage.";

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, doesn't this mean it is printed every time you ray.init() first time? Can we just not log here and log in the node.py layer instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it to log(info)

ret_val = false;
}
});
io_service.run_for(std::chrono::milliseconds(1000));
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved

return ret_val;
}

}
}
"""
c_bool RedisGetKeySync(const c_string& host,
c_int32_t port,
const c_string& password,
c_bool use_ssl,
const c_string& config,
const c_string& key,
c_string* data)


cdef extern from * namespace "ray::gcs" nogil:
"""
#include <thread>
Expand Down
30 changes: 30 additions & 0 deletions python/ray/tests/test_gcs_fault_tolerance.py
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,36 @@ def check_raylet_healthy():
sleep(1)


def test_session_name(ray_start_cluster):
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
# Kill GCS and check that raylets kill themselves when not backed by Redis,
# and stay alive when backed by Redis.
# Raylets should kill themselves due to cluster ID mismatch in the
# non-persisted case.
cluster = ray_start_cluster
cluster.add_node()
cluster.wait_for_nodes()

head_node = cluster.head_node
session_dir = head_node.get_session_dir_path()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CAn you also submit a driver and check if their session name is correct?

import ray
ray.init()
# Make sure this is correct
ray._private.worker._global_node.get_session_dir_path()

We need this

  1. Start the first head node. Submit a driver to a head node and get a session dir
  2. Start the worker node. Submit a driver to "worker node" and get a session dir.
  3. Kill head node. Restart head node.
  4. Submit a driver to a head node and get a session dir
  5. Submit a driver to a worker node and get a session dir.

This could also be done in a similar way as test_gcs_ha_e2e.py (it starts head / worker in docker containers, so it is easy to test it)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add the same suite of tests in this commit; 748ddf8?


gcs_server_process = head_node.all_processes["gcs_server"][0].process
gcs_server_pid = gcs_server_process.pid
cluster.remove_node(head_node, allow_graceful=False)
# Wait to prevent the gcs server process becoming zombie.
gcs_server_process.wait()
wait_for_pid_to_exit(gcs_server_pid, 1000)

# Add head node back
cluster.add_node()
head_node = cluster.head_node
new_session_dir = head_node.get_session_dir_path()

if not enable_external_redis():
assert session_dir != new_session_dir
else:
assert session_dir == new_session_dir
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add tests inside this commit 748ddf8#diff-8e02c2ad08f47c6f22d3a04682d0c3867bffe7cf5f9f2838e6f0f999e97952d5?

The one inside test_ray_init.py and test_gcs_ha_e2e_2.py. I think test_gcs_ha_e2e_2 should just work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests inside test_ray_init only work if redis is disabled. Right now for the tests, we either disable redis or enable redis for the duration of the entire test. In the latter case, we expect the session to be the same.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it is to verify session dir is not changed when redis is disabled!



@pytest.mark.parametrize(
"ray_start_regular_with_external_redis",
[
Expand Down
Loading