Skip to content

Commit

Permalink
[core] Reenable GCS test with redis as backend. (ray-project#23506)
Browse files Browse the repository at this point in the history
Since ray supports Redis as a storage backend, we should ensure the code path with Redis as storage is still being covered e2e.

The tests don't run for a while after we switch to memory mode by default. This PR tries to fix this and make it run with every commit.

In the future, if we support more and more storage backends, this should be revised to be more efficient and selective. But now I think the cost should be ok.

This PR is part of GCS HA testing-related work.
  • Loading branch information
fishbone committed May 20, 2022
1 parent 401db46 commit 8ec558d
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 114 deletions.
2 changes: 1 addition & 1 deletion .buildkite/pipeline.macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ prelude_commands: &prelude_commands |-

epilogue_commands: &epilogue_commands |-
# Cleanup runtime environment to save storage
rm -rf /tmp/ray
rm -rf /tmp/ray || true
# Cleanup local caches (this shouldn't clean up global disk cache)
bazel clean

Expand Down
30 changes: 30 additions & 0 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,36 @@
- bazel test --config=ci $(./ci/run/bazel_export_options)
--test_tag_filters=-kubernetes,medium_size_python_tests_k_to_z
python/ray/tests/...
- label: ":redis: (External Redis) (Small & Client)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=client_tests,small_size_python_tests
--test_env=TEST_EXTERNAL_REDIS=1
-- python/ray/tests/...
- label: ":redis: (External Redis) (Large)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
parallelism: 3
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TEST_EXTERNAL_REDIS=1 . ./ci/ci.sh test_large
- label: ":redis: (External Redis) (Medium A-J)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=-kubernetes,medium_size_python_tests_a_to_j
--test_env=TEST_EXTERNAL_REDIS=1
-- //python/ray/tests/...
- label: ":redis: (External Redis) (Medium K-Z)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=-kubernetes,medium_size_python_tests_k_to_z
--test_env=TEST_EXTERNAL_REDIS=1
-- //python/ray/tests/...
- label: ":python: Debug Test"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
Expand Down
1 change: 1 addition & 0 deletions bazel/python.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ load("@bazel_skylib//lib:paths.bzl", "paths")

# py_test_module_list creates a py_test target for each
# Python file in `files`

def py_test_module_list(files, size, deps, extra_srcs, name_suffix="", **kwargs):
for file in files:
# remove .py
Expand Down
6 changes: 6 additions & 0 deletions python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ def make_global_state_accessor(ray_context):
return global_state_accessor


def test_external_redis():
import os

return os.environ.get("TEST_EXTERNAL_REDIS") == "1"


def _pid_alive(pid):
"""Check if the process with this PID is alive or not.
Expand Down
1 change: 0 additions & 1 deletion python/ray/autoscaler/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def env_integer(key, default):
["gcs_server", True],
["monitor.py", False],
["ray.util.client.server", False],
["redis-server", False],
["default_worker.py", False], # Python worker.
["setup_worker.py", False], # Python environment setup worker.
# For mac osx, setproctitle doesn't change the process name returned
Expand Down
12 changes: 0 additions & 12 deletions python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,19 +984,7 @@ def stop(force, grace_period):
proc, proc_cmd, proc_args = candidate
corpus = proc_cmd if filter_by_cmd else subprocess.list2cmdline(proc_args)
if keyword in corpus:
# This is a way to avoid killing redis server that's not started by Ray.
# We are using a simple hacky solution here since
# Redis server will anyway removed soon from the ray repository.
# This feature is only supported on MacOS/Linux temporarily until
# Redis is removed from Ray.
if (
keyword == "redis-server"
and sys.platform != "win32"
and "core/src/ray/thirdparty/redis/src/redis-server" not in corpus
):
continue
found.append(candidate)

for proc, proc_cmd, proc_args in found:
proc_string = str(subprocess.list2cmdline(proc_args))
try:
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_cross_language.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from ray.serve.config import ReplicaConfig, DeploymentConfig
from ray.serve.utils import msgpack_serialize
from ray.serve.generated.serve_pb2 import JAVA, RequestMetadata, RequestWrapper
from ray.tests.conftest import shutdown_only # noqa: F401
from ray.tests.conftest import shutdown_only, maybe_external_redis # noqa: F401


def test_controller_starts_java_replica(shutdown_only): # noqa: F811
Expand Down
1 change: 1 addition & 0 deletions python/ray/serve/tests/test_standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
# Explicitly importing it here because it is a ray core tests utility (
# not in the tree)
from ray.tests.conftest import ray_start_with_dashboard # noqa: F401
from ray.tests.conftest import maybe_external_redis # noqa: F401


@pytest.fixture
Expand Down
68 changes: 46 additions & 22 deletions python/ray/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,11 @@
setup_tls,
teardown_tls,
get_and_run_node_killer,
test_external_redis,
)
from ray.cluster_utils import Cluster, AutoscalingCluster, cluster_not_supported


@pytest.fixture
def shutdown_only():
yield None
# The code after the yield will run as teardown code.
ray.shutdown()


def get_default_fixure_system_config():
system_config = {
"object_timeout_milliseconds": 200,
Expand All @@ -64,10 +58,11 @@ def get_default_fixture_ray_kwargs():
return ray_kwargs


@pytest.fixture
def external_redis(request, monkeypatch):
@contextmanager
def _setup_redis(request):
# Setup external Redis and env var for initialization.
param = getattr(request, "param", {})

external_redis_ports = param.get("external_redis_ports")
if external_redis_ports is None:
with socket.socket() as s:
Expand All @@ -88,12 +83,41 @@ def external_redis(request, monkeypatch):
processes.append(proc)
wait_for_redis_to_start("127.0.0.1", port, ray_constants.REDIS_DEFAULT_PASSWORD)
address_str = ",".join(map(lambda x: f"127.0.0.1:{x}", external_redis_ports))
monkeypatch.setenv("RAY_REDIS_ADDRESS", address_str)
yield None
import os

old_addr = os.environ.get("RAY_REDIS_ADDRESS")
os.environ["RAY_REDIS_ADDRESS"] = address_str
yield
if old_addr is not None:
os.environ["RAY_REDIS_ADDRESS"] = old_addr
else:
del os.environ["RAY_REDIS_ADDRESS"]
for proc in processes:
proc.process.terminate()


@pytest.fixture
def maybe_external_redis(request):
if test_external_redis():
with _setup_redis(request):
yield
else:
yield


@pytest.fixture
def external_redis(request):
with _setup_redis(request):
yield


@pytest.fixture
def shutdown_only(maybe_external_redis):
yield None
# The code after the yield will run as teardown code.
ray.shutdown()


@contextmanager
def _ray_start(**kwargs):
init_kwargs = get_default_fixture_ray_kwargs()
Expand All @@ -107,7 +131,7 @@ def _ray_start(**kwargs):


@pytest.fixture
def ray_start_with_dashboard(request):
def ray_start_with_dashboard(request, maybe_external_redis):
param = getattr(request, "param", {})
if param.get("num_cpus") is None:
param["num_cpus"] = 1
Expand All @@ -117,15 +141,15 @@ def ray_start_with_dashboard(request):

# The following fixture will start ray with 0 cpu.
@pytest.fixture
def ray_start_no_cpu(request):
def ray_start_no_cpu(request, maybe_external_redis):
param = getattr(request, "param", {})
with _ray_start(num_cpus=0, **param) as res:
yield res


# The following fixture will start ray with 1 cpu.
@pytest.fixture
def ray_start_regular(request):
def ray_start_regular(request, maybe_external_redis):
param = getattr(request, "param", {})
with _ray_start(**param) as res:
yield res
Expand Down Expand Up @@ -156,14 +180,14 @@ def ray_start_shared_local_modes(request):


@pytest.fixture
def ray_start_2_cpus(request):
def ray_start_2_cpus(request, maybe_external_redis):
param = getattr(request, "param", {})
with _ray_start(num_cpus=2, **param) as res:
yield res


@pytest.fixture
def ray_start_10_cpus(request):
def ray_start_10_cpus(request, maybe_external_redis):
param = getattr(request, "param", {})
with _ray_start(num_cpus=10, **param) as res:
yield res
Expand Down Expand Up @@ -206,29 +230,29 @@ def _ray_start_cluster(**kwargs):

# This fixture will start a cluster with empty nodes.
@pytest.fixture
def ray_start_cluster(request):
def ray_start_cluster(request, maybe_external_redis):
param = getattr(request, "param", {})
with _ray_start_cluster(**param) as res:
yield res


@pytest.fixture
def ray_start_cluster_enabled(request):
def ray_start_cluster_enabled(request, maybe_external_redis):
param = getattr(request, "param", {})
param["skip_cluster"] = False
with _ray_start_cluster(**param) as res:
yield res


@pytest.fixture
def ray_start_cluster_init(request):
def ray_start_cluster_init(request, maybe_external_redis):
param = getattr(request, "param", {})
with _ray_start_cluster(do_init=True, **param) as res:
yield res


@pytest.fixture
def ray_start_cluster_head(request):
def ray_start_cluster_head(request, maybe_external_redis):
param = getattr(request, "param", {})
with _ray_start_cluster(do_init=True, num_nodes=1, **param) as res:
yield res
Expand All @@ -245,14 +269,14 @@ def ray_start_cluster_head_with_external_redis(request, external_redis):


@pytest.fixture
def ray_start_cluster_2_nodes(request):
def ray_start_cluster_2_nodes(request, maybe_external_redis):
param = getattr(request, "param", {})
with _ray_start_cluster(do_init=True, num_nodes=2, **param) as res:
yield res


@pytest.fixture
def ray_start_object_store_memory(request):
def ray_start_object_store_memory(request, maybe_external_redis):
# Start the Ray processes.
store_size = request.param
system_config = get_default_fixure_system_config()
Expand Down
5 changes: 3 additions & 2 deletions python/ray/tests/test_client_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import grpc

import ray
from ray.ray_constants import REDIS_DEFAULT_PASSWORD
import ray.core.generated.ray_client_pb2 as ray_client_pb2
from ray.cloudpickle.compat import pickle
from ray.job_config import JobConfig
Expand All @@ -18,12 +19,12 @@


def start_ray_and_proxy_manager(n_ports=2):
ray_instance = ray.init(_redis_password="test")
ray_instance = ray.init(_redis_password=REDIS_DEFAULT_PASSWORD)
agent_port = ray.worker.global_worker.node.metrics_agent_port
pm = proxier.ProxyManager(
ray_instance["address"],
session_dir=ray_instance["session_dir"],
redis_password="test",
redis_password=REDIS_DEFAULT_PASSWORD,
runtime_env_agent_port=agent_port,
)
free_ports = random.choices(range(45000, 45100), k=n_ports)
Expand Down
7 changes: 7 additions & 0 deletions python/ray/tests/test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ray.util.joblib import register_ray

from joblib import parallel_backend, Parallel, delayed
from ray._private.test_utils import test_external_redis


def teardown_function(function):
Expand Down Expand Up @@ -68,6 +69,9 @@ def ray_start_4_cpu():
ray.shutdown()


@pytest.mark.skipif(
test_external_redis(), reason="The same Redis is used within the test."
)
def test_ray_init(shutdown_only):
def getpid(args):
return os.getpid()
Expand Down Expand Up @@ -117,6 +121,9 @@ def check_pool_size(pool, size):
],
indirect=True,
)
@pytest.mark.skipif(
test_external_redis(), reason="The same Redis is used within the test."
)
def test_connect_to_ray(ray_start_cluster):
def getpid(args):
return os.getpid()
Expand Down
Loading

0 comments on commit 8ec558d

Please sign in to comment.