From 11bd816fc75dbdb8f87f3afde37a3c338aa0f61e Mon Sep 17 00:00:00 2001 From: vitsai Date: Fri, 1 Sep 2023 02:57:52 +0000 Subject: [PATCH 01/16] initial change Signed-off-by: vitsai --- BUILD.bazel | 1 + python/ray/_private/node.py | 29 ++++++++-- python/ray/_raylet.pyx | 12 ++++- python/ray/includes/global_state_accessor.pxd | 53 +++++++++++++++++++ 4 files changed, 90 insertions(+), 5 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index eb1c77e291c01..edfeb0e56d358 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2531,6 +2531,7 @@ pyx_library( deps = [ "//:core_worker_lib", "//:global_state_accessor_lib", + "//:gcs_server_lib", "//:raylet_lib", "//:redis_client", "//:src/ray/ray_exported_symbols.lds", diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 288f071d06d35..1715f8e5d12e1 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -24,7 +24,7 @@ import ray._private.services import ray._private.utils from ray._private import storage -from ray._raylet import GcsClient +from ray._raylet import GcsClient, get_key_from_storage from ray._private.resource_spec import ResourceSpec from ray._private.utils import open_log, try_to_create_directory, try_to_symlink @@ -177,9 +177,30 @@ def __init__( # Register the temp dir. if head: - # date including microsecond - date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f") - self._session_name = f"session_{date_str}_{os.getpid()}" + maybe_key = None + if self._redis_address: + parts = self._redis_address.split("://", 1) + enable_redis_ssl = "false" + if len(parts) == 1: + redis_ip_address, redis_port = parts[0].rsplit(":", 1) + else: + if len(parts) != 2 or parts[0] not in ("redis", "rediss"): + raise ValueError(f"Invalid redis address {self._redis_address}") + redis_ip_address, redis_port = parts[1].rsplit(":", 1) + if parts[0] == "rediss": + enable_redis_ssl = "true" + maybe_key = get_key_from_storage( + self._redis_address, + self._ray_params.redis_password, + enable_redis_ssl, + b"session_name", + ) + if maybe_key is None: + # date including microsecond + date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f") + self._session_name = f"session_{date_str}_{os.getpid()}" + else: + self._session_name = maybe_key else: if ray_params.session_name is None: assert not self._default_worker diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index eed3560bda782..3c9f98dfe3e28 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -156,7 +156,7 @@ from ray.includes.libcoreworker cimport ( from ray.includes.ray_config cimport RayConfig from ray.includes.global_state_accessor cimport CGlobalStateAccessor -from ray.includes.global_state_accessor cimport RedisDelKeySync +from ray.includes.global_state_accessor cimport RedisDelKeySync, RedisGetKeySync from ray.includes.optional cimport ( optional, nullopt ) @@ -4574,3 +4574,13 @@ cdef void async_callback(shared_ptr[CRayObject] obj, def del_key_from_storage(host, port, password, use_ssl, key): return RedisDelKeySync(host, port, password, use_ssl, key) + + +def get_key_from_storage(host, port, password, use_ssl, key): + cdef: + c_string data + result = RedisGetKeySync(host, port, password, use_ssl, key, &data) + if result: + return data + else: + return None diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index e9cd93e0e7a29..708d02e25b1b9 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -46,6 +46,59 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil: const c_string &node_ip_address, c_string *node_to_connect) +cdef extern from * namespace "ray::gcs" nogil: + """ + #include + #include "ray/gcs/gcs_server/store_client_kv.h" + namespace ray { + namespace gcs { + + bool RedisGetKeySync(const std::string& host, + int32_t port, + const std::string& password, + bool use_ssl, + const std::string& key, + std::string* data) { + RedisClientOptions options(host, port, password, false, use_ssl); + + instrumented_io_context io_service; + + auto redis_client = std::make_shared(options); + auto status = redis_client->Connect(io_service); + + if(!status.ok()) { + RAY_LOG(ERROR) << "Failed to connect to redis: " << status.ToString(); + return false; + } + auto cli = std::make_unique( + std::make_unique(std::move(redis_client))); + + bool ret_val = false; + cli->Get("session", key, [&](std::optional result) { + if (result.has_value()) { + *data = result.value(); + ret_val = true; + } else { + RAY_LOG(ERROR) << "Failed to get " << key; + ret_val = false; + } + }); + + io_service.run(); + return ret_val; + } + + } + } + """ + c_bool RedisGetKeySync(const c_string& host, + c_int32_t port, + const c_string& password, + c_bool use_ssl, + const c_string& key, + c_string* data) + + cdef extern from * namespace "ray::gcs" nogil: """ #include From bf509ded38fd53b47d21a8fc43a151c0ca9901ae Mon Sep 17 00:00:00 2001 From: vitsai Date: Fri, 1 Sep 2023 03:29:56 +0000 Subject: [PATCH 02/16] test Signed-off-by: vitsai --- python/ray/tests/test_gcs_fault_tolerance.py | 30 ++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index dea003d5631ef..c2227382285b9 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -868,6 +868,36 @@ def check_raylet_healthy(): sleep(1) +def test_session_name(ray_start_cluster): + # Kill GCS and check that raylets kill themselves when not backed by Redis, + # and stay alive when backed by Redis. + # Raylets should kill themselves due to cluster ID mismatch in the + # non-persisted case. + cluster = ray_start_cluster + cluster.add_node() + cluster.wait_for_nodes() + + head_node = cluster.head_node + session_dir = head_node.get_session_dir_path() + + gcs_server_process = head_node.all_processes["gcs_server"][0].process + gcs_server_pid = gcs_server_process.pid + cluster.remove_node(head_node, allow_graceful=False) + # Wait to prevent the gcs server process becoming zombie. + gcs_server_process.wait() + wait_for_pid_to_exit(gcs_server_pid, 1000) + + # Add head node back + cluster.add_node() + head_node = cluster.head_node + new_session_dir = head_node.get_session_dir_path() + + if not enable_external_redis(): + assert session_dir != new_session_dir + else: + assert session_dir == new_session_dir + + @pytest.mark.parametrize( "ray_start_regular_with_external_redis", [ From 3db4e18193f0d8ebca247668a93db18ba2c09208 Mon Sep 17 00:00:00 2001 From: vitsai Date: Fri, 1 Sep 2023 08:02:35 +0000 Subject: [PATCH 03/16] some fixes Signed-off-by: vitsai --- python/ray/_private/node.py | 35 +++++++++++-------- python/ray/_raylet.pyx | 5 +-- python/ray/includes/global_state_accessor.pxd | 13 ++++++- python/ray/tests/conftest.py | 1 + python/ray/tests/test_gcs_fault_tolerance.py | 1 + src/ray/gcs/redis_context.cc | 2 ++ .../gcs/store_client/redis_store_client.cc | 1 + 7 files changed, 41 insertions(+), 17 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 1715f8e5d12e1..90a7357d63aa7 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -26,6 +26,7 @@ from ray._private import storage from ray._raylet import GcsClient, get_key_from_storage from ray._private.resource_spec import ResourceSpec +from ray._private.services import serialize_config from ray._private.utils import open_log, try_to_create_directory, try_to_symlink # Logger for this module. It should be configured at the entry point @@ -178,9 +179,10 @@ def __init__( # Register the temp dir. if head: maybe_key = None - if self._redis_address: + if self._ray_params.external_addresses is not None: + self._redis_address = self._ray_params.external_addresses[0] parts = self._redis_address.split("://", 1) - enable_redis_ssl = "false" + enable_redis_ssl = False if len(parts) == 1: redis_ip_address, redis_port = parts[0].rsplit(":", 1) else: @@ -188,19 +190,22 @@ def __init__( raise ValueError(f"Invalid redis address {self._redis_address}") redis_ip_address, redis_port = parts[1].rsplit(":", 1) if parts[0] == "rediss": - enable_redis_ssl = "true" - maybe_key = get_key_from_storage( - self._redis_address, - self._ray_params.redis_password, - enable_redis_ssl, - b"session_name", - ) + enable_redis_ssl = True + maybe_key = get_key_from_storage( + redis_ip_address, + int(redis_port), + self._ray_params.redis_password, + enable_redis_ssl, + serialize_config(self._config), + b"session_name", + ) + if maybe_key is None: # date including microsecond date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f") self._session_name = f"session_{date_str}_{os.getpid()}" else: - self._session_name = maybe_key + self._session_name = ray._private.utils.decode(maybe_key) else: if ray_params.session_name is None: assert not self._default_worker @@ -1202,6 +1207,12 @@ def _write_cluster_info_to_kv(self): True, ray_constants.KV_NAMESPACE_SESSION, ) + v = self.get_gcs_client().internal_kv_get( + b"session_name", + ray_constants.KV_NAMESPACE_SESSION, + ) + print("vct gotten value ", v) + print("just put session name vct " + self._session_name) self.get_gcs_client().internal_kv_put( b"session_dir", self._session_dir.encode(), @@ -1236,13 +1247,9 @@ def start_head_processes(self): logger.debug( f"Process STDOUT and STDERR is being " f"redirected to {self._logs_dir}." ) - assert self._redis_address is None assert self._gcs_address is None assert self._gcs_client is None - if self._ray_params.external_addresses is not None: - self._redis_address = self._ray_params.external_addresses[0] - self.start_gcs_server() assert self.get_gcs_client() is not None self._write_cluster_info_to_kv() diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 3c9f98dfe3e28..734dd03ce6cac 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -4576,10 +4576,11 @@ def del_key_from_storage(host, port, password, use_ssl, key): return RedisDelKeySync(host, port, password, use_ssl, key) -def get_key_from_storage(host, port, password, use_ssl, key): +def get_key_from_storage(host, port, password, use_ssl, config, key): cdef: c_string data - result = RedisGetKeySync(host, port, password, use_ssl, key, &data) + result = RedisGetKeySync(host, port, password, use_ssl, config, key, &data) + print("result is " + str(result)) if result: return data else: diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 708d02e25b1b9..b6e128dbcc069 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -57,11 +57,19 @@ cdef extern from * namespace "ray::gcs" nogil: int32_t port, const std::string& password, bool use_ssl, + const std::string& config, const std::string& key, std::string* data) { RedisClientOptions options(host, port, password, false, use_ssl); + std::string config_list; + RAY_CHECK(absl::Base64Unescape(config, &config_list)); + RayConfig::instance().initialize(config_list); + instrumented_io_context io_service; + RAY_UNUSED(std::async(std::launch::async, [&]() { + io_service.run(); + })); auto redis_client = std::make_shared(options); auto status = redis_client->Connect(io_service); @@ -76,15 +84,17 @@ cdef extern from * namespace "ray::gcs" nogil: bool ret_val = false; cli->Get("session", key, [&](std::optional result) { if (result.has_value()) { + RAY_LOG(INFO) << "vct E"; *data = result.value(); ret_val = true; } else { + *data = "gggg"; RAY_LOG(ERROR) << "Failed to get " << key; ret_val = false; } }); + RAY_LOG(INFO) << "vct F"; - io_service.run(); return ret_val; } @@ -95,6 +105,7 @@ cdef extern from * namespace "ray::gcs" nogil: c_int32_t port, const c_string& password, c_bool use_ssl, + const c_string& config, const c_string& key, c_string* data) diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 5d8293b1b6f57..b18a71a9b8e1d 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -290,6 +290,7 @@ def _setup_redis(request): import uuid ns = str(uuid.uuid4()) + print("vct @@@@ ", ns) old_ns = os.environ.get("RAY_external_storage_namespace") os.environ["RAY_external_storage_namespace"] = ns diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index c2227382285b9..e4edb7151341b 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -869,6 +869,7 @@ def check_raylet_healthy(): def test_session_name(ray_start_cluster): + print("are we using redis " + str(enable_external_redis())) # Kill GCS and check that raylets kill themselves when not backed by Redis, # and stay alive when backed by Redis. # Raylets should kill themselves due to cluster ID mismatch in the diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index 35a65dce32d4e..90c6912ed4c99 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -184,6 +184,8 @@ void RedisRequestContext::Run() { +[](struct redisAsyncContext *async_context, void *raw_reply, void *privdata) { auto *request_cxt = (RedisRequestContext *)privdata; auto redis_reply = reinterpret_cast(raw_reply); + RAY_LOG(ERROR) << "Redis request [" + << absl::StrJoin(request_cxt->redis_cmds_, " ") << "] completed."; // Error happened. if (redis_reply == nullptr || redis_reply->type == REDIS_REPLY_ERROR) { auto error_msg = redis_reply ? redis_reply->str : async_context->errstr; diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index a20aa744de96f..15f5f4872302a 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -138,6 +138,7 @@ void RedisStoreClient::MGetValues(const std::string &table_name, RedisStoreClient::RedisStoreClient(std::shared_ptr redis_client) : external_storage_namespace_(::RayConfig::instance().external_storage_namespace()), redis_client_(std::move(redis_client)) { + RAY_LOG(INFO) << "vct ext namespace " << external_storage_namespace_; RAY_CHECK(!absl::StrContains(external_storage_namespace_, kClusterSeparator)) << "Storage namespace (" << external_storage_namespace_ << ") shouldn't contain " << kClusterSeparator << "."; From f4537f81d03b9b202fe87374e09dd4f6d4790b86 Mon Sep 17 00:00:00 2001 From: vitsai Date: Fri, 1 Sep 2023 08:28:40 +0000 Subject: [PATCH 04/16] more fixes Signed-off-by: vitsai --- python/ray/_private/node.py | 6 ------ python/ray/_raylet.pyx | 1 - python/ray/includes/global_state_accessor.pxd | 7 +------ python/ray/tests/conftest.py | 1 - python/ray/tests/test_gcs_fault_tolerance.py | 1 - src/ray/gcs/redis_context.cc | 2 -- src/ray/gcs/store_client/redis_store_client.cc | 1 - 7 files changed, 1 insertion(+), 18 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 90a7357d63aa7..c3d17898cb729 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -1207,12 +1207,6 @@ def _write_cluster_info_to_kv(self): True, ray_constants.KV_NAMESPACE_SESSION, ) - v = self.get_gcs_client().internal_kv_get( - b"session_name", - ray_constants.KV_NAMESPACE_SESSION, - ) - print("vct gotten value ", v) - print("just put session name vct " + self._session_name) self.get_gcs_client().internal_kv_put( b"session_dir", self._session_dir.encode(), diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 734dd03ce6cac..cafc136fcfbcd 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -4580,7 +4580,6 @@ def get_key_from_storage(host, port, password, use_ssl, config, key): cdef: c_string data result = RedisGetKeySync(host, port, password, use_ssl, config, key, &data) - print("result is " + str(result)) if result: return data else: diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index b6e128dbcc069..3d70e73d268a1 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -67,9 +67,6 @@ cdef extern from * namespace "ray::gcs" nogil: RayConfig::instance().initialize(config_list); instrumented_io_context io_service; - RAY_UNUSED(std::async(std::launch::async, [&]() { - io_service.run(); - })); auto redis_client = std::make_shared(options); auto status = redis_client->Connect(io_service); @@ -84,16 +81,14 @@ cdef extern from * namespace "ray::gcs" nogil: bool ret_val = false; cli->Get("session", key, [&](std::optional result) { if (result.has_value()) { - RAY_LOG(INFO) << "vct E"; *data = result.value(); ret_val = true; } else { - *data = "gggg"; RAY_LOG(ERROR) << "Failed to get " << key; ret_val = false; } }); - RAY_LOG(INFO) << "vct F"; + io_service.run_for(std::chrono::milliseconds(1000)); return ret_val; } diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index b18a71a9b8e1d..5d8293b1b6f57 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -290,7 +290,6 @@ def _setup_redis(request): import uuid ns = str(uuid.uuid4()) - print("vct @@@@ ", ns) old_ns = os.environ.get("RAY_external_storage_namespace") os.environ["RAY_external_storage_namespace"] = ns diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index e4edb7151341b..c2227382285b9 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -869,7 +869,6 @@ def check_raylet_healthy(): def test_session_name(ray_start_cluster): - print("are we using redis " + str(enable_external_redis())) # Kill GCS and check that raylets kill themselves when not backed by Redis, # and stay alive when backed by Redis. # Raylets should kill themselves due to cluster ID mismatch in the diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index 90c6912ed4c99..35a65dce32d4e 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -184,8 +184,6 @@ void RedisRequestContext::Run() { +[](struct redisAsyncContext *async_context, void *raw_reply, void *privdata) { auto *request_cxt = (RedisRequestContext *)privdata; auto redis_reply = reinterpret_cast(raw_reply); - RAY_LOG(ERROR) << "Redis request [" - << absl::StrJoin(request_cxt->redis_cmds_, " ") << "] completed."; // Error happened. if (redis_reply == nullptr || redis_reply->type == REDIS_REPLY_ERROR) { auto error_msg = redis_reply ? redis_reply->str : async_context->errstr; diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index 15f5f4872302a..a20aa744de96f 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -138,7 +138,6 @@ void RedisStoreClient::MGetValues(const std::string &table_name, RedisStoreClient::RedisStoreClient(std::shared_ptr redis_client) : external_storage_namespace_(::RayConfig::instance().external_storage_namespace()), redis_client_(std::move(redis_client)) { - RAY_LOG(INFO) << "vct ext namespace " << external_storage_namespace_; RAY_CHECK(!absl::StrContains(external_storage_namespace_, kClusterSeparator)) << "Storage namespace (" << external_storage_namespace_ << ") shouldn't contain " << kClusterSeparator << "."; From 5fc76abc00cc5fad36a9c9e4c94ac1add3f0d05a Mon Sep 17 00:00:00 2001 From: vitsai Date: Fri, 1 Sep 2023 11:18:31 +0000 Subject: [PATCH 05/16] comments Signed-off-by: vitsai --- python/ray/_private/node.py | 56 ++++++++++++++++++++++--------------- python/ray/_raylet.pyx | 14 +++++++++- 2 files changed, 46 insertions(+), 24 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index c3d17898cb729..6e0a5abb37ae6 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -24,7 +24,7 @@ import ray._private.services import ray._private.utils from ray._private import storage -from ray._raylet import GcsClient, get_key_from_storage +from ray._raylet import GcsClient, get_session_key_from_storage from ray._private.resource_spec import ResourceSpec from ray._private.services import serialize_config from ray._private.utils import open_log, try_to_create_directory, try_to_symlink @@ -178,28 +178,7 @@ def __init__( # Register the temp dir. if head: - maybe_key = None - if self._ray_params.external_addresses is not None: - self._redis_address = self._ray_params.external_addresses[0] - parts = self._redis_address.split("://", 1) - enable_redis_ssl = False - if len(parts) == 1: - redis_ip_address, redis_port = parts[0].rsplit(":", 1) - else: - if len(parts) != 2 or parts[0] not in ("redis", "rediss"): - raise ValueError(f"Invalid redis address {self._redis_address}") - redis_ip_address, redis_port = parts[1].rsplit(":", 1) - if parts[0] == "rediss": - enable_redis_ssl = True - maybe_key = get_key_from_storage( - redis_ip_address, - int(redis_port), - self._ray_params.redis_password, - enable_redis_ssl, - serialize_config(self._config), - b"session_name", - ) - + maybe_key = self.check_persisted_session_name() if maybe_key is None: # date including microsecond date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f") @@ -343,6 +322,37 @@ def __init__( self.validate_ip_port(self.gcs_address) self._record_stats() + def check_persisted_session_name(self): + if self._ray_params.external_addresses is None: + return None + self._redis_address = self._ray_params.external_addresses[0] + # Address is ip:port or redis://ip:port + parts = self._redis_address.split("://", 1) + enable_redis_ssl = False + if len(parts) == 1: + redis_ip_address, redis_port = parts[0].rsplit(":", 1) + else: + if len(parts) != 2 or parts[0] not in ("redis", "rediss"): + raise ValueError( + f"Invalid redis address {self._redis_address}." + "Expected format is ip:port or redis://ip:port, " + "or rediss://ip:port for SSL." + ) + redis_ip_address, redis_port = parts[1].rsplit(":", 1) + if parts[0] == "rediss": + enable_redis_ssl = True + if int(redis_port) < 0: + raise ValueError(f"Invalid port: {redis_port}") + + return get_session_key_from_storage( + redis_ip_address, + int(redis_port), + self._ray_params.redis_password, + enable_redis_ssl, + serialize_config(self._config), + b"session_name", + ) + @staticmethod def validate_ip_port(ip_port): """Validates the address is in the ip:port format""" diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index cafc136fcfbcd..d0d64578021d4 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -4576,7 +4576,19 @@ def del_key_from_storage(host, port, password, use_ssl, key): return RedisDelKeySync(host, port, password, use_ssl, key) -def get_key_from_storage(host, port, password, use_ssl, config, key): +def get_session_key_from_storage(host, port, password, use_ssl, config, key): + """ + Get the session key from the storage. + Intended to be used for session_name only. + Args: + host: The address of the owner (caller) of the + generator task. + port: The task ID of the generator task. + password: The redis password. + use_ssl: Whether to use SSL. + config: The Ray config. Used to get storage namespace. + key: The key to retrieve. + """ cdef: c_string data result = RedisGetKeySync(host, port, password, use_ssl, config, key, &data) From 037aabd4e6967630a46dbf9189e52d0677353a91 Mon Sep 17 00:00:00 2001 From: vitsai Date: Fri, 1 Sep 2023 23:29:05 +0000 Subject: [PATCH 06/16] comments Signed-off-by: vitsai --- python/ray/_private/node.py | 8 ++++++- python/ray/includes/global_state_accessor.pxd | 5 +++- python/ray/tests/test_advanced_9.py | 5 ++-- .../test/gcs_client_reconnection_test.cc | 24 +++++++++++++++++++ 4 files changed, 37 insertions(+), 5 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 6e0a5abb37ae6..346bf81e71751 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -178,6 +178,8 @@ def __init__( # Register the temp dir. if head: + # We expect this the first time we initialize a cluster, but not during + # subsequent restarts of the head node. maybe_key = self.check_persisted_session_name() if maybe_key is None: # date including microsecond @@ -332,6 +334,7 @@ def check_persisted_session_name(self): if len(parts) == 1: redis_ip_address, redis_port = parts[0].rsplit(":", 1) else: + # rediss for SSL if len(parts) != 2 or parts[0] not in ("redis", "rediss"): raise ValueError( f"Invalid redis address {self._redis_address}." @@ -342,7 +345,10 @@ def check_persisted_session_name(self): if parts[0] == "rediss": enable_redis_ssl = True if int(redis_port) < 0: - raise ValueError(f"Invalid port: {redis_port}") + raise ValueError( + f"Invalid Redis port provided: {redis_port}." + "The port must be a non-negative integer." + ) return get_session_key_from_storage( redis_ip_address, diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 3d70e73d268a1..0f4e696a2752b 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -65,6 +65,7 @@ cdef extern from * namespace "ray::gcs" nogil: std::string config_list; RAY_CHECK(absl::Base64Unescape(config, &config_list)); RayConfig::instance().initialize(config_list); + RayConfig::instance().redis instrumented_io_context io_service; @@ -75,6 +76,7 @@ cdef extern from * namespace "ray::gcs" nogil: RAY_LOG(ERROR) << "Failed to connect to redis: " << status.ToString(); return false; } + auto cli = std::make_unique( std::make_unique(std::move(redis_client))); @@ -84,7 +86,8 @@ cdef extern from * namespace "ray::gcs" nogil: *data = result.value(); ret_val = true; } else { - RAY_LOG(ERROR) << "Failed to get " << key; + RAY_LOG(INFO) << "Failed to retrieve the key " << key + << "from persistent storage."; ret_val = false; } }); diff --git a/python/ray/tests/test_advanced_9.py b/python/ray/tests/test_advanced_9.py index a4ba35d1756d9..5fc7a53b94116 100644 --- a/python/ray/tests/test_advanced_9.py +++ b/python/ray/tests/test_advanced_9.py @@ -363,9 +363,8 @@ def test_redis_not_available(monkeypatch, call_ray_stop_only): capture_output=True, ) assert "Could not establish connection to Redis" in p.stderr.decode() - assert "Please check" in p.stderr.decode() - assert "gcs_server.out for details" in p.stderr.decode() - assert "RuntimeError: Failed to start GCS" in p.stderr.decode() + assert "Please check " in p.stderr.decode() + assert "redis storage is alive or not." in p.stderr.decode() @pytest.mark.skipif(not enable_external_redis(), reason="Only valid in redis env") diff --git a/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc index 5b70732392e26..adf4b2b1ded46 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc @@ -211,6 +211,30 @@ TEST_F(GcsClientReconnectionTest, ReconnectionBasic) { ASSERT_EQ(f1.get(), "B"); } +TEST_F(GcsClientReconnectionTest, TestRayConfig) { + RayConfig::instance().initialize( + R"( +{ + "gcs_rpc_server_reconnect_timeout_s": 60, + "gcs_storage": "redis", + "gcs_grpc_initial_reconnect_backoff_ms": 2000, + "gcs_grpc_max_reconnect_backoff_ms": 2000 +} + )"); + StartGCS(); + RayConfig::instance().initialize( + R"( +{ + "gcs_rpc_server_reconnect_timeout_s": 60, + "gcs_storage": "redis", + "gcs_grpc_initial_reconnect_backoff_ms": 2000, + "gcs_grpc_max_reconnect_backoff_ms": 3000 +} + )"); + + ASSERT_EQ(RayConfig::instance().gcs_grpc_max_reconnect_backoff_ms, 3000); +} + TEST_F(GcsClientReconnectionTest, ReconnectionBackoff) { // This test is to ensure that during reconnection, we got the right status // of the channel and also very basic test to verify gRPC's backoff is working. From ba58ad03eee676991c70156564416657fd6a640d Mon Sep 17 00:00:00 2001 From: vitsai Date: Fri, 1 Sep 2023 23:54:23 +0000 Subject: [PATCH 07/16] more comments Signed-off-by: vitsai --- python/ray/includes/global_state_accessor.pxd | 1 - python/ray/tests/BUILD | 1 + python/ray/tests/test_gcs_ha_e2e.py | 37 ++++++++++++------- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 0f4e696a2752b..9a616a41b39b4 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -65,7 +65,6 @@ cdef extern from * namespace "ray::gcs" nogil: std::string config_list; RAY_CHECK(absl::Base64Unescape(config, &config_list)); RayConfig::instance().initialize(config_list); - RayConfig::instance().redis instrumented_io_context io_service; diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index bf6663440d959..6239f78d02406 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -225,6 +225,7 @@ py_test_module_list( py_test_module_list( files = [ "test_gcs_ha_e2e.py", + "test_gcs_ha_e2e_2.py", "test_memory_pressure.py", "test_node_labels.py", ], diff --git a/python/ray/tests/test_gcs_ha_e2e.py b/python/ray/tests/test_gcs_ha_e2e.py index e9e64599c15f0..380f527be1719 100644 --- a/python/ray/tests/test_gcs_ha_e2e.py +++ b/python/ray/tests/test_gcs_ha_e2e.py @@ -6,35 +6,44 @@ @pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") -def test_ray_nodes_liveness(docker_cluster): +def test_ray_session_name_preserved(docker_cluster): get_nodes_script = """ import ray ray.init("auto") -print(sum([1 if n["Alive"] else 0 for n in ray.nodes()])) +print(ray._private.worker._global_node.session_name) """ head, worker = docker_cluster - def check_alive(n): - output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") + def get_session_name(to_head=True): + if to_head: + output = head.exec_run(cmd=f"python -c '{get_nodes_script}'") + else: + output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") + session_name = output.output.decode().strip().split("\n")[-1] + print("Output: ", output.output.decode().strip().split("\n")) assert output.exit_code == 0 - text = output.output.decode().strip().split("\n")[-1] - print("Alive nodes: ", text) - return n == int(text) + return session_name # Make sure two nodes are alive - wait_for_condition(check_alive, n=2) + wait_for_condition(get_session_name, to_head=True) + session_name_head = get_session_name(to_head=True) + wait_for_condition(get_session_name, to_head=False) + session_name_worker = get_session_name(to_head=False) + assert session_name_head == session_name_worker print("head killed") head.kill() sleep(2) head.restart() - # When GCS restarts, a new raylet is added - # and the old dead raylet is going to take a while to be marked dead. - # So there should be 3 alive nodes - wait_for_condition(check_alive, timeout=10, n=3) - # Later, GCS detect the old raylet dead and the alive nodes will be 2 - wait_for_condition(check_alive, timeout=30, n=2) + + wait_for_condition(get_session_name, to_head=True) + session_name_head_after_restart = get_session_name(to_head=True) + wait_for_condition(get_session_name, to_head=False) + session_name_worker_after_restart = get_session_name(to_head=False) + assert session_name_worker_after_restart == session_name_head_after_restart + assert session_name_head == session_name_head_after_restart + assert session_name_worker_after_restart == session_name_worker if __name__ == "__main__": From 41f98e504c9f8b1631cb2808825c322ca7257246 Mon Sep 17 00:00:00 2001 From: vitsai Date: Sat, 2 Sep 2023 00:09:39 +0000 Subject: [PATCH 08/16] test file revert Signed-off-by: vitsai --- ...est_gcs_ha_e2e.py => test_gcs_ha_e2e_2.py} | 0 python/ray/tests/test_ray_init.py | 1 - test_gcs_ha_e2e.py | 46 +++++++++++++++++++ 3 files changed, 46 insertions(+), 1 deletion(-) rename python/ray/tests/{test_gcs_ha_e2e.py => test_gcs_ha_e2e_2.py} (100%) create mode 100644 test_gcs_ha_e2e.py diff --git a/python/ray/tests/test_gcs_ha_e2e.py b/python/ray/tests/test_gcs_ha_e2e_2.py similarity index 100% rename from python/ray/tests/test_gcs_ha_e2e.py rename to python/ray/tests/test_gcs_ha_e2e_2.py diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index 38e351a8a025b..a195eb623b4da 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -233,7 +233,6 @@ def test_ray_init_using_hostname(ray_start_cluster): assert len(node_table) == 1 assert node_table[0].get("NodeManagerHostname", "") == hostname - if __name__ == "__main__": import sys diff --git a/test_gcs_ha_e2e.py b/test_gcs_ha_e2e.py new file mode 100644 index 0000000000000..63c28cb50d19c --- /dev/null +++ b/test_gcs_ha_e2e.py @@ -0,0 +1,46 @@ +import pytest +import sys +from time import sleep +from ray._private.test_utils import wait_for_condition +from ray.tests.conftest_docker import * # noqa + + +@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") +def test_ray_nodes_liveness(docker_cluster): + get_nodes_script = """ +import ray +ray.init("auto") +print(sum([1 if n["Alive"] else 0 for n in ray.nodes()])) +""" + head, worker = docker_cluster + + def check_alive(n): + output = worker.exec_run(cmd=f"python -c '{get_nodes_script}'") + assert output.exit_code == 0 + text = output.output.decode().strip().split("\n")[-1] + print("Alive nodes: ", text) + return n == int(text) + + # Make sure two nodes are alive + wait_for_condition(check_alive, n=2) + print("head killed") + head.kill() + + sleep(2) + + head.restart() + # When GCS restarts, a new raylet is added + # and the old dead raylet is going to take a while to be marked dead. + # So there should be 3 alive nodes + wait_for_condition(check_alive, timeout=10, n=3) + # Later, GCS detect the old raylet dead and the alive nodes will be 2 + wait_for_condition(check_alive, timeout=30, n=2) + + +if __name__ == "__main__": + import os + + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) \ No newline at end of file From 7c4613efc90d09056294395899f2d0a2e8799ffa Mon Sep 17 00:00:00 2001 From: vitsai Date: Sat, 2 Sep 2023 00:10:12 +0000 Subject: [PATCH 09/16] lint Signed-off-by: vitsai --- python/ray/tests/test_ray_init.py | 1 + test_gcs_ha_e2e.py | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index a195eb623b4da..38e351a8a025b 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -233,6 +233,7 @@ def test_ray_init_using_hostname(ray_start_cluster): assert len(node_table) == 1 assert node_table[0].get("NodeManagerHostname", "") == hostname + if __name__ == "__main__": import sys diff --git a/test_gcs_ha_e2e.py b/test_gcs_ha_e2e.py index 63c28cb50d19c..3a83afaa0c6b5 100644 --- a/test_gcs_ha_e2e.py +++ b/test_gcs_ha_e2e.py @@ -1,6 +1,8 @@ -import pytest import sys from time import sleep + +import pytest + from ray._private.test_utils import wait_for_condition from ray.tests.conftest_docker import * # noqa @@ -43,4 +45,4 @@ def check_alive(n): if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) else: - sys.exit(pytest.main(["-sv", __file__])) \ No newline at end of file + sys.exit(pytest.main(["-sv", __file__])) From b324a738a8d546db33b42ee64b68b221b15987f2 Mon Sep 17 00:00:00 2001 From: vitsai Date: Sat, 2 Sep 2023 01:25:59 +0000 Subject: [PATCH 10/16] comments Signed-off-by: vitsai --- python/ray/_private/node.py | 13 +++++++- python/ray/tests/test_ray_init.py | 30 ++++++++++++++++++- .../test/gcs_client_reconnection_test.cc | 24 --------------- 3 files changed, 41 insertions(+), 26 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 346bf81e71751..be1b1848f4ede 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -350,7 +350,11 @@ def check_persisted_session_name(self): "The port must be a non-negative integer." ) - return get_session_key_from_storage( + # Redis context spews logs + old_debug_level = os.environ.get("RAY_BACKEND_DEBUG_LEVEL") + os.environ["RAY_BACKEND_DEBUG_LEVEL"] = "warning" + + ret_val = get_session_key_from_storage( redis_ip_address, int(redis_port), self._ray_params.redis_password, @@ -359,6 +363,13 @@ def check_persisted_session_name(self): b"session_name", ) + if old_debug_level is None: + os.environ.pop("RAY_BACKEND_DEBUG_LEVEL", None) + else: + os.environ["RAY_BACKEND_DEBUG_LEVEL"] = old_debug_level + + return ret_val + @staticmethod def validate_ip_port(ip_port): """Validates the address is in the ip:port format""" diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index 38e351a8a025b..1e139d7490211 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -13,7 +13,7 @@ from ray.util.client.common import ClientObjectRef from ray.util.client.ray_client_helpers import ray_start_client_server from ray.util.client.worker import Worker -from ray._private.test_utils import wait_for_condition +from ray._private.test_utils import wait_for_condition, enable_external_redis @pytest.mark.skipif( @@ -234,6 +234,34 @@ def test_ray_init_using_hostname(ray_start_cluster): assert node_table[0].get("NodeManagerHostname", "") == hostname +def test_new_ray_instance_new_session_dir(shutdown_only): + ray.init() + session_dir = ray._private.worker._global_node.get_session_dir_path() + ray.shutdown() + ray.init() + if enable_external_redis(): + assert ray._private.worker._global_node.get_session_dir_path() == session_dir + else: + assert ray._private.worker._global_node.get_session_dir_path() != session_dir + + +def test_new_cluster_new_session_dir(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node() + ray.init(address=cluster.address) + session_dir = ray._private.worker._global_node.get_session_dir_path() + ray.shutdown() + cluster.shutdown() + cluster.add_node() + ray.init(address=cluster.address) + if enable_external_redis(): + assert ray._private.worker._global_node.get_session_dir_path() == session_dir + else: + assert ray._private.worker._global_node.get_session_dir_path() != session_dir + ray.shutdown() + cluster.shutdown() + + if __name__ == "__main__": import sys diff --git a/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc index adf4b2b1ded46..5b70732392e26 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc @@ -211,30 +211,6 @@ TEST_F(GcsClientReconnectionTest, ReconnectionBasic) { ASSERT_EQ(f1.get(), "B"); } -TEST_F(GcsClientReconnectionTest, TestRayConfig) { - RayConfig::instance().initialize( - R"( -{ - "gcs_rpc_server_reconnect_timeout_s": 60, - "gcs_storage": "redis", - "gcs_grpc_initial_reconnect_backoff_ms": 2000, - "gcs_grpc_max_reconnect_backoff_ms": 2000 -} - )"); - StartGCS(); - RayConfig::instance().initialize( - R"( -{ - "gcs_rpc_server_reconnect_timeout_s": 60, - "gcs_storage": "redis", - "gcs_grpc_initial_reconnect_backoff_ms": 2000, - "gcs_grpc_max_reconnect_backoff_ms": 3000 -} - )"); - - ASSERT_EQ(RayConfig::instance().gcs_grpc_max_reconnect_backoff_ms, 3000); -} - TEST_F(GcsClientReconnectionTest, ReconnectionBackoff) { // This test is to ensure that during reconnection, we got the right status // of the channel and also very basic test to verify gRPC's backoff is working. From 2ab8069cdbe4792c7826fdb321f02da54b54244a Mon Sep 17 00:00:00 2001 From: vitsai Date: Sat, 2 Sep 2023 01:37:12 +0000 Subject: [PATCH 11/16] forgot a couple files Signed-off-by: vitsai --- python/ray/_private/node.py | 14 ++++++++++++-- python/ray/includes/global_state_accessor.pxd | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index be1b1848f4ede..e6fa6929de7d3 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -1228,12 +1228,22 @@ def _write_cluster_info_to_kv(self): ray_usage_lib.put_cluster_metadata(self.get_gcs_client()) # Make sure GCS is up. - self.get_gcs_client().internal_kv_put( + added = self.get_gcs_client().internal_kv_put( b"session_name", self._session_name.encode(), - True, + False, ray_constants.KV_NAMESPACE_SESSION, ) + if not added: + curr_val = self.get_gcs_client().internal_kv_get( + b"session_name", ray_constants.KV_NAMESPACE_SESSION + ) + assert curr_val != self._session_name, ( + f"Session name {self._session_name} does not match " + f"persisted value {curr_val}. Perhaps there was an " + f"error connecting to Redis." + ) + self.get_gcs_client().internal_kv_put( b"session_dir", self._session_dir.encode(), diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 9a616a41b39b4..eeec02795aad5 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -86,7 +86,7 @@ cdef extern from * namespace "ray::gcs" nogil: ret_val = true; } else { RAY_LOG(INFO) << "Failed to retrieve the key " << key - << "from persistent storage."; + << " from persistent storage."; ret_val = false; } }); From b0e76b4ef5ac7383a22515da28d7c503ae9b0346 Mon Sep 17 00:00:00 2001 From: vitsai Date: Sat, 2 Sep 2023 02:58:42 +0000 Subject: [PATCH 12/16] clean up logging Signed-off-by: vitsai --- python/ray/_private/node.py | 13 +------------ python/ray/includes/global_state_accessor.pxd | 6 ++++++ .../ray/tests/test_gcs_ha_e2e.py | 0 python/ray/tests/test_output.py | 10 ++++++++++ 4 files changed, 17 insertions(+), 12 deletions(-) rename test_gcs_ha_e2e.py => python/ray/tests/test_gcs_ha_e2e.py (100%) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index e6fa6929de7d3..eb0a8044d9dff 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -350,11 +350,7 @@ def check_persisted_session_name(self): "The port must be a non-negative integer." ) - # Redis context spews logs - old_debug_level = os.environ.get("RAY_BACKEND_DEBUG_LEVEL") - os.environ["RAY_BACKEND_DEBUG_LEVEL"] = "warning" - - ret_val = get_session_key_from_storage( + return get_session_key_from_storage( redis_ip_address, int(redis_port), self._ray_params.redis_password, @@ -363,13 +359,6 @@ def check_persisted_session_name(self): b"session_name", ) - if old_debug_level is None: - os.environ.pop("RAY_BACKEND_DEBUG_LEVEL", None) - else: - os.environ["RAY_BACKEND_DEBUG_LEVEL"] = old_debug_level - - return ret_val - @staticmethod def validate_ip_port(ip_port): """Validates the address is in the ip:port format""" diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index eeec02795aad5..acd759f8964b6 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -60,6 +60,12 @@ cdef extern from * namespace "ray::gcs" nogil: const std::string& config, const std::string& key, std::string* data) { + InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog, + ray::RayLog::ShutDownRayLog, + "ray_init", + ray::RayLogLevel::WARNING, + "" /* log_dir */); + RedisClientOptions options(host, port, password, false, use_ssl); std::string config_list; diff --git a/test_gcs_ha_e2e.py b/python/ray/tests/test_gcs_ha_e2e.py similarity index 100% rename from test_gcs_ha_e2e.py rename to python/ray/tests/test_gcs_ha_e2e.py diff --git a/python/ray/tests/test_output.py b/python/ray/tests/test_output.py index 849ffe2a4cfeb..d539a35f14200 100644 --- a/python/ray/tests/test_output.py +++ b/python/ray/tests/test_output.py @@ -15,6 +15,16 @@ ) +def test_ray_init_no_redis_logs(): + script = """ +import ray + +ray.init() + """ + out_str = run_string_as_driver(script) + assert "redis" not in out_str, out_str + + def test_dedup_logs(): script = """ import ray From 3cf34dcba66d1259b8ce2152e8beefe09ba59c7c Mon Sep 17 00:00:00 2001 From: vitsai Date: Sat, 2 Sep 2023 04:28:42 +0000 Subject: [PATCH 13/16] comments Signed-off-by: vitsai --- python/ray/_private/node.py | 20 +++--------- python/ray/_private/services.py | 32 +++++++++++++------- python/ray/tests/conftest_docker.py | 12 ++++++-- python/ray/tests/test_gcs_fault_tolerance.py | 24 +++++++++++++++ python/ray/tests/test_output.py | 10 ------ 5 files changed, 59 insertions(+), 39 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index eb0a8044d9dff..13eb9f3dd6bf0 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -26,7 +26,7 @@ from ray._private import storage from ray._raylet import GcsClient, get_session_key_from_storage from ray._private.resource_spec import ResourceSpec -from ray._private.services import serialize_config +from ray._private.services import serialize_config, get_address from ray._private.utils import open_log, try_to_create_directory, try_to_symlink # Logger for this module. It should be configured at the entry point @@ -328,22 +328,10 @@ def check_persisted_session_name(self): if self._ray_params.external_addresses is None: return None self._redis_address = self._ray_params.external_addresses[0] + redis_ip_address, redis_port, enable_redis_ssl = get_address( + self._redis_address, + ) # Address is ip:port or redis://ip:port - parts = self._redis_address.split("://", 1) - enable_redis_ssl = False - if len(parts) == 1: - redis_ip_address, redis_port = parts[0].rsplit(":", 1) - else: - # rediss for SSL - if len(parts) != 2 or parts[0] not in ("redis", "rediss"): - raise ValueError( - f"Invalid redis address {self._redis_address}." - "Expected format is ip:port or redis://ip:port, " - "or rediss://ip:port for SSL." - ) - redis_ip_address, redis_port = parts[1].rsplit(":", 1) - if parts[0] == "rediss": - enable_redis_ssl = True if int(redis_port) < 0: raise ValueError( f"Invalid Redis port provided: {redis_port}." diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 7f7f882d1de89..29ad8e616bc56 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -1291,6 +1291,25 @@ def read_log(filename, lines_to_read): return None, None +def get_address(redis_address): + parts = redis_address.split("://", 1) + enable_redis_ssl = False + if len(parts) == 1: + redis_ip_address, redis_port = parts[0].rsplit(":", 1) + else: + # rediss for SSL + if len(parts) != 2 or parts[0] not in ("redis", "rediss"): + raise ValueError( + f"Invalid redis address {redis_address}." + "Expected format is ip:port or redis://ip:port, " + "or rediss://ip:port for SSL." + ) + redis_ip_address, redis_port = parts[1].rsplit(":", 1) + if parts[0] == "rediss": + enable_redis_ssl = True + return redis_ip_address, redis_port, enable_redis_ssl + + def start_gcs_server( redis_address: str, log_dir: str, @@ -1336,21 +1355,12 @@ def start_gcs_server( f"--session-name={session_name}", ] if redis_address: - parts = redis_address.split("://", 1) - enable_redis_ssl = "false" - if len(parts) == 1: - redis_ip_address, redis_port = parts[0].rsplit(":", 1) - else: - if len(parts) != 2 or parts[0] not in ("redis", "rediss"): - raise ValueError(f"Invalid redis address {redis_address}") - redis_ip_address, redis_port = parts[1].rsplit(":", 1) - if parts[0] == "rediss": - enable_redis_ssl = "true" + redis_ip_address, redis_port, enable_redis_ssl = get_address(redis_address) command += [ f"--redis_address={redis_ip_address}", f"--redis_port={redis_port}", - f"--redis_enable_ssl={enable_redis_ssl}", + f"--redis_enable_ssl={'true' if enable_redis_ssl else 'false'}", ] if redis_password: command += [f"--redis_password={redis_password}"] diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index 9120022dbe5b9..5dea63fcf9095 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -93,7 +93,11 @@ def print_logs(self): "9379", ], volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, - environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, + environment={ + "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", + "RAY_raylet_client_num_connect_attempts": "10", + "RAY_raylet_client_connect_timeout_milliseconds": "1000", + }, wrapper_class=Container, ports={ "8000/tcp": None, @@ -118,7 +122,11 @@ def print_logs(self): "9379", ], volumes={"{worker_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, - environment={"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379"}, + environment={ + "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", + "RAY_raylet_client_num_connect_attempts": "10", + "RAY_raylet_client_connect_timeout_milliseconds": "1000", + }, wrapper_class=Container, ports={ "8000/tcp": None, diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index c2227382285b9..40f4d1565d6f8 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -947,6 +947,30 @@ def check_raylet_healthy(): wait_for_condition(lambda: not check_raylet_healthy()) +def test_redis_logs(external_redis): + try: + import subprocess + + process = subprocess.Popen( + ["ray", "start", "--head"], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + stdout, stderr = process.communicate(timeout=30) + assert "redis_context.cc" not in stderr.decode() + assert "Resolve Redis address" not in stderr.decode() + # assert "redis_context.cc" not in result.output + finally: + from click.testing import CliRunner + import ray.scripts.scripts as scripts + + runner = CliRunner(env={"RAY_USAGE_STATS_PROMPT_ENABLED": "0"}) + runner.invoke( + scripts.stop, + [ + "--force", + ], + ) + + if __name__ == "__main__": import pytest diff --git a/python/ray/tests/test_output.py b/python/ray/tests/test_output.py index d539a35f14200..849ffe2a4cfeb 100644 --- a/python/ray/tests/test_output.py +++ b/python/ray/tests/test_output.py @@ -15,16 +15,6 @@ ) -def test_ray_init_no_redis_logs(): - script = """ -import ray - -ray.init() - """ - out_str = run_string_as_driver(script) - assert "redis" not in out_str, out_str - - def test_dedup_logs(): script = """ import ray From 1983a15b4f0fe95e047b35a3cb6e9c38ca2f8a6f Mon Sep 17 00:00:00 2001 From: vitsai Date: Sat, 2 Sep 2023 05:56:15 +0000 Subject: [PATCH 14/16] change 1000 to 100 Signed-off-by: vitsai --- python/ray/tests/conftest_docker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py index 5dea63fcf9095..dbd18ed233932 100644 --- a/python/ray/tests/conftest_docker.py +++ b/python/ray/tests/conftest_docker.py @@ -96,7 +96,7 @@ def print_logs(self): environment={ "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", "RAY_raylet_client_num_connect_attempts": "10", - "RAY_raylet_client_connect_timeout_milliseconds": "1000", + "RAY_raylet_client_connect_timeout_milliseconds": "100", }, wrapper_class=Container, ports={ @@ -125,7 +125,7 @@ def print_logs(self): environment={ "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", "RAY_raylet_client_num_connect_attempts": "10", - "RAY_raylet_client_connect_timeout_milliseconds": "1000", + "RAY_raylet_client_connect_timeout_milliseconds": "100", }, wrapper_class=Container, ports={ From c1853c26de8fe75bbc1927f43f8626c6c0fafc6d Mon Sep 17 00:00:00 2001 From: vitsai Date: Sun, 3 Sep 2023 08:03:45 +0000 Subject: [PATCH 15/16] some fixes Signed-off-by: vitsai --- python/ray/includes/global_state_accessor.pxd | 12 ++++++++---- python/ray/tests/BUILD | 2 +- python/ray/tests/test_advanced_9.py | 2 -- python/ray/tests/test_tempfile.py | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index acd759f8964b6..4237ccae085f1 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -75,10 +75,14 @@ cdef extern from * namespace "ray::gcs" nogil: instrumented_io_context io_service; auto redis_client = std::make_shared(options); - auto status = redis_client->Connect(io_service); - - if(!status.ok()) { - RAY_LOG(ERROR) << "Failed to connect to redis: " << status.ToString(); + try { + auto status = redis_client->Connect(io_service); + if(!status.ok()) { + RAY_LOG(ERROR) << "Failed to connect to redis: " << status.ToString(); + return false; + } + } catch (std::exception& e) { + RAY_LOG(ERROR) << "Failed to connect to redis: " << e.what(); return false; } diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 6239f78d02406..5959fe50e3eee 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -141,7 +141,6 @@ py_test_module_list( "test_multinode_failures_2.py", "test_node_manager.py", "test_object_assign_owner.py", - "test_placement_group.py", "test_placement_group_2.py", "test_placement_group_4.py", "test_placement_group_failover.py", @@ -319,6 +318,7 @@ py_test_module_list( "test_reference_counting_2.py", "test_exit_observability.py", "test_usage_stats.py", + "test_placement_group.py", "test_placement_group_3.py", "test_placement_group_5.py", "test_cancel.py", diff --git a/python/ray/tests/test_advanced_9.py b/python/ray/tests/test_advanced_9.py index 5fc7a53b94116..a37559d07185a 100644 --- a/python/ray/tests/test_advanced_9.py +++ b/python/ray/tests/test_advanced_9.py @@ -377,8 +377,6 @@ def test_redis_wrong_password(monkeypatch, external_redis, call_ray_stop_only): ) assert "RedisError: ERR AUTH called" in p.stderr.decode() - assert "Please check /tmp/ray/session" in p.stderr.decode() - assert "RuntimeError: Failed to start GCS" in p.stderr.decode() @pytest.mark.skipif(not enable_external_redis(), reason="Only valid in redis env") diff --git a/python/ray/tests/test_tempfile.py b/python/ray/tests/test_tempfile.py index 081df60044922..4bf838eefdabc 100644 --- a/python/ray/tests/test_tempfile.py +++ b/python/ray/tests/test_tempfile.py @@ -134,7 +134,7 @@ def check_all_log_file_exists(): assert sum(1 for filename in log_files if filename.startswith("worker")) == 4 socket_files = set(os.listdir(node.get_sockets_dir_path())) - assert socket_files == expected_socket_files + assert socket_files.issuperset(expected_socket_files) def test_tempdir_privilege(shutdown_only): From 5c2ea1d212b7721779eda5a465d9c68b63da296b Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 5 Sep 2023 16:14:59 +0900 Subject: [PATCH 16/16] Fix test advanced 9 --- python/ray/includes/global_state_accessor.pxd | 11 +++-------- python/ray/tests/test_advanced_9.py | 5 +++-- python/ray/tests/test_gcs_fault_tolerance.py | 4 ++++ 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 4237ccae085f1..4c571f5d8390e 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -75,14 +75,9 @@ cdef extern from * namespace "ray::gcs" nogil: instrumented_io_context io_service; auto redis_client = std::make_shared(options); - try { - auto status = redis_client->Connect(io_service); - if(!status.ok()) { - RAY_LOG(ERROR) << "Failed to connect to redis: " << status.ToString(); - return false; - } - } catch (std::exception& e) { - RAY_LOG(ERROR) << "Failed to connect to redis: " << e.what(); + auto status = redis_client->Connect(io_service); + if(!status.ok()) { + RAY_LOG(ERROR) << "Failed to connect to redis: " << status.ToString(); return false; } diff --git a/python/ray/tests/test_advanced_9.py b/python/ray/tests/test_advanced_9.py index a37559d07185a..ff7986c329e3d 100644 --- a/python/ray/tests/test_advanced_9.py +++ b/python/ray/tests/test_advanced_9.py @@ -355,8 +355,9 @@ def check_demands(n): @pytest.mark.skipif(enable_external_redis(), reason="Only valid in non redis env") def test_redis_not_available(monkeypatch, call_ray_stop_only): - monkeypatch.setenv("RAY_NUM_REDIS_GET_RETRIES", "2") + monkeypatch.setenv("RAY_redis_db_connect_retries", "5") monkeypatch.setenv("RAY_REDIS_ADDRESS", "localhost:12345") + p = subprocess.run( "ray start --head", shell=True, @@ -369,7 +370,7 @@ def test_redis_not_available(monkeypatch, call_ray_stop_only): @pytest.mark.skipif(not enable_external_redis(), reason="Only valid in redis env") def test_redis_wrong_password(monkeypatch, external_redis, call_ray_stop_only): - monkeypatch.setenv("RAY_NUM_REDIS_GET_RETRIES", "2") + monkeypatch.setenv("RAY_redis_db_connect_retries", "5") p = subprocess.run( "ray start --head --redis-password=1234", shell=True, diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index 40f4d1565d6f8..1d3611c7cba93 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -955,8 +955,12 @@ def test_redis_logs(external_redis): ["ray", "start", "--head"], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, stderr = process.communicate(timeout=30) + print(stdout.decode()) + print(stderr.decode()) assert "redis_context.cc" not in stderr.decode() + assert "redis_context.cc" not in stdout.decode() assert "Resolve Redis address" not in stderr.decode() + assert "Resolve Redis address" not in stdout.decode() # assert "redis_context.cc" not in result.output finally: from click.testing import CliRunner