[autoscaler][kuberay] Deflake KubeRay autoscaling test #26411

Merged
10 changes: 9 additions & 1 deletion .buildkite/pipeline.yml
@@ -424,7 +424,13 @@
- label: ":kubernetes: operator"
  conditions: ["RAY_CI_LINUX_WHEELS_AFFECTED"]
  commands:
    - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
    - |
      cleanup() {
        if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi
        python python/ray/tests/kuberay/setup/teardown_kuberay.py
        kind delete cluster
      }
      trap cleanup EXIT
    - echo "--- Setting up Python 3.7 environment."
    - PYTHON=3.7 ./ci/env/install-dependencies.sh
    # Specifying PYTHON=3.7 above somehow messes up the Ray install.
@@ -442,6 +448,8 @@
    - docker tag rayproject/ray:nightly-py37-cpu ray-ci:kuberay-test
    # Load the image into the kind node.
    - kind load docker-image ray-ci:kuberay-test
    - echo "--- Setting up KubeRay operator."
    - python python/ray/tests/kuberay/setup/setup_kuberay.py
    - echo "--- Running the test."
    - bazel test --config=ci $(./ci/run/bazel_export_options)
      --test_tag_filters=kuberay_operator
3 changes: 3 additions & 0 deletions ci/k8s/prep-k8s-environment.sh
@@ -16,6 +16,9 @@ chmod +x kubectl
mv ./kubectl /usr/bin/kubectl
kubectl version --client

# Delete dangling clusters
kind delete clusters --all

# Create the cluster
time kind create cluster --wait 120s --config ./ci/k8s/kind.config.yaml
docker ps
15 changes: 15 additions & 0 deletions python/ray/tests/kuberay/setup/raycluster_test.yaml
@@ -0,0 +1,15 @@
---
apiVersion: ray.io/v1alpha1
kind: RayCluster
metadata:
  name: raycluster-test
spec:
  headGroupSpec:
    serviceType: ClusterIP
    replicas: 1
    rayStartParams: {}
    template:
      metadata:
      spec:
        containers:
          - name: ray-test
10 changes: 10 additions & 0 deletions python/ray/tests/kuberay/setup/setup_kuberay.py
@@ -0,0 +1,10 @@
from ray.tests.kuberay.utils import (
    setup_kuberay_operator,
    wait_for_raycluster_crd,
    setup_logging,
)

if __name__ == "__main__":
    setup_logging()
    setup_kuberay_operator()
    wait_for_raycluster_crd()
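Splitting operator setup and teardown into standalone scripts also makes it easy to drive the same steps outside Buildkite. A minimal sketch of a local wrapper, assuming a kind (or other) cluster is already running, kubectl points at it, and you start from the parent directory of the Ray checkout; the wrapper script itself is hypothetical and not part of this PR, only the imported helpers are:

# run_local_kuberay_test.py (hypothetical wrapper, not part of this PR):
# install the KubeRay operator once, run the autoscaling test, then tear down.
import subprocess
import sys

from ray.tests.kuberay.utils import (
    setup_logging,
    setup_kuberay_operator,
    wait_for_raycluster_crd,
    teardown_kuberay_operator,
)

if __name__ == "__main__":
    setup_logging()
    setup_kuberay_operator()
    wait_for_raycluster_crd()
    exit_code = 1
    try:
        # Run the e2e test against the operator that was just installed.
        exit_code = subprocess.call(
            [
                sys.executable,
                "-m",
                "pytest",
                "-vv",
                "ray/python/ray/tests/kuberay/test_autoscaling_e2e.py",
            ]
        )
    finally:
        # Always remove the operator, mirroring the Buildkite cleanup trap.
        teardown_kuberay_operator()
    sys.exit(exit_code)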
5 changes: 5 additions & 0 deletions python/ray/tests/kuberay/setup/teardown_kuberay.py
@@ -0,0 +1,5 @@
from ray.tests.kuberay.utils import teardown_kuberay_operator, setup_logging

if __name__ == "__main__":
    setup_logging()
    teardown_kuberay_operator()
66 changes: 6 additions & 60 deletions python/ray/tests/kuberay/test_autoscaling_e2e.py
@@ -1,7 +1,6 @@
import copy
import logging
import os
import pathlib
import tempfile
import unittest
import subprocess
@@ -16,14 +15,15 @@
    get_raycluster,
    ray_client_port_forward,
    ray_job_submit,
    setup_logging,
    switch_to_ray_parent_dir,
    kubectl_exec_python_script,
    kubectl_logs,
    kubectl_patch,
    kubectl_delete,
    wait_for_pods,
    wait_for_pod_to_start,
    wait_for_ray_health,
    wait_for_crd,
)

from ray.tests.kuberay.scripts import (
@@ -33,26 +33,18 @@
)

logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)s %(asctime)s] " "%(filename)s: %(lineno)d " "%(message)s",
)

# This image will be used for both the Ray nodes and the autoscaler.
# The CI should pass an image built from the test branch.
RAY_IMAGE = os.environ.get("RAY_IMAGE", "rayproject/ray:448f52")
RAY_IMAGE = os.environ.get("RAY_IMAGE", "rayproject/ray:nightly-py38")
@DmitriGekhtman (Contributor, Author) commented on Jul 9, 2022:
No particular reason for the py38 besides the fact that I use a py38 environment locally (Ray images are Py37 by default)
Nightly seems a reasonable enough default for a test whose primary purpose is to test PRs going into the master branch.

@DmitriGekhtman (Contributor, Author) commented on Jul 9, 2022:
(The CI specifies an image built from the PR branch.)

# By default, use the same image for the autoscaler and Ray containers.
AUTOSCALER_IMAGE = os.environ.get("AUTOSCALER_IMAGE", RAY_IMAGE)
# Set to IfNotPresent in kind CI.
PULL_POLICY = os.environ.get("PULL_POLICY", "Always")
logger.info(f"Using image `{RAY_IMAGE}` for Ray containers.")
logger.info(f"Using image `{AUTOSCALER_IMAGE}` for Autoscaler containers.")
logger.info(f"Using pull policy `{PULL_POLICY}` for all images.")
# The default "rayproject/ray:413fe0" is the currently pinned autoscaler image
# (to be replaced with rayproject/ray:1.12.0 upon 1.12.0 release).

# Parent directory of Ray repository
RAY_PARENT = str(pathlib.Path(__file__).resolve().parents[5])
# Path to example config rel RAY_PARENT
EXAMPLE_CLUSTER_PATH = "ray/python/ray/autoscaler/kuberay/ray-cluster.complete.yaml"

@@ -68,38 +60,6 @@ class KubeRayAutoscalingTest(unittest.TestCase):
    kubectl is used throughout, as that reflects the instructions in the docs.
    """

    def setUp(self):
        """Set up KubeRay operator and Ray autoscaler RBAC."""

        # Switch to parent of Ray repo, because that's what the doc examples do.
        logger.info("Switching to parent of Ray directory.")
        os.chdir(RAY_PARENT)

        logger.info("Cloning KubeRay and setting up KubeRay configuration.")
        # For faster run-time when triggering the test locally, don't run the init
        # script if it has already been run.
        subprocess.check_call(
            [
                "bash",
                "-c",
                (
                    "ls ray/python/ray/autoscaler/kuberay/config ||"
                    " ./ray/python/ray/autoscaler/kuberay/init-config.sh"
                ),
            ]
        )
        logger.info("Creating KubeRay operator.")
        subprocess.check_call(
            [
                "kubectl",
                "create",
                "-k",
                "ray/python/ray/autoscaler/kuberay/config/default",
            ]
        )
        logger.info("Making sure RayCluster CRD has been registered.")
        wait_for_crd("rayclusters.ray.io")

    def _get_ray_cr_config(
        self, min_replicas=0, cpu_replicas=0, gpu_replicas=0
    ) -> Dict[str, Any]:
@@ -242,6 +202,8 @@ def testAutoscaling(self):
        The `num-cpus` arg to Ray start is 1 for each Ray container; thus Ray accounts
        1 CPU for each Ray node in the test.
        """
        switch_to_ray_parent_dir()

        # Cluster creation
        logger.info("Creating a RayCluster with no worker pods.")
        self._apply_ray_cr(min_replicas=0, cpu_replicas=0, gpu_replicas=0)
@@ -424,26 +386,10 @@ def testAutoscaling(self):
logger.info("Confirming Ray pods are gone.")
wait_for_pods(goal_num_pods=0, namespace=RAY_CLUSTER_NAMESPACE)

    def tearDown(self):
        """Clean resources following the instructions in the docs."""

        logger.info("Deleting operator.")
        subprocess.check_call(
            [
                "kubectl",
                "delete",
                "-k",
                "ray/python/ray/autoscaler/kuberay/config/default",
            ]
        )

        logger.info("Double-checking no pods left over.")
        wait_for_pods(goal_num_pods=0, namespace=RAY_CLUSTER_NAMESPACE)
        wait_for_pods(goal_num_pods=0, namespace="ray-system")


if __name__ == "__main__":
    import pytest
    import sys

    setup_logging()
    sys.exit(pytest.main(["-vv", __file__]))
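For local runs, the image constants near the top of this file can be overridden through environment variables before the module is imported (the module-level os.environ.get calls run at import time). A hypothetical override that mirrors the CI setup, assuming the `ray-ci:kuberay-test` tag has already been loaded into the kind node as the pipeline does:

# Hypothetical local override, equivalent to exporting the variables in a shell
# before invoking pytest; not part of this PR.
import os

os.environ["RAY_IMAGE"] = "ray-ci:kuberay-test"  # image loaded into the kind node
os.environ["AUTOSCALER_IMAGE"] = os.environ["RAY_IMAGE"]  # same image for the autoscaler
os.environ["PULL_POLICY"] = "IfNotPresent"  # avoid pulling from Docker Hub inside kind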
108 changes: 102 additions & 6 deletions python/ray/tests/kuberay/utils.py
@@ -10,6 +10,7 @@
import time
from typing import Any, Dict, Generator, List, Optional
import yaml
import os

import ray
from ray.job_submission import JobStatus, JobSubmissionClient
@@ -18,22 +19,117 @@
logger = logging.getLogger(__name__)

SCRIPTS_DIR = pathlib.Path(__file__).resolve().parent / "scripts"
TEST_CR_PATH = (
    pathlib.Path(__file__).resolve().parent / "setup" / "raycluster_test.yaml"
)
TEST_CLUSTER_NAME = "raycluster-test"

# Parent directory of Ray repository
RAY_PARENT = str(pathlib.Path(__file__).resolve().parents[5])

def wait_for_crd(crd_name: str, tries=60, backoff_s=5):
RAYCLUSTERS_QUALIFIED = "rayclusters.ray.io"

LOG_FORMAT = "[%(levelname)s %(asctime)s] " "%(filename)s: %(lineno)d " "%(message)s"


def setup_logging():
    logging.basicConfig(
        level=logging.INFO,
        format=LOG_FORMAT,
    )


def switch_to_ray_parent_dir():
    # Switch to parent of Ray repo, because that's what the doc examples do.
    logger.info("Switching to parent of Ray directory.")
    os.chdir(RAY_PARENT)


def setup_kuberay_operator():
    """Set up KubeRay operator and Ray autoscaler RBAC."""
    switch_to_ray_parent_dir()
    logger.info("Cloning KubeRay and setting up KubeRay configuration.")
    # For faster run-time when triggering the test locally, don't run the init
    # script if it has already been run.
    subprocess.check_call(
        [
            "bash",
            "-c",
            (
                "ls ray/python/ray/autoscaler/kuberay/config ||"
                " ./ray/python/ray/autoscaler/kuberay/init-config.sh"
            ),
        ]
    )
    logger.info("Creating KubeRay operator.")
    subprocess.check_call(
        [
            "kubectl",
            "create",
            "-k",
            "ray/python/ray/autoscaler/kuberay/config/default",
        ]
    )


def teardown_kuberay_operator():
    logger.info("Switching to parent of Ray directory.")
    os.chdir(RAY_PARENT)

    logger.info("Deleting operator.")
    subprocess.check_call(
        [
            "kubectl",
            "delete",
            "--ignore-not-found",
            "-k",
            "ray/python/ray/autoscaler/kuberay/config/default",
        ]
    )

    logger.info("Double-checking no pods left over in namespace ray-system.")
    wait_for_pods(goal_num_pods=0, namespace="ray-system")


def wait_for_raycluster_crd(tries=60, backoff_s=5):
    """CRD creation can take a bit of time after the client request.
    This function waits until the crd with the provided name is registered.
    """
    switch_to_ray_parent_dir()
    logger.info("Making sure RayCluster CRD has been registered.")
    for i in range(tries):
        get_crd_output = subprocess.check_output(["kubectl", "get", "crd"]).decode()
        if crd_name in get_crd_output:
            logger.info(f"Confirmed existence of CRD {crd_name}.")
            return
        if RAYCLUSTERS_QUALIFIED in get_crd_output:
            logger.info("Confirmed existence of RayCluster CRD.")
            break
        elif i < tries - 1:
            logger.info(f"Still waiting to register CRD {crd_name}")
            logger.info("Still waiting to register RayCluster CRD.")
            time.sleep(backoff_s)
        else:
            raise Exception(f"Failed to register CRD {crd_name}")
            raise Exception("Failed to register RayCluster CRD.")

    # Create a test RayCluster CR to make sure that the CRD is fully registered.
    for i in range(tries):
        try:
            subprocess.check_call(["kubectl", "apply", "-f", TEST_CR_PATH])
            break
        except subprocess.CalledProcessError as e:
            logger.info("Can't create RayCluster CR.")
            if i < tries - 1:
                logger.info("Retrying.")
                time.sleep(backoff_s)
            else:
                logger.info("Giving up.")
                raise e from None

    # Confirm the test RayCluster exists.
    out = subprocess.check_output(["kubectl", "get", RAYCLUSTERS_QUALIFIED]).decode()
    assert TEST_CLUSTER_NAME in out, out

    # Delete the test RayCluster.
    subprocess.check_call(["kubectl", "delete", "-f", TEST_CR_PATH])
    # Make sure the associated resources are gone before proceeding.
    wait_for_pods(goal_num_pods=0, namespace="default")


def wait_for_pods(goal_num_pods: int, namespace: str, tries=60, backoff_s=5) -> None:
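The body of wait_for_pods is not shown in this diff. For context, it follows the same poll-and-backoff pattern as the helpers above; a rough, purely illustrative sketch, assuming pods are counted via `kubectl get pods` (the real implementation may differ):

# Illustrative sketch only; not the actual wait_for_pods implementation.
import subprocess
import time


def wait_for_pods_sketch(goal_num_pods: int, namespace: str, tries=60, backoff_s=5) -> None:
    for i in range(tries):
        out = subprocess.check_output(
            ["kubectl", "get", "pods", "-n", namespace, "--no-headers"]
        ).decode()
        # Count non-empty lines; each corresponds to one pod in the namespace.
        num_pods = len([line for line in out.splitlines() if line.strip()])
        if num_pods == goal_num_pods:
            return
        elif i < tries - 1:
            time.sleep(backoff_s)
        else:
            raise Exception(
                f"Timed out waiting for {goal_num_pods} pods in namespace {namespace}."
            )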