From ed131f87dafcc2a85ba69cccc9363ee263d840ad Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Wed, 14 Jul 2021 21:39:07 +0200 Subject: [PATCH] [release] move release testing end to end script to main ray repo (#17070) --- release/.buildkite/build_pipeline.py | 307 +++++ release/__init__.py | 0 release/alert.py | 401 ++++++ release/alerts/__init__.py | 0 release/alerts/default.py | 13 + release/alerts/long_running_tests.py | 32 + release/alerts/rllib_tests.py | 14 + release/alerts/tune_tests.py | 60 + release/alerts/xgboost_tests.py | 58 + release/config_generator.html | 214 ++++ release/e2e.py | 1709 ++++++++++++++++++++++++++ release/requirements.txt | 15 + 12 files changed, 2823 insertions(+) create mode 100644 release/.buildkite/build_pipeline.py create mode 100644 release/__init__.py create mode 100644 release/alert.py create mode 100644 release/alerts/__init__.py create mode 100644 release/alerts/default.py create mode 100644 release/alerts/long_running_tests.py create mode 100644 release/alerts/rllib_tests.py create mode 100644 release/alerts/tune_tests.py create mode 100644 release/alerts/xgboost_tests.py create mode 100644 release/config_generator.html create mode 100644 release/e2e.py create mode 100644 release/requirements.txt diff --git a/release/.buildkite/build_pipeline.py b/release/.buildkite/build_pipeline.py new file mode 100644 index 0000000000000..05a4fe450c09d --- /dev/null +++ b/release/.buildkite/build_pipeline.py @@ -0,0 +1,307 @@ +import copy +import logging +import os +import sys + +import yaml + +# Env variables: + +# RAY_REPO Repo to use for finding the wheel +# RAY_BRANCH Branch to find the wheel +# RAY_TEST_REPO Repo to use for test scripts +# RAY_TEST_BRANCH Branch for test scripts +# FILTER_FILE File filter +# FILTER_TEST Test name filter +# RELEASE_TEST_SUITE Release test suite (e.g. manual, nightly) + + +class ReleaseTest: + def __init__(self, name: str, smoke_test: bool = False, retry: int = 0): + self.name = name + self.smoke_test = smoke_test + self.retry = retry + + def __str__(self): + return self.name + + def __repr__(self): + return self.name + + def __contains__(self, item): + return self.name.__contains__(item) + + def __iter__(self): + return iter(self.name) + + def __len__(self): + return len(self.name) + + +class SmokeTest(ReleaseTest): + def __init__(self, name: str, retry: int = 0): + super(SmokeTest, self).__init__( + name=name, smoke_test=True, retry=retry) + + +CORE_NIGHTLY_TESTS = { + "~/ray/release/nightly_tests/nightly_tests.yaml": [ + "shuffle_10gb", + "shuffle_50gb", + "shuffle_50gb_large_partition", + "shuffle_100gb", + "non_streaming_shuffle_100gb", + "non_streaming_shuffle_50gb_large_partition", + "non_streaming_shuffle_50gb", + "dask_on_ray_10gb_sort", + "dask_on_ray_100gb_sort", + "dask_on_ray_large_scale_test_no_spilling", + "dask_on_ray_large_scale_test_spilling", + "stress_test_placement_group", + "shuffle_1tb_1000_partition", + "non_streaming_shuffle_1tb_1000_partition", + "shuffle_1tb_5000_partitions", + "non_streaming_shuffle_1tb_5000_partitions", + "decision_tree_autoscaling", + "autoscaling_shuffle_1tb_1000_partitions", + SmokeTest("stress_test_many_tasks"), + SmokeTest("stress_test_dead_actors"), + ], + "~/ray/benchmarks/benchmark_tests.yaml": [ + "single_node", + "object_store", + ], +} + +NIGHTLY_TESTS = { + # "~/ray/release/horovod_tests/horovod_tests.yaml": [ + # SmokeTest("horovod_test"), + # ], # Should we enable this? 
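+    # Keys are release test yaml files; values list the tests to run from each
+    # file. Wrapping a name in SmokeTest() makes build_pipeline add the
+    # --smoke-test flag for that test.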
+ "~/ray/release/golden_notebook_tests/golden_notebook_tests.yaml": [ + "dask_xgboost_test", + "modin_xgboost_test", + "torch_tune_serve_test", + ], + "~/ray/release/long_running_tests/long_running_tests.yaml": [ + SmokeTest("actor_deaths"), + SmokeTest("apex"), + SmokeTest("impala"), + SmokeTest("many_actor_tasks"), + SmokeTest("many_drivers"), + SmokeTest("many_ppo"), + SmokeTest("many_tasks"), + SmokeTest("many_tasks_serialized_ids"), + SmokeTest("node_failures"), + SmokeTest("pbt"), + # SmokeTest("serve"), + # SmokeTest("serve_failure"), + ], + "~/ray/release/microbenchmark/microbenchmark.yaml": [ + "microbenchmark", + ], + "~/ray/release/sgd_tests/sgd_tests.yaml": [ + "sgd_gpu", + ], + "~/ray/release/tune_tests/scalability_tests/tune_tests.yaml": [ + "bookkeeping_overhead", + "durable_trainable", + SmokeTest("long_running_large_checkpoints"), + SmokeTest("network_overhead"), + "result_throughput_cluster", + "result_throughput_single_node", + "xgboost_sweep", + ], + "~/ray/release/xgboost_tests/xgboost_tests.yaml": [ + "train_small", + "train_moderate", + "train_gpu", + "tune_small", + "tune_4x32", + "tune_32x4", + "ft_small_elastic", + "ft_small_non_elastic", + "distributed_api_test", + ], +} + +WEEKLY_TESTS = { + "~/ray/benchmarks/benchmark_tests.yaml": [ + "distributed", + ], + "~/ray/release/nightly_tests/nightly_tests.yaml": [ + "stress_test_many_tasks", + "stress_test_dead_actors", + ], + "~/ray/release/horovod_tests/horovod_tests.yaml": [ + "horovod_test", + ], + "~/ray/release/long_running_distributed_tests" + "/long_running_distributed.yaml": [ + "pytorch_pbt_failure", + ], + # Full long running tests (1 day runtime) + "~/ray/release/long_running_tests/long_running_tests.yaml": [ + "actor_deaths", + "apex", + "impala", + "many_actor_tasks", + "many_drivers", + "many_ppo", + "many_tasks", + "many_tasks_serialized_ids", + "node_failures", + "pbt", + # "serve", + # "serve_failure", + ], + "~/ray/release/tune_tests/scalability_tests/tune_tests.yaml": [ + "network_overhead", + "long_running_large_checkpoints", + ], +} + +MANUAL_TESTS = { + "~/ray/release/rllib_tests/rllib_tests.yaml": [ + "learning_tests", + "example_scripts_on_gpu_tests", + "stress_tests", + ], + "~/ray/release/long_running_tests/long_running_tests.yaml": [ + SmokeTest("serve"), + SmokeTest("serve_failure"), + ] +} + +SUITES = { + "core-nightly": CORE_NIGHTLY_TESTS, + "nightly": NIGHTLY_TESTS, + "weekly": WEEKLY_TESTS, + "manual": MANUAL_TESTS, +} + +DEFAULT_STEP_TEMPLATE = { + "env": { + "ANYSCALE_CLOUD_ID": "cld_4F7k8814aZzGG8TNUGPKnc", + "ANYSCALE_PROJECT": "prj_2xR6uT6t7jJuu1aCwWMsle", + "RELEASE_AWS_BUCKET": "ray-release-automation-results", + "RELEASE_AWS_LOCATION": "dev", + "RELEASE_AWS_DB_NAME": "ray_ci", + "RELEASE_AWS_DB_TABLE": "release_test_result", + "AWS_REGION": "us-west-2" + }, + "agents": { + "queue": "runner_queue_branch" + }, + "plugins": [{ + "docker#v3.8.0": { + "image": "rayproject/ray", + "propagate-environment": True + } + }], + "commands": [] +} + + +def build_pipeline(steps): + all_steps = [] + + RAY_BRANCH = os.environ.get("RAY_BRANCH", "master") + RAY_REPO = os.environ.get("RAY_REPO", + "https://github.com/ray-project/ray.git") + + RAY_TEST_BRANCH = os.environ.get("RAY_TEST_BRANCH", RAY_BRANCH) + RAY_TEST_REPO = os.environ.get("RAY_TEST_REPO", RAY_REPO) + + FILTER_FILE = os.environ.get("FILTER_FILE", "") + FILTER_TEST = os.environ.get("FILTER_TEST", "") + + logging.info( + f"Building pipeline \n" + f"Ray repo/branch to test:\n" + f" RAY_REPO = {RAY_REPO}\n" + f" RAY_BRANCH = 
{RAY_BRANCH}\n\n" + f"Ray repo/branch containing the test configurations and scripts:" + f" RAY_TEST_REPO = {RAY_TEST_REPO}\n" + f" RAY_TEST_BRANCH = {RAY_TEST_BRANCH}\n\n" + f"Filtering for these tests:\n" + f" FILTER_FILE = {FILTER_FILE}\n" + f" FILTER_TEST = {FILTER_TEST}\n\n") + + for test_file, test_names in steps.items(): + if FILTER_FILE and FILTER_FILE not in test_file: + continue + + test_base = os.path.basename(test_file) + for test_name in test_names: + if FILTER_TEST and FILTER_TEST not in test_name: + continue + + if not isinstance(test_name, ReleaseTest): + test_name = ReleaseTest(name=test_name) + + logging.info(f"Adding test: {test_base}/{test_name}") + + cmd = str(f"python release/e2e.py " + f"--ray-branch {RAY_BRANCH} " + f"--category {RAY_BRANCH} " + f"--test-config {test_file} " + f"--test-name {test_name}") + + if test_name.smoke_test: + logging.info("This test will run as a smoke test.") + cmd += " --smoke-test" + + step_conf = copy.deepcopy(DEFAULT_STEP_TEMPLATE) + + if test_name.retry: + logging.info(f"This test will be retried up to " + f"{test_name.retry} times.") + step_conf["retry"] = { + "automatic": [{ + "exit_status": "*", + "limit": test_name.retry + }] + } + + step_conf["commands"] = [ + "pip install -q -r release/requirements.txt", + "pip install -U boto3 botocore", + f"git clone -b {RAY_TEST_BRANCH} {RAY_TEST_REPO} ~/ray", + cmd, + ] + + step_conf["label"] = f"{test_name} ({RAY_BRANCH}) - " \ + f"{RAY_TEST_BRANCH}/{test_base}" + all_steps.append(step_conf) + + return all_steps + + +def alert_pipeline(stats: bool = False): + step_conf = copy.deepcopy(DEFAULT_STEP_TEMPLATE) + + cmd = "python release/alert.py" + if stats: + cmd += " --stats" + + step_conf["commands"] = [ + "pip install -q -r release/requirements.txt", + "pip install -U boto3 botocore", + cmd, + ] + step_conf["label"] = f"Send periodic alert (stats_only = {stats})" + return [step_conf] + + +if __name__ == "__main__": + alert = os.environ.get("RELEASE_ALERT", "0") + + if alert in ["1", "stats"]: + steps = alert_pipeline(alert == "stats") + else: + TEST_SUITE = os.environ.get("RELEASE_TEST_SUITE", "nightly") + PIPELINE_SPEC = SUITES[TEST_SUITE] + + steps = build_pipeline(PIPELINE_SPEC) + + yaml.dump({"steps": steps}, sys.stdout) diff --git a/release/__init__.py b/release/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/release/alert.py b/release/alert.py new file mode 100644 index 0000000000000..fdda56872fc9e --- /dev/null +++ b/release/alert.py @@ -0,0 +1,401 @@ +import argparse +from collections import defaultdict, Counter +from typing import Any, List, Tuple, Mapping, Optional +import datetime +import hashlib +import json +import logging +import os +import requests +import sys + +import boto3 + +from e2e import GLOBAL_CONFIG + +from alerts.default import handle_result as default_handle_result +from alerts.rllib_tests import handle_result as rllib_tests_handle_result +from alerts.long_running_tests import handle_result as \ + long_running_tests_handle_result +from alerts.tune_tests import handle_result as tune_tests_handle_result +from alerts.xgboost_tests import handle_result as xgboost_tests_handle_result + +SUITE_TO_FN = { + "long_running_tests": long_running_tests_handle_result, + "rllib_tests": rllib_tests_handle_result, + "tune_tests": tune_tests_handle_result, + "xgboost_tests": xgboost_tests_handle_result, +} + +GLOBAL_CONFIG["RELEASE_AWS_DB_STATE_TABLE"] = "alert_state" +GLOBAL_CONFIG["SLACK_WEBHOOK"] = os.environ.get("SLACK_WEBHOOK", "") 
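+# If SLACK_WEBHOOK is empty, maybe_fetch_slack_webhook() below retrieves it from
+# the AWS secrets store before any alerts are posted.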
+GLOBAL_CONFIG["SLACK_CHANNEL"] = os.environ.get("SLACK_CHANNEL", + "#oss-test-cop") + +logger = logging.getLogger() +logger.setLevel(logging.INFO) +handler = logging.StreamHandler(stream=sys.stdout) +formatter = logging.Formatter(fmt="[%(levelname)s %(asctime)s] " + "%(filename)s: %(lineno)d " + "%(message)s") +handler.setFormatter(formatter) +logger.addHandler(handler) + + +def maybe_fetch_slack_webhook(): + if GLOBAL_CONFIG["SLACK_WEBHOOK"] in [None, ""]: + print("Missing SLACK_WEBHOOK, retrieving from AWS secrets store") + GLOBAL_CONFIG["SLACK_WEBHOOK"] = boto3.client( + "secretsmanager", region_name="us-west-2" + ).get_secret_value( + SecretId="arn:aws:secretsmanager:us-west-2:029272617770:secret:" + "release-automation/" + "slack-webhook-Na0CFP")["SecretString"] + + +def _obj_hash(obj: Any) -> str: + json_str = json.dumps(obj, sort_keys=True, ensure_ascii=True) + sha = hashlib.sha256() + sha.update(json_str.encode()) + return sha.hexdigest() + + +def fetch_latest_alerts(rds_data_client): + schema = GLOBAL_CONFIG["RELEASE_AWS_DB_STATE_TABLE"] + + sql = (f""" + SELECT DISTINCT ON (category, test_suite, test_name) + category, test_suite, test_name, last_result_hash, + last_notification_dt + FROM {schema} + ORDER BY category, test_suite, test_name, last_notification_dt DESC + """) + + result = rds_data_client.execute_statement( + database=GLOBAL_CONFIG["RELEASE_AWS_DB_NAME"], + secretArn=GLOBAL_CONFIG["RELEASE_AWS_DB_SECRET_ARN"], + resourceArn=GLOBAL_CONFIG["RELEASE_AWS_DB_RESOURCE_ARN"], + schema=schema, + sql=sql, + ) + for row in result["records"]: + category, test_suite, test_name, last_result_hash, \ + last_notification_dt = ( + r["stringValue"] + if "stringValue" in r else None + for r in row + ) + last_notification_dt = datetime.datetime.strptime( + last_notification_dt, "%Y-%m-%d %H:%M:%S") + yield category, test_suite, test_name, last_result_hash, \ + last_notification_dt + + +def fetch_latest_results(rds_data_client, + fetch_since: Optional[datetime.datetime] = None): + schema = GLOBAL_CONFIG["RELEASE_AWS_DB_TABLE"] + + sql = (f""" + SELECT DISTINCT ON (category, test_suite, test_name) + created_on, category, test_suite, test_name, status, results, + artifacts, last_logs + FROM {schema} """) + + parameters = [] + if fetch_since is not None: + sql += "WHERE created_on >= :created_on " + parameters = [ + { + "name": "created_on", + "typeHint": "TIMESTAMP", + "value": { + "stringValue": fetch_since.strftime("%Y-%m-%d %H:%M:%S") + }, + }, + ] + + sql += "ORDER BY category, test_suite, test_name, created_on DESC" + + result = rds_data_client.execute_statement( + database=GLOBAL_CONFIG["RELEASE_AWS_DB_NAME"], + secretArn=GLOBAL_CONFIG["RELEASE_AWS_DB_SECRET_ARN"], + resourceArn=GLOBAL_CONFIG["RELEASE_AWS_DB_RESOURCE_ARN"], + schema=schema, + sql=sql, + parameters=parameters, + ) + for row in result["records"]: + created_on, category, test_suite, test_name, status, results, \ + artifacts, last_logs = ( + r["stringValue"] if "stringValue" in r else None for r in row) + + # Calculate hash before converting strings to objects + result_obj = (created_on, category, test_suite, test_name, status, + results, artifacts, last_logs) + result_json = json.dumps(result_obj) + result_hash = _obj_hash(result_json) + + # Convert some strings to python objects + created_on = datetime.datetime.strptime(created_on, + "%Y-%m-%d %H:%M:%S") + results = json.loads(results) + artifacts = json.loads(artifacts) + + yield result_hash, created_on, category, test_suite, test_name, \ + status, results, artifacts, 
last_logs + + +def mark_as_handled(rds_data_client, update: bool, category: str, + test_suite: str, test_name: str, result_hash: str, + last_notification_dt: datetime.datetime): + schema = GLOBAL_CONFIG["RELEASE_AWS_DB_STATE_TABLE"] + + if not update: + sql = (f""" + INSERT INTO {schema} + (category, test_suite, test_name, + last_result_hash, last_notification_dt) + VALUES (:category, :test_suite, :test_name, + :last_result_hash, :last_notification_dt) + """) + else: + sql = (f""" + UPDATE {schema} + SET last_result_hash=:last_result_hash, + last_notification_dt=:last_notification_dt + WHERE category=:category AND test_suite=:test_suite + AND test_name=:test_name + """) + + rds_data_client.execute_statement( + database=GLOBAL_CONFIG["RELEASE_AWS_DB_NAME"], + parameters=[ + { + "name": "category", + "value": { + "stringValue": category + } + }, + { + "name": "test_suite", + "value": { + "stringValue": test_suite or "" + } + }, + { + "name": "test_name", + "value": { + "stringValue": test_name + } + }, + { + "name": "last_result_hash", + "value": { + "stringValue": result_hash + } + }, + { + "name": "last_notification_dt", + "typeHint": "TIMESTAMP", + "value": { + "stringValue": last_notification_dt.strftime( + "%Y-%m-%d %H:%M:%S") + }, + }, + ], + secretArn=GLOBAL_CONFIG["RELEASE_AWS_DB_SECRET_ARN"], + resourceArn=GLOBAL_CONFIG["RELEASE_AWS_DB_RESOURCE_ARN"], + schema=schema, + sql=sql, + ) + + +def post_alerts_to_slack(channel: str, alerts: List[Tuple[str, str, str, str]], + non_alerts: Mapping[str, int]): + if len(alerts) == 0: + logger.info("No alerts to post to slack.") + return + + markdown_lines = [ + f"* {len(alerts)} new release test failures found!*", + "", + ] + + category_alerts = defaultdict(list) + for (category, test_suite, test_name, alert) in alerts: + category_alerts[category].append( + f" *{test_suite}/{test_name}* failed: {alert}") + + for category, alert_list in category_alerts.items(): + markdown_lines.append(f"Branch: *{category}*") + markdown_lines.extend(alert_list) + markdown_lines.append("") + + total_non_alerts = sum(n for n in non_alerts.values()) + non_alert_detail = [f"{n} on {c}" for c, n in non_alerts.items()] + + markdown_lines += [ + f"Additionally, {total_non_alerts} tests passed successfully " + f"({', '.join(non_alert_detail)})." + ] + + slack_url = GLOBAL_CONFIG["SLACK_WEBHOOK"] + + resp = requests.post( + slack_url, + json={ + "text": "\n".join(markdown_lines), + "channel": channel, + "username": "Fail Bot", + "icon_emoji": ":red_circle:", + }, + ) + print(resp.status_code) + print(resp.text) + + +def post_statistics_to_slack(channel: str, + alerts: List[Tuple[str, str, str, str]], + non_alerts: Mapping[str, int]): + total_alerts = len(alerts) + + category_alerts = defaultdict(list) + for (category, test_suite, test_name, alert) in alerts: + category_alerts[category].append(f"`{test_suite}/{test_name}`") + + alert_detail = [f"{len(a)} on {c}" for c, a in category_alerts.items()] + + total_non_alerts = sum(n for n in non_alerts.values()) + non_alert_detail = [f"{n} on {c}" for c, n in non_alerts.items()] + + markdown_lines = [ + "*Periodic release test report*", "", f"In the past 24 hours, " + f"*{total_non_alerts}* release tests finished successfully, and " + f"*{total_alerts}* release tests failed." 
+ ] + + markdown_lines.append("") + + if total_alerts: + markdown_lines.append(f"*Failing:* {', '.join(alert_detail)}") + for c, a in category_alerts.items(): + markdown_lines.append(f" *{c}*: {', '.join(sorted(a))}") + else: + markdown_lines.append("*Failing:* None") + + markdown_lines.append("") + + if total_non_alerts: + markdown_lines.append(f"*Passing:* {', '.join(non_alert_detail)}") + else: + markdown_lines.append("*Passing:* None") + + slack_url = GLOBAL_CONFIG["SLACK_WEBHOOK"] + + resp = requests.post( + slack_url, + json={ + "text": "\n".join(markdown_lines), + "channel": channel, + "username": "Fail Bot", + "icon_emoji": ":red_circle:", + }, + ) + print(resp.status_code) + print(resp.text) + + +def handle_results_and_get_alerts( + rds_data_client, + fetch_since: Optional[datetime.datetime] = None, + always_try_alert: bool = False, + no_status_update: bool = False): + # First build a map of last notifications + last_notifications_map = {} + for category, test_suite, test_name, last_result_hash, \ + last_notification_dt in fetch_latest_alerts(rds_data_client): + last_notifications_map[(category, test_suite, + test_name)] = (last_result_hash, + last_notification_dt) + + alerts = [] + non_alerts = Counter() + + # Then fetch latest results + for result_hash, created_on, category, test_suite, test_name, status, \ + results, artifacts, last_logs in fetch_latest_results( + rds_data_client, fetch_since=fetch_since): + key = (category, test_suite, test_name) + + try_alert = always_try_alert + if key in last_notifications_map: + # If we have an alert for this key, fetch info + last_result_hash, last_notification_dt = last_notifications_map[ + key] + + if last_result_hash != result_hash: + # If we got a new result, handle new result + try_alert = True + # Todo: maybe alert again after some time? 
+ else: + try_alert = True + + if try_alert: + handle_fn = SUITE_TO_FN.get(test_suite, None) + if not handle_fn: + logger.warning(f"No handle for suite {test_suite}") + alert = default_handle_result(created_on, category, test_suite, + test_name, status, results, + artifacts, last_logs) + else: + alert = handle_fn(created_on, category, test_suite, test_name, + status, results, artifacts, last_logs) + + if alert: + logger.warning( + f"Alert raised for test {test_suite}/{test_name} " + f"({category}): {alert}") + + alerts.append((category, test_suite, test_name, alert)) + else: + logger.debug( + f"No alert raised for test {test_suite}/{test_name} " + f"({category})") + non_alerts[category] += 1 + + if not no_status_update: + mark_as_handled(rds_data_client, key in last_notifications_map, + category, test_suite, test_name, result_hash, + datetime.datetime.now()) + + return alerts, non_alerts + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--stats", + action="store_true", + default=False, + help="Finish quickly for training.") + args = parser.parse_args() + + maybe_fetch_slack_webhook() + + rds_data_client = boto3.client("rds-data", region_name="us-west-2") + + if args.stats: + # Only update last 24 hour stats + fetch_since = datetime.datetime.now() - datetime.timedelta(days=1) + alerts, non_alerts = handle_results_and_get_alerts( + rds_data_client, + fetch_since=fetch_since, + always_try_alert=True, + no_status_update=True) + post_statistics_to_slack(GLOBAL_CONFIG["SLACK_CHANNEL"], alerts, + non_alerts) + + else: + alerts, non_alerts = handle_results_and_get_alerts(rds_data_client) + post_alerts_to_slack(GLOBAL_CONFIG["SLACK_CHANNEL"], alerts, + non_alerts) diff --git a/release/alerts/__init__.py b/release/alerts/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/release/alerts/default.py b/release/alerts/default.py new file mode 100644 index 0000000000000..fa6cf8384694f --- /dev/null +++ b/release/alerts/default.py @@ -0,0 +1,13 @@ +import datetime + +from typing import Dict, Optional + + +def handle_result(created_on: datetime.datetime, category: str, + test_suite: str, test_name: str, status: str, results: Dict, + artifacts: Dict, last_logs: str) -> Optional[str]: + + if not status == "finished": + return f"Test script did not finish successfully ({status})." + + return None diff --git a/release/alerts/long_running_tests.py b/release/alerts/long_running_tests.py new file mode 100644 index 0000000000000..fa0aab06a84de --- /dev/null +++ b/release/alerts/long_running_tests.py @@ -0,0 +1,32 @@ +import datetime + +from typing import Dict, Optional + + +def handle_result(created_on: datetime.datetime, category: str, + test_suite: str, test_name: str, status: str, results: Dict, + artifacts: Dict, last_logs: str) -> Optional[str]: + assert test_suite == "long_running_tests" + + # elapsed_time = results.get("elapsed_time", 0.) 
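+    # last_update_diff is filled in by e2e.py as time.time() - results["last_update"],
+    # i.e. the number of seconds since the test last wrote to its results json.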
+ last_update_diff = results.get("last_update_diff", float("inf")) + + if test_name in [ + "actor_deaths", "many_actor_tasks", "many_drivers", "many_tasks", + "many_tasks_serialized_ids", "node_failures", + "object_spilling_shuffle", "serve", "serve_failure" + ]: + # Core tests + target_update_diff = 120 + + elif test_name in ["apex", "impala", "many_ppo", "pbt"]: + # Tune/RLLib style tests + target_update_diff = 360 + else: + return None + + if last_update_diff > target_update_diff: + return f"Last update to results json was too long ago " \ + f"({last_update_diff:.2f} > {target_update_diff})" + + return None diff --git a/release/alerts/rllib_tests.py b/release/alerts/rllib_tests.py new file mode 100644 index 0000000000000..a97c3cad49f60 --- /dev/null +++ b/release/alerts/rllib_tests.py @@ -0,0 +1,14 @@ +import datetime + +from typing import Dict, Optional + + +def handle_result(created_on: datetime.datetime, category: str, + test_suite: str, test_name: str, status: str, results: Dict, + artifacts: Dict, last_logs: str) -> Optional[str]: + assert test_suite == "rllib_tests" + + if not status == "finished": + return f"Test script did not finish successfully ({status})." + + return None diff --git a/release/alerts/tune_tests.py b/release/alerts/tune_tests.py new file mode 100644 index 0000000000000..f4087f069b87f --- /dev/null +++ b/release/alerts/tune_tests.py @@ -0,0 +1,60 @@ +import datetime + +from typing import Dict, Optional + + +def handle_result(created_on: datetime.datetime, category: str, + test_suite: str, test_name: str, status: str, results: Dict, + artifacts: Dict, last_logs: str) -> Optional[str]: + assert test_suite == "tune_tests" + + msg = "" + success = status == "finished" + time_taken = results.get("time_taken", float("inf")) + num_terminated = results.get("trial_states", {}).get("TERMINATED", 0) + was_smoke_test = results.get("smoke_test", False) + + if not success: + if status == "timeout": + msg += "Test timed out." + else: + msg += "Test script failed. " + + if test_name == "long_running_large_checkpoints": + last_update_diff = results.get("last_update_diff", float("inf")) + target_update_diff = 360 + + if last_update_diff > target_update_diff: + return f"Last update to results json was too long ago " \ + f"({last_update_diff:.2f} > {target_update_diff})" + return None + + elif test_name == "bookkeeping_overhead": + target_terminated = 10000 + target_time = 800 + elif test_name == "durable_trainable": + target_terminated = 16 + target_time = 600 + elif test_name == "network_overhead": + target_terminated = 100 if not was_smoke_test else 20 + target_time = 900 if not was_smoke_test else 400 + elif test_name == "result_throughput_cluster": + target_terminated = 1000 + target_time = 120 + elif test_name == "result_throughput_single_node": + target_terminated = 96 + target_time = 120 + elif test_name == "xgboost_sweep": + target_terminated = 31 + target_time = 3600 + else: + return None + + if num_terminated < target_terminated: + msg += f"Some trials failed " \ + f"(num_terminated={num_terminated} < {target_terminated}). " + if time_taken > target_time: + msg += f"Took too long to complete " \ + f"(time_taken={time_taken:.2f} > {target_time}). 
" + + return msg or None diff --git a/release/alerts/xgboost_tests.py b/release/alerts/xgboost_tests.py new file mode 100644 index 0000000000000..59ab2880adf76 --- /dev/null +++ b/release/alerts/xgboost_tests.py @@ -0,0 +1,58 @@ +import datetime + +from typing import Dict, Optional + + +def handle_result(created_on: datetime.datetime, category: str, + test_suite: str, test_name: str, status: str, results: Dict, + artifacts: Dict, last_logs: str) -> Optional[str]: + assert test_suite == "xgboost_tests" + + time_taken = results.get("time_taken", float("inf")) + num_terminated = results.get("trial_states", {}).get("TERMINATED", 0) + + if test_name in [ + "distributed_api_test", "ft_small_elastic", "ft_small_nonelastic" + ]: + if not status == "finished": + return f"Test script did not finish successfully ({status})." + + return None + elif test_name.startswith("tune_"): + msg = "" + if test_name == "tune_small": + target_terminated = 4 + target_time = 90 + elif test_name == "tune_4x32": + target_terminated = 4 + target_time = 120 + elif test_name == "tune_32x4": + target_terminated = 32 + target_time = 600 + else: + return None + + if num_terminated < target_terminated: + msg += f"Some trials failed " \ + f"(num_terminated={num_terminated} < {target_terminated}). " + if time_taken > target_time: + msg += f"Took too long to complete " \ + f"(time_taken={time_taken} > {target_time}). " + + return msg or None + else: + # train scripts + if test_name == "train_small": + target_time = 30 + elif test_name == "train_moderate": + target_time = 60 + elif test_name == "train_gpu": + target_time = 40 + else: + return None + + if time_taken > target_time: + return f"Took too long to complete " \ + f"(time_taken={time_taken:.2f} > {target_time}). " + + return None diff --git a/release/config_generator.html b/release/config_generator.html new file mode 100644 index 0000000000000..179bd632061b6 --- /dev/null +++ b/release/config_generator.html @@ -0,0 +1,214 @@ + + + + + Releaser config generator + + + + +
+ <!-- Releaser config generator (form markup omitted). -->
+ <!-- "Use this form to generate a list of environment variables. These variables -->
+ <!-- can be passed to Buildkite to run a subset of release tests and choose the -->
+ <!-- correct wheels/release test branch." The form is a table with -->
+ <!-- Set | Value | Description columns. -->
+ + \ No newline at end of file diff --git a/release/e2e.py b/release/e2e.py new file mode 100644 index 0000000000000..7f3b0bfdb43bc --- /dev/null +++ b/release/e2e.py @@ -0,0 +1,1709 @@ +""" +This is an end to end release test automation script used to kick off periodic +release tests, running on Anyscale. + +The tool leverages app configs and compute templates. + +Calling this script will run a single release test. + +Example: + +python e2e.py --test-config ~/ray/release/xgboost_tests/xgboost_tests.yaml --test-name tune_small + +The following steps are then performed: + +1. It will look up the test tune_small in the file xgboost_tests.yaml +2. It will fetch the specified app config and compute template and register + those with anyscale (if they don’t exist yet) +3. It waits until the app config is built +4. It then kicks off the script defined in the run block +5. When the script is finished, it will fetch the latest logs, the full log + output, and any artifacts specified in the artifacts block. +6. The full logs and artifacts will be stored in a s3 bucket +7. It will also fetch the json file specified in the run block as results. + This is the file where you should write your metrics to. +8. All results are then stored in a database. + Specifically it will store the following fields: + - Timestamp + - Test name + - Status (finished, error, timeout, invalid) + - Last logs (50 lines) + - results (see above) + - artifacts (links to s3 files) + +Then the script exits. If an error occurs at any time, a fail result is +written to the database. + + +Writing a new release test +-------------------------- +Each release test requires the following: + +1. It has to be added in a release test yaml file, describing meta information + about the test (e.g. name, command to run, timeout) +2. You need an app config yaml +3. You need a compute template yaml +4. You need to define a command to run. This is usually a python script. + The command should accept (or ignore) a single optional + `--smoke-test` argument. + Usually the command should write its result metrics to a json file. + The json filename is available in the TEST_OUTPUT_JSON env variable. + +The script will have access to these environment variables: + + "RAY_ADDRESS": os.environ.get("RAY_ADDRESS", "auto") + "TEST_OUTPUT_JSON": results_json_filename + "IS_SMOKE_TEST": "1" if smoke_test else "0" + +For an example, take a look at the XGBoost test suite: + +https://github.com/ray-project/ray/blob/master/release/xgboost_tests/xgboost_tests.yaml + +These all use the same app configs and similar compute templates. This means +that app configs can be re-used across runs and only have to be built ones. + +App configs and compute templates can interpret environment variables. +A notable one is the `RAY_WHEELS` variable which points to the wheels that +should be tested (e.g. latest master wheels). You might want to include +something like this in your `post_build_cmds`: + + - pip3 install -U {{ env["RAY_WHEELS"] | default("ray") }} + +If you want to force rebuilds, consider using something like + + - echo {{ env["TIMESTAMP"] }} + +so that your app configs changes each time the script is executed. If you +only want to trigger rebuilds once per day, use `DATESTAMP` instead: + + - echo {{ env["DATESTAMP"] }} + +Local testing +------------- +For local testing, make sure to authenticate with the ray-ossci AWS user +(e.g. by setting the respective environment variables obtained from go/aws), +or use the `--no-report` command line argument. 
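+
+When testing locally it can also help to run against a stub workload that only
+exercises the results plumbing described above. A minimal, hypothetical sketch of
+such a script (the metric names are illustrative; only RAY_ADDRESS,
+TEST_OUTPUT_JSON and IS_SMOKE_TEST are provided by this harness):
+
+    import json
+    import os
+    import time
+
+    import ray
+
+    ray.init(address=os.environ.get("RAY_ADDRESS", "auto"))
+    smoke_test = os.environ.get("IS_SMOKE_TEST", "0") == "1"
+
+    start = time.time()
+    # ... run a (much smaller, if smoke_test) workload here ...
+    time_taken = time.time() - start
+
+    # Write metrics to the json file that e2e.py collects as `results`.
+    with open(os.environ["TEST_OUTPUT_JSON"], "wt") as f:
+        json.dump({"time_taken": time_taken, "last_update": time.time()}, f)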
+ +Also make sure to set these environment variables: + +- ANYSCALE_CLI_TOKEN (should contain your anyscale credential token) +- ANYSCALE_PROJECT (should point to a project ID you have access to) + +A test can then be run like this: + +python e2e.py --no-report --test-config ~/ray/release/xgboost_tests/xgboost_tests.yaml --test-name tune_small + +The `--no-report` option disables storing the results in the DB and +artifacts on S3. If you set this option, you do not need access to the +ray-ossci AWS user. + +Running on Head Node vs Running with Anyscale Connect +----------------------------------------------------- +By default release tests run their drivers on the head node. Support is being +added to run release tests that execute the driver as a subprocess and run +the workload on Anyscale product via Anyscale connect. +Note that when the driver in the test is a subprocess of releaser, releaser +cannot be terminated before the test finishes. +Other known feature gaps when running with Anyscale connect: +- Kicking off a test or checking progress is not supported. +- Downloading / uploading logs and artifacts are unsupported. +- Logs from remote may not have finished streaming, before the driver exits. + +Long running tests +------------------ +Long running tests can be kicked off with by adding the --kick-off-only +parameters to the e2e script. The status can then be checked with the +--check command. + +Long running test sessions will be terminated after `timeout` seconds, after +which the latest result in the TEST_OUTPUT_JSON will be reported. Thus, +long running release tests should update this file periodically. + +There are also two config options to configure behavior. The `time_key` is +needed to track the latest update of the TEST_OUTPUT_JSON and should contain +a floating point number (usually `time.time()`). The `max_update_delay` then +specified the maximum time in seconds that can be passed without an update +to the results json. If the output file hasn't been updated in e.g. 60 seconds, +this could indicate that the command is stale/frozen, and thus should fail. + +Release test yaml example +------------------------- +- name: example + owner: + mail: "kai@anyscale.com" # Currently not used + slack: "@tune-team" # Currentl not used + + cluster: + app_config: app_config.yaml # Relative to the release test yaml + compute_template: tpl_cpu.yaml + + run: + timeout: 600 # in seconds + prepare: python wait_cluster.py 4 600 # prepare cmd to run before test + script: python workloads/train.py # actual release test command + + # Only needed for long running test + time_key: last_update # Key in the results json indicating current time + max_update_delay: 30 # If state hasn't been updated in 30s, terminate + + # This block is optional + artifacts: + # Artifact name: location on head node + - detailed_output: detailed_output.csv + + # This block is optional. 
If present, the contents will be + # deep updated for smoke testing + smoke_test: + cluster: + compute_template: tpl_cpu_smoketest.yaml + +""" # noqa: E501 +import argparse +import boto3 +import collections +import copy +import datetime +import hashlib +import jinja2 +import json +import logging +import multiprocessing +import os +import requests +import shutil +import subprocess +import sys +import tempfile +import time +from queue import Empty +from typing import Any, Dict, Optional, Tuple, List + +import yaml + +import anyscale +import anyscale.conf +from anyscale.api import instantiate_api_client +from anyscale.controllers.session_controller import SessionController +from anyscale.sdk.anyscale_client.sdk import AnyscaleSDK + +logger = logging.getLogger() +logger.setLevel(logging.INFO) +handler = logging.StreamHandler(stream=sys.stdout) +formatter = logging.Formatter(fmt="[%(levelname)s %(asctime)s] " + "%(filename)s: %(lineno)d " + "%(message)s") +handler.setFormatter(formatter) +logger.addHandler(handler) + +GLOBAL_CONFIG = { + "ANYSCALE_USER": os.environ.get("ANYSCALE_USER", + "release-automation@anyscale.com"), + "ANYSCALE_HOST": os.environ.get("ANYSCALE_HOST", + "https://beta.anyscale.com"), + "ANYSCALE_CLI_TOKEN": os.environ.get("ANYSCALE_CLI_TOKEN"), + "ANYSCALE_CLOUD_ID": os.environ.get( + "ANYSCALE_CLOUD_ID", + "cld_4F7k8814aZzGG8TNUGPKnc"), # cld_4F7k8814aZzGG8TNUGPKnc + "ANYSCALE_PROJECT": os.environ.get("ANYSCALE_PROJECT", ""), + "RAY_VERSION": os.environ.get("RAY_VERSION", "2.0.0.dev0"), + "RAY_REPO": os.environ.get("RAY_REPO", + "https://github.com/ray-project/ray.git"), + "RAY_BRANCH": os.environ.get("RAY_BRANCH", "master"), + "RELEASE_AWS_BUCKET": os.environ.get("RELEASE_AWS_BUCKET", + "ray-release-automation-results"), + "RELEASE_AWS_LOCATION": os.environ.get("RELEASE_AWS_LOCATION", "dev"), + "RELEASE_AWS_DB_NAME": os.environ.get("RELEASE_AWS_DB_NAME", "ray_ci"), + "RELEASE_AWS_DB_TABLE": os.environ.get("RELEASE_AWS_DB_TABLE", + "release_test_result"), + "RELEASE_AWS_DB_SECRET_ARN": os.environ.get( + "RELEASE_AWS_DB_SECRET_ARN", + "arn:aws:secretsmanager:us-west-2:029272617770:secret:" + "rds-db-credentials/cluster-7RB7EYTTBK2EUC3MMTONYRBJLE/ray_ci-MQN2hh", + ), + "RELEASE_AWS_DB_RESOURCE_ARN": os.environ.get( + "RELEASE_AWS_DB_RESOURCE_ARN", + "arn:aws:rds:us-west-2:029272617770:cluster:ci-reporting", + ), + "DATESTAMP": str(datetime.datetime.now().strftime("%Y%m%d")), + "TIMESTAMP": str(int(datetime.datetime.now().timestamp())), + "EXPIRATION_1D": str((datetime.datetime.now() + + datetime.timedelta(days=1)).strftime("%Y-%m-%d")), + "EXPIRATION_2D": str((datetime.datetime.now() + + datetime.timedelta(days=2)).strftime("%Y-%m-%d")), + "EXPIRATION_3D": str((datetime.datetime.now() + + datetime.timedelta(days=3)).strftime("%Y-%m-%d")), +} + +REPORT_S = 30 + + +def maybe_fetch_api_token(): + if GLOBAL_CONFIG["ANYSCALE_CLI_TOKEN"] is None: + print("Missing ANYSCALE_CLI_TOKEN, retrieving from AWS secrets store") + # NOTE(simon) This should automatically retrieve + # release-automation@anyscale.com's anyscale token + GLOBAL_CONFIG["ANYSCALE_CLI_TOKEN"] = boto3.client( + "secretsmanager", region_name="us-west-2" + ).get_secret_value( + SecretId="arn:aws:secretsmanager:us-west-2:029272617770:secret:" + "release-automation/" + "anyscale-token20210505220406333800000001-BcUuKB")["SecretString"] + + +class ReleaseTestTimeoutError(RuntimeError): + pass + + +class State: + def __init__(self, state: str, timestamp: float, data: Any): + self.state = state + self.timestamp = timestamp + 
self.data = data + + +sys.path.insert(0, anyscale.ANYSCALE_RAY_DIR) + + +def anyscale_project_url(project_id: str): + return f"{GLOBAL_CONFIG['ANYSCALE_HOST']}" \ + f"/o/anyscale-internal/projects/{project_id}" \ + f"/?tab=session-list" + + +def anyscale_session_url(project_id: str, session_id: str): + return f"{GLOBAL_CONFIG['ANYSCALE_HOST']}" \ + f"/o/anyscale-internal/projects/{project_id}" \ + f"/clusters/{session_id}" + + +def anyscale_compute_tpl_url(compute_tpl_id: str): + return f"{GLOBAL_CONFIG['ANYSCALE_HOST']}" \ + f"/o/anyscale-internal/configurations/cluster-computes" \ + f"/{compute_tpl_id}" + + +def anyscale_app_config_build_url(build_id: str): + return f"{GLOBAL_CONFIG['ANYSCALE_HOST']}" \ + f"/o/anyscale-internal/configurations/app-config-details" \ + f"/{build_id}" + + +def wheel_url(ray_version, git_branch, git_commit): + return f"https://s3-us-west-2.amazonaws.com/ray-wheels/" \ + f"{git_branch}/{git_commit}/" \ + f"ray-{ray_version}-cp37-cp37m-manylinux2014_x86_64.whl" + + +def wheel_exists(ray_version, git_branch, git_commit): + url = wheel_url(ray_version, git_branch, git_commit) + return requests.head(url).status_code == 200 + + +def get_latest_commits(repo: str, branch: str = "master") -> List[str]: + cur = os.getcwd() + with tempfile.TemporaryDirectory() as tmpdir: + os.chdir(tmpdir) + + clone_cmd = [ + "git", + "clone", + "--filter=tree:0", + "--no-checkout", + # "--single-branch", + # "--depth=10", + f"--branch={branch}", + repo, + tmpdir, + ] + log_cmd = [ + "git", + "log", + "-n", + "10", + "--pretty=format:%H", + ] + + subprocess.check_output(clone_cmd) + commits = subprocess.check_output(log_cmd).decode( + sys.stdout.encoding).split("\n") + os.chdir(cur) + return commits + + +def find_ray_wheels(repo: str, branch: str, version: str): + url = None + commits = get_latest_commits(repo, branch) + logger.info(f"Latest 10 commits for branch {branch}: {commits}") + for commit in commits: + if wheel_exists(version, branch, commit): + url = wheel_url(version, branch, commit) + os.environ["RAY_WHEELS"] = url + logger.info( + f"Found wheels URL for Ray {version}, branch {branch}: " + f"{url}") + break + return url + + +def _check_stop(stop_event: multiprocessing.Event): + if stop_event.is_set(): + raise ReleaseTestTimeoutError("Process timed out.") + + +def _deep_update(d, u): + for k, v in u.items(): + if isinstance(v, collections.abc.Mapping): + d[k] = _deep_update(d.get(k, {}), v) + else: + d[k] = v + return d + + +def _dict_hash(dt: Dict[Any, Any]) -> str: + json_str = json.dumps(dt, sort_keys=True, ensure_ascii=True) + sha = hashlib.sha256() + sha.update(json_str.encode()) + return sha.hexdigest() + + +def _load_config(local_dir: str, config_file: Optional[str]) -> Optional[Dict]: + if not config_file: + return None + + config_path = os.path.join(local_dir, config_file) + with open(config_path, "rt") as f: + # Todo: jinja2 render + content = f.read() + + env = copy.deepcopy(os.environ) + env.update(GLOBAL_CONFIG) + + content = jinja2.Template(content).render(env=env) + return yaml.safe_load(content) + + +def has_errored(result: Dict[Any, Any]) -> bool: + return result.get("status", "invalid") != "finished" + + +def report_result(test_suite: str, test_name: str, status: str, logs: str, + results: Dict[Any, Any], artifacts: Dict[Any, Any], + category: str): + now = datetime.datetime.utcnow() + rds_data_client = boto3.client("rds-data", region_name="us-west-2") + + schema = GLOBAL_CONFIG["RELEASE_AWS_DB_TABLE"] + + sql = ( + f"INSERT INTO {schema} " + f"(created_on, 
test_suite, test_name, status, last_logs, " + f"results, artifacts, category) " + f"VALUES (:created_on, :test_suite, :test_name, :status, :last_logs, " + f":results, :artifacts, :category)") + + rds_data_client.execute_statement( + database=GLOBAL_CONFIG["RELEASE_AWS_DB_NAME"], + parameters=[ + { + "name": "created_on", + "typeHint": "TIMESTAMP", + "value": { + "stringValue": now.strftime("%Y-%m-%d %H:%M:%S") + }, + }, + { + "name": "test_suite", + "value": { + "stringValue": test_suite + } + }, + { + "name": "test_name", + "value": { + "stringValue": test_name + } + }, + { + "name": "status", + "value": { + "stringValue": status + } + }, + { + "name": "last_logs", + "value": { + "stringValue": logs + } + }, + { + "name": "results", + "typeHint": "JSON", + "value": { + "stringValue": json.dumps(results) + }, + }, + { + "name": "artifacts", + "typeHint": "JSON", + "value": { + "stringValue": json.dumps(artifacts) + }, + }, + { + "name": "category", + "value": { + "stringValue": category + } + }, + ], + secretArn=GLOBAL_CONFIG["RELEASE_AWS_DB_SECRET_ARN"], + resourceArn=GLOBAL_CONFIG["RELEASE_AWS_DB_RESOURCE_ARN"], + schema=schema, + sql=sql, + ) + + +def _cleanup_session(sdk: AnyscaleSDK, session_id: str): + if session_id: + # Just trigger a request. No need to wait until session shutdown. + sdk.stop_session(session_id=session_id, stop_session_options={}) + + +def search_running_session(sdk: AnyscaleSDK, project_id: str, + session_name: str) -> Optional[str]: + session_id = None + + logger.info(f"Looking for existing session with name {session_name}") + + result = sdk.search_sessions( + project_id=project_id, + sessions_query=dict(name=dict(equals=session_name))) + + if len(result.results) > 0 and result.results[0].state == "Running": + logger.info("Found existing session.") + session_id = result.results[0].id + return session_id + + +def create_or_find_compute_template( + sdk: AnyscaleSDK, + project_id: str, + compute_tpl: Dict[Any, Any], + _repeat: bool = True) -> Tuple[Optional[str], Optional[str]]: + compute_tpl_id = None + compute_tpl_name = None + if compute_tpl: + # As of Anyscale 0.4.1, it is an error to use the same compute template + # name within the same organization, between different projects. + compute_tpl_name = f"{project_id}/compute/{_dict_hash(compute_tpl)}" + + logger.info(f"Tests uses compute template " + f"with name {compute_tpl_name}. Looking up existing " + f"templates.") + + paging_token = None + while not compute_tpl_id: + result = sdk.search_compute_templates( + dict( + project_id=project_id, + name=dict(equals=compute_tpl_name), + include_anonymous=True), + paging_token=paging_token) + paging_token = result.metadata.next_paging_token + + for res in result.results: + if res.name == compute_tpl_name: + compute_tpl_id = res.id + logger.info( + f"Template already exists with ID {compute_tpl_id}") + break + + if not paging_token: + break + + if not compute_tpl_id: + logger.info(f"Compute template not found. " + f"Creating with name {compute_tpl_name}.") + try: + result = sdk.create_compute_template( + dict( + name=compute_tpl_name, + project_id=project_id, + config=compute_tpl)) + compute_tpl_id = result.result.id + except Exception as e: + if _repeat: + logger.warning( + f"Got exception when trying to create compute " + f"template: {e}. 
Sleeping for 10 seconds and then " + f"try again once...") + time.sleep(10) + return create_or_find_compute_template( + sdk=sdk, + project_id=project_id, + compute_tpl=compute_tpl, + _repeat=False) + + raise e + + logger.info(f"Compute template created with ID {compute_tpl_id}") + + return compute_tpl_id, compute_tpl_name + + +def create_or_find_app_config( + sdk: AnyscaleSDK, + project_id: str, + app_config: Dict[Any, Any], + _repeat: bool = True) -> Tuple[Optional[str], Optional[str]]: + app_config_id = None + app_config_name = None + if app_config: + app_config_name = f"{project_id}-{_dict_hash(app_config)}" + + logger.info(f"Test uses an app config with hash {app_config_name}. " + f"Looking up existing app configs with this name.") + + paging_token = None + while not app_config_id: + result = sdk.list_app_configs( + project_id=project_id, count=50, paging_token=paging_token) + paging_token = result.metadata.next_paging_token + + for res in result.results: + if res.name == app_config_name: + app_config_id = res.id + logger.info( + f"App config already exists with ID {app_config_id}") + break + + if not paging_token or app_config_id: + break + + if not app_config_id: + logger.info("App config not found. Creating new one.") + try: + result = sdk.create_app_config( + dict( + name=app_config_name, + project_id=project_id, + config_json=app_config)) + app_config_id = result.result.id + except Exception as e: + if _repeat: + logger.warning( + f"Got exception when trying to create app " + f"config: {e}. Sleeping for 10 seconds and then " + f"try again once...") + time.sleep(10) + return create_or_find_app_config( + sdk=sdk, + project_id=project_id, + app_config=app_config, + _repeat=False) + + raise e + + logger.info(f"App config created with ID {app_config_id}") + + return app_config_id, app_config_name + + +def install_app_config_packages(app_config: Dict[Any, Any]): + os.environ.update(app_config.get("env_vars", {})) + packages = app_config["python"]["pip_packages"] + for package in packages: + subprocess.check_output(["pip", "install", "-U", package], text=True) + + +def install_matching_ray(): + wheel = os.environ.get("RAY_WHEELS", None) + if not wheel: + return + assert "manylinux2014_x86_64" in wheel, wheel + if sys.platform == "darwin": + platform = "macosx_10_13_intel" + elif sys.platform == "win32": + platform = "win_amd64" + else: + platform = "manylinux2014_x86_64" + wheel = wheel.replace("manylinux2014_x86_64", platform) + subprocess.check_output(["pip", "uninstall", "-y", "ray"], text=True) + subprocess.check_output(["pip", "install", "-U", wheel], text=True) + + +def wait_for_build_or_raise(sdk: AnyscaleSDK, + app_config_id: Optional[str]) -> Optional[str]: + if not app_config_id: + return None + + # Fetch build + build_id = None + last_status = None + result = sdk.list_builds(app_config_id) + for build in sorted(result.results, key=lambda b: b.created_at): + build_id = build.id + last_status = build.status + + if build.status == "failed": + continue + + if build.status == "succeeded": + logger.info(f"Link to app config build: " + f"{anyscale_app_config_build_url(build_id)}") + return build_id + + if last_status == "failed": + raise RuntimeError("App config build failed.") + + if not build_id: + raise RuntimeError("No build found for app config.") + + # Build found but not failed/finished yet + completed = False + start_wait = time.time() + next_report = start_wait + REPORT_S + logger.info(f"Waiting for build {build_id} to finish...") + logger.info(f"Track progress here: " + 
f"{anyscale_app_config_build_url(build_id)}") + while not completed: + now = time.time() + if now > next_report: + logger.info(f"... still waiting for build {build_id} to finish " + f"({int(now - start_wait)} seconds) ...") + next_report = next_report + REPORT_S + + result = sdk.get_build(build_id) + build = result.result + + if build.status == "failed": + raise RuntimeError( + f"App config build failed. Please see " + f"{anyscale_app_config_build_url(build_id)} for details") + + if build.status == "succeeded": + logger.info("Build succeeded.") + return build_id + + completed = build.status not in ["in_progress", "pending"] + + if completed: + raise RuntimeError( + f"Unknown build status: {build.status}. Please see " + f"{anyscale_app_config_build_url(build_id)} for details") + + time.sleep(1) + + return build_id + + +def run_job(cluster_name: str, compute_tpl_name: str, cluster_env_name: str, + job_name: str, min_workers: str, script: str, + script_args: List[str], + env_vars: Dict[str, str]) -> Tuple[int, str]: + # Start cluster and job + address = f"anyscale://{cluster_name}?cluster_compute={compute_tpl_name}" \ + f"&cluster_env={cluster_env_name}&autosuspend=5&&update=True" + logger.info(f"Starting job {job_name} with Ray address: {address}") + env = copy.deepcopy(os.environ) + env.update(GLOBAL_CONFIG) + env.update(env_vars) + env["RAY_ADDRESS"] = address + env["RAY_JOB_NAME"] = job_name + env["RAY_RELEASE_MIN_WORKERS"] = str(min_workers) + proc = subprocess.Popen( + script.split(" ") + script_args, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True) + proc.stdout.reconfigure(line_buffering=True) + logs = "" + for line in proc.stdout: + logs += line + sys.stdout.write(line) + proc.wait() + return proc.returncode, logs + + +def create_and_wait_for_session( + sdk: AnyscaleSDK, + stop_event: multiprocessing.Event, + session_name: str, + session_options: Dict[Any, Any], +) -> str: + # Create session + logger.info(f"Creating session {session_name}") + result = sdk.create_session(session_options) + session_id = result.result.id + + # Trigger session start + logger.info(f"Starting session {session_name} ({session_id})") + session_url = anyscale_session_url( + project_id=GLOBAL_CONFIG["ANYSCALE_PROJECT"], session_id=session_id) + logger.info(f"Link to session: {session_url}") + + result = sdk.start_session(session_id, start_session_options={}) + sop_id = result.result.id + completed = result.result.completed + + # Wait for session + logger.info(f"Waiting for session {session_name}...") + start_wait = time.time() + next_report = start_wait + REPORT_S + while not completed: + _check_stop(stop_event) + now = time.time() + if now > next_report: + logger.info(f"... 
still waiting for session {session_name} " + f"({int(now - start_wait)} seconds) ...") + next_report = next_report + REPORT_S + + session_operation_response = sdk.get_session_operation( + sop_id, _request_timeout=30) + session_operation = session_operation_response.result + completed = session_operation.completed + time.sleep(1) + + return session_id + + +def run_session_command(sdk: AnyscaleSDK, + session_id: str, + cmd_to_run: str, + stop_event: multiprocessing.Event, + result_queue: multiprocessing.Queue, + env_vars: Dict[str, str], + state_str: str = "CMD_RUN", + kick_off_only: bool = False) -> Tuple[str, int]: + full_cmd = " ".join(f"{k}={v}" + for k, v in env_vars.items()) + " " + cmd_to_run + + logger.info(f"Running command in session {session_id}: \n" f"{full_cmd}") + session_url = anyscale_session_url( + project_id=GLOBAL_CONFIG["ANYSCALE_PROJECT"], session_id=session_id) + logger.info(f"Link to session: {session_url}") + result_queue.put(State(state_str, time.time(), None)) + result = sdk.create_session_command( + dict(session_id=session_id, shell_command=full_cmd)) + + scd_id = result.result.id + completed = result.result.finished_at is not None + + if kick_off_only: + return scd_id, 0 + + start_wait = time.time() + next_report = start_wait + REPORT_S + while not completed: + _check_stop(stop_event) + + now = time.time() + if now > next_report: + logger.info(f"... still waiting for command to finish " + f"({int(now - start_wait)} seconds) ...") + next_report = next_report + REPORT_S + + result = sdk.get_session_command(session_command_id=scd_id) + completed = result.result.finished_at + time.sleep(1) + + status_code = result.result.status_code + + if status_code != 0: + raise RuntimeError( + f"Command returned non-success status: {status_code}") + + return scd_id, status_code + + +def get_command_logs(session_controller: SessionController, + scd_id: str, + lines: int = 50): + result = session_controller.api_client.get_execution_logs_api_v2_session_commands_session_command_id_execution_logs_get( # noqa: E501 + session_command_id=scd_id, + start_line=-1 * lines, + end_line=0) + + return result.result.lines + + +def get_remote_json_content( + temp_dir: str, + session_name: str, + remote_file: Optional[str], + session_controller: SessionController, +): + if not remote_file: + logger.warning("No remote file specified, returning empty dict") + return {} + local_target_file = os.path.join(temp_dir, ".tmp.json") + session_controller.pull( + session_name=session_name, + source=remote_file, + target=local_target_file) + with open(local_target_file, "rt") as f: + return json.load(f) + + +def get_local_json_content(local_file: Optional[str], ): + if not local_file: + logger.warning("No local file specified, returning empty dict") + return {} + with open(local_file, "rt") as f: + return json.load(f) + + +def pull_artifacts_and_store_in_cloud( + temp_dir: str, + logs: str, + session_name: str, + test_name: str, + artifacts: Optional[Dict[Any, Any]], + session_controller: SessionController, +): + output_log_file = os.path.join(temp_dir, "output.log") + with open(output_log_file, "wt") as f: + f.write(logs) + + bucket = GLOBAL_CONFIG["RELEASE_AWS_BUCKET"] + location = f"{GLOBAL_CONFIG['RELEASE_AWS_LOCATION']}" \ + f"/{session_name}/{test_name}" + saved_artifacts = {} + + s3_client = boto3.client("s3") + s3_client.upload_file(output_log_file, bucket, f"{location}/output.log") + saved_artifacts["output.log"] = f"s3://{bucket}/{location}/output.log" + + # Download artifacts + if artifacts: + for 
name, remote_file in artifacts.items(): + logger.info(f"Downloading artifact `{name}` from " + f"{remote_file}") + local_target_file = os.path.join(temp_dir, name) + session_controller.pull( + session_name=session_name, + source=remote_file, + target=local_target_file) + + # Upload artifacts to s3 + s3_client.upload_file(local_target_file, bucket, + f"{location}/{name}") + saved_artifacts[name] = f"s3://{bucket}/{location}/{name}" + + return saved_artifacts + + +def find_session_by_test_name( + sdk: AnyscaleSDK, + session_controller: SessionController, + temp_dir: str, + state_json: str, + project_id: str, + test_name: str, +) -> Optional[Tuple[str, str, Dict[Any, Any]]]: + paging_token = None + + while True: # Will break if paging_token is None after first search + result = sdk.search_sessions( + project_id=project_id, + sessions_query=dict( + name=dict(contains=test_name), + state_filter=["Running"], + paging=dict(count=20, paging_token=paging_token))) + + for session in result.results: + logger.info(f"Found sessions {session.name}") + if not session.name.startswith(test_name): + continue + + try: + session_state = get_remote_json_content( + temp_dir=temp_dir, + session_name=session.name, + remote_file=state_json, + session_controller=session_controller) + except Exception as exc: + raise RuntimeError(f"Could not get remote json content " + f"for session {session.name}") from exc + + if session_state.get("test_name") == test_name: + return session.id, session.name, session_state + + session_token = result.metadata.next_paging_token + + if not session_token: + return None + + +def get_latest_running_command_id(sdk: AnyscaleSDK, session_id: str + ) -> Tuple[Optional[str], Optional[bool]]: + scd_id = None + paging_token = None + + success = None + + while not scd_id: + result = sdk.list_session_commands( + session_id=session_id, paging_token=paging_token) + + paging_token = result.metadata.next_paging_token + + for cmd in result.results: + if not scd_id: + scd_id = cmd.id + + completed = cmd.finished_at is not None + + if completed: + if success is None: + success = True + + success = success and cmd.status_code == 0 + + if not completed: + return cmd.id, None + + return scd_id, success or False + + +def run_test_config( + local_dir: str, + project_id: str, + test_name: str, + test_config: Dict[Any, Any], + smoke_test: bool = False, + no_terminate: bool = False, + kick_off_only: bool = False, + check_progress: bool = False, + upload_artifacts: bool = True, +) -> Dict[Any, Any]: + """ + + Returns: + Dict with the following entries: + status (str): One of [finished, error, timeout] + command_link (str): Link to command (Anyscale web UI) + last_logs (str): Last logs (excerpt) to send to owner + artifacts (dict): Dict of artifacts + Key: Name + Value: S3 URL + """ + # Todo (mid-term): Support other cluster definitions + # (not only cluster configs) + cluster_config_rel_path = test_config["cluster"].get( + "cluster_config", None) + cluster_config = _load_config(local_dir, cluster_config_rel_path) + + app_config_rel_path = test_config["cluster"].get("app_config", None) + app_config = _load_config(local_dir, app_config_rel_path) + + compute_tpl_rel_path = test_config["cluster"].get("compute_template", None) + compute_tpl = _load_config(local_dir, compute_tpl_rel_path) + + stop_event = multiprocessing.Event() + result_queue = multiprocessing.Queue() + + session_name = f"{test_name}_{int(time.time())}" + + temp_dir = tempfile.mkdtemp() + + # Result and state files + results_json = 
test_config["run"].get("results", None) + if results_json is None: + results_json = "/tmp/release_test_out.json" + + state_json = test_config["run"].get("state", None) + if state_json is None: + state_json = "/tmp/release_test_state.json" + + env_vars = { + "RAY_ADDRESS": os.environ.get("RAY_ADDRESS", "auto"), + "TEST_OUTPUT_JSON": results_json, + "TEST_STATE_JSON": state_json, + "IS_SMOKE_TEST": "1" if smoke_test else "0", + } + + with open(os.path.join(local_dir, ".anyscale.yaml"), "wt") as f: + f.write(f"project_id: {project_id}") + os.chdir(local_dir) + + # Setup interface + # Unfortunately, there currently seems to be no great way to + # transfer files with the Anyscale SDK. + # So we use the session controller instead. + sdk = AnyscaleSDK(auth_token=GLOBAL_CONFIG["ANYSCALE_CLI_TOKEN"]) + + session_controller = SessionController( + api_client=instantiate_api_client( + cli_token=GLOBAL_CONFIG["ANYSCALE_CLI_TOKEN"], + host=GLOBAL_CONFIG["ANYSCALE_HOST"], + ), + anyscale_api_client=sdk.api_client, + ) + + timeout = test_config["run"].get("timeout", 1800) + if "RELEASE_OVERRIDE_TIMEOUT" in os.environ: + previous_timeout = timeout + timeout = int(os.environ.get("RELEASE_OVERRIDE_TIMEOUT", str(timeout))) + logger.warning(f"Release test timeout override: {timeout} " + f"(would have been {previous_timeout})") + + # If a test is long running, timeout does not mean it failed + is_long_running = test_config["run"].get("long_running", False) + + if test_config["run"].get("use_connect"): + assert not kick_off_only, \ + "Unsupported for running with Anyscale connect." + install_app_config_packages(app_config) + install_matching_ray() + + # Add information to results dict + def _update_results(results: Dict): + if "last_update" in results: + results["last_update_diff"] = time.time() - results["last_update"] + if smoke_test: + results["smoke_test"] = True + + def _process_finished_command(session_controller: SessionController, + scd_id: str, + results: Optional[Dict] = None): + logger.info("Command finished successfully.") + if results_json: + results = results or get_remote_json_content( + temp_dir=temp_dir, + session_name=session_name, + remote_file=results_json, + session_controller=session_controller, + ) + else: + results = {"passed": 1} + + _update_results(results) + + if scd_id: + logs = get_command_logs(session_controller, scd_id, + test_config.get("log_lines", 50)) + else: + logs = "No command found to fetch logs for" + + if upload_artifacts: + saved_artifacts = pull_artifacts_and_store_in_cloud( + temp_dir=temp_dir, + logs=logs, # Also save logs in cloud + session_name=session_name, + test_name=test_name, + artifacts=test_config.get("artifacts", {}), + session_controller=session_controller, + ) + + logger.info("Fetched results and stored on the cloud. Returning.") + else: + saved_artifacts = {} + logger.info("Usually I would have fetched the results and " + "artifacts and stored them on S3.") + + result_queue.put( + State( + "END", + time.time(), + { + "status": "finished", + "last_logs": logs, + "results": results, + "artifacts": saved_artifacts, + }, + )) + + # When running the test script in client mode, the finish command is a + # completed local process. 
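+    # In that case there is no remote session command to poll: the helper
+    # below receives the local process' return code and captured logs, and
+    # reads results from the local TEST_OUTPUT_JSON file (by default
+    # /tmp/release_test_out.json, see `results_json` above) instead of
+    # pulling them from the session.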
+
+    def _process_finished_client_command(returncode: int, logs: str):
+        if upload_artifacts:
+            saved_artifacts = pull_artifacts_and_store_in_cloud(
+                temp_dir=temp_dir,
+                logs=logs,  # Also save logs in cloud
+                session_name=session_name,
+                test_name=test_name,
+                artifacts=None,
+                session_controller=None,
+            )
+            logger.info("Stored results on the cloud. Returning.")
+        else:
+            saved_artifacts = {}
+            logger.info("Usually I would have fetched the results and "
+                        "artifacts and stored them on S3.")
+
+        if results_json:
+            results = get_local_json_content(local_file=results_json)
+        else:
+            results = {
+                "passed": int(returncode == 0),
+            }
+
+        results["returncode"] = returncode
+
+        _update_results(results)
+
+        result_queue.put(
+            State(
+                "END",
+                time.time(),
+                {
+                    "status": "finished",
+                    "last_logs": logs,
+                    "results": results,
+                    "artifacts": saved_artifacts,
+                },
+            ))
+
+    def _run(logger):
+        anyscale.conf.CLI_TOKEN = GLOBAL_CONFIG["ANYSCALE_CLI_TOKEN"]
+
+        session_id = None
+        scd_id = None
+        try:
+            # First, look for running sessions
+            session_id = search_running_session(sdk, project_id, session_name)
+            compute_tpl_name = None
+            app_config_name = None
+            if not session_id:
+                logger.info("No session found.")
+                # Start session
+                session_options = dict(
+                    name=session_name, project_id=project_id)
+
+                if cluster_config is not None:
+                    logger.info("Starting session with cluster config")
+                    cluster_config_str = json.dumps(cluster_config)
+                    session_options["cluster_config"] = cluster_config_str
+                    session_options["cloud_id"] = (
+                        GLOBAL_CONFIG["ANYSCALE_CLOUD_ID"])
+                    session_options["uses_app_config"] = False
+                else:
+                    logger.info("Starting session with app/compute config")
+
+                    # Find/create compute template
+                    compute_tpl_id, compute_tpl_name = \
+                        create_or_find_compute_template(
+                            sdk, project_id, compute_tpl)
+
+                    logger.info(f"Link to compute template: "
+                                f"{anyscale_compute_tpl_url(compute_tpl_id)}")
+
+                    # Find/create app config
+                    app_config_id, app_config_name = create_or_find_app_config(
+                        sdk, project_id, app_config)
+                    build_id = wait_for_build_or_raise(sdk, app_config_id)
+
+                    session_options["compute_template_id"] = compute_tpl_id
+                    session_options["build_id"] = build_id
+                    session_options["uses_app_config"] = True
+
+                if not test_config["run"].get("use_connect"):
+                    session_id = create_and_wait_for_session(
+                        sdk=sdk,
+                        stop_event=stop_event,
+                        session_name=session_name,
+                        session_options=session_options,
+                    )
+
+            if test_config["run"].get("use_connect"):
+                assert compute_tpl_name, "Compute template must exist."
+                assert app_config_name, "Cluster environment must exist."
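+                # Anyscale connect path: the script is executed via `run_job`
+                # and finishes as a local client process (see the client-mode
+                # note above). `min_workers` is summed over the compute
+                # template's `worker_node_types`; as a rough illustration, an
+                # entry is assumed to look like
+                # {"name": "cpu_worker", "min_workers": 2, ...} (only
+                # `min_workers` is read here).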
+ script_args = test_config["run"].get("args", []) + if smoke_test: + script_args += ["--smoke-test"] + min_workers = 0 + for node_type in compute_tpl["worker_node_types"]: + min_workers += node_type["min_workers"] + returncode, logs = run_job( + cluster_name=test_name, + compute_tpl_name=compute_tpl_name, + cluster_env_name=app_config_name, + job_name=session_name, + min_workers=min_workers, + script=test_config["run"]["script"], + script_args=script_args, + env_vars=env_vars) + _process_finished_client_command(returncode, logs) + return + + # Write test state json + test_state_file = os.path.join(local_dir, "test_state.json") + with open(test_state_file, "wt") as f: + json.dump({ + "start_time": time.time(), + "test_name": test_name + }, f) + + # Rsync up + logger.info("Syncing files to session...") + session_controller.push( + session_name=session_name, + source=None, + target=None, + config=None, + all_nodes=False, + ) + + logger.info("Syncing test state to session...") + session_controller.push( + session_name=session_name, + source=test_state_file, + target=state_json, + config=None, + all_nodes=False, + ) + + _check_stop(stop_event) + + # Optionally run preparation command + prepare_command = test_config["run"].get("prepare") + if prepare_command: + logger.info(f"Running preparation command: {prepare_command}") + run_session_command( + sdk=sdk, + session_id=session_id, + cmd_to_run=prepare_command, + stop_event=stop_event, + result_queue=result_queue, + env_vars=env_vars, + state_str="CMD_PREPARE") + + # Run release test command + cmd_to_run = test_config["run"]["script"] + " " + + args = test_config["run"].get("args", []) + if args: + cmd_to_run += " ".join(args) + " " + + if smoke_test: + cmd_to_run += " --smoke-test" + + scd_id, status_code = run_session_command( + sdk=sdk, + session_id=session_id, + cmd_to_run=cmd_to_run, + stop_event=stop_event, + result_queue=result_queue, + env_vars=env_vars, + state_str="CMD_RUN", + kick_off_only=kick_off_only) + + if not kick_off_only: + _process_finished_command( + session_controller=session_controller, scd_id=scd_id) + else: + result_queue.put( + State("END", time.time(), { + "status": "kickoff", + "last_logs": "" + })) + + except (ReleaseTestTimeoutError, Exception) as e: + logger.error(e, exc_info=True) + + logs = str(e) + if scd_id is not None: + try: + logs = get_command_logs(session_controller, scd_id, + test_config.get("log_lines", 50)) + except Exception as e2: + logger.error(e2, exc_info=True) + + # Long running tests are "finished" successfully when + # timed out + if isinstance(e, ReleaseTestTimeoutError) and is_long_running: + _process_finished_command( + session_controller=session_controller, scd_id=scd_id) + else: + result_queue.put( + State("END", time.time(), { + "status": "timeout", + "last_logs": logs + })) + finally: + if no_terminate: + logger.warning( + "`no_terminate` is set to True, so the session will " + "*not* be terminated!") + else: + _cleanup_session(sdk, session_id) + + def _check_progress(logger): + anyscale.conf.CLI_TOKEN = GLOBAL_CONFIG["ANYSCALE_CLI_TOKEN"] + + should_terminate = False + session_id = None + scd_id = None + try: + existing_session = find_session_by_test_name( + sdk=sdk, + session_controller=session_controller, + temp_dir=temp_dir, + state_json=state_json, + project_id=project_id, + test_name=test_name) + + if existing_session is None: + logger.info(f"Found no existing session for {test_name}") + result_queue.put( + State("END", time.time(), { + "status": "nosession", + "last_logs": "" + })) 
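+                # The caller (`run_test` below) treats "nosession" as a
+                # benign outcome and assumes everything is fine rather than
+                # reporting an error.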
+ return + + session_id, session_name, session_state = existing_session + + logger.info(f"Found existing session for {test_name}: " + f"{session_name}") + + scd_id, success = get_latest_running_command_id( + sdk=sdk, session_id=session_id) + + latest_result = get_remote_json_content( + temp_dir=temp_dir, + session_name=session_name, + remote_file=results_json, + session_controller=session_controller, + ) + + # Fetch result json and check if it has been updated recently + result_time_key = test_config["run"].get("time_key", None) + maximum_update_delay = test_config["run"].get( + "max_update_delay", None) + + if result_time_key and maximum_update_delay: + last_update = latest_result.get(result_time_key, None) + + if not last_update: + result_queue.put( + State( + "END", time.time(), { + "status": "error", + "last_logs": f"Test did not store " + f"{result_time_key} in the " + f"results json." + })) + return + + delay = time.time() - last_update + logger.info(f"Last update was at {last_update:.2f}. " + f"This was {delay:.2f} seconds ago " + f"(maximum allowed: {maximum_update_delay})") + + if delay > maximum_update_delay: + raise RuntimeError( + f"Test did not update the results json within " + f"the last {maximum_update_delay} seconds.") + + if time.time() - session_state["start_time"] > timeout: + # Long running test reached timeout + logger.info( + f"Test command reached timeout after {timeout} seconds") + _process_finished_command( + session_controller=session_controller, + scd_id=scd_id, + results=latest_result) + should_terminate = True + + elif success: + logger.info("All commands finished.") + _process_finished_command( + session_controller=session_controller, + scd_id=scd_id, + results=latest_result) + should_terminate = True + + else: + rest_time = timeout - time.time() + session_state["start_time"] + logger.info(f"Test command should continue running " + f"for {rest_time} seconds") + result_queue.put( + State("END", time.time(), { + "status": "kickoff", + "last_logs": "Test is still running" + })) + + except Exception as e: + logger.error(e, exc_info=True) + + logs = str(e) + if scd_id is not None: + try: + logs = get_command_logs(session_controller, scd_id, + test_config.get("log_lines", 50)) + logs += f"\n{str(e)}" + except Exception as e2: + logger.error(e2, exc_info=True) + + result_queue.put( + State("END", time.time(), { + "status": "error", + "last_logs": logs + })) + should_terminate = True + finally: + if should_terminate: + logger.warning("Terminating session") + _cleanup_session(sdk, session_id) + + if not check_progress: + process = multiprocessing.Process(target=_run, args=(logger, )) + else: + process = multiprocessing.Process( + target=_check_progress, args=(logger, )) + + build_timeout = test_config["run"].get("build_timeout", 1800) + + project_url = anyscale_project_url( + project_id=GLOBAL_CONFIG["ANYSCALE_PROJECT"]) + logger.info(f"Link to project: {project_url}") + + msg = f"This will now run test {test_name}." + if smoke_test: + msg += " This is a smoke test." + if is_long_running: + msg += " This is a long running test." 
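+    # The subprocess reports progress via `result_queue` using `State`
+    # tuples: CMD_PREPARE and CMD_RUN reset the deadline to `timeout`
+    # seconds from the reported timestamp, and END carries the final
+    # result dict (see the monitoring loop below).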
+
+    logger.info(msg)
+
+    logger.info(f"Starting process with timeout {timeout} "
+                f"(build timeout {build_timeout})")
+    process.start()
+
+    # The timeout time will be updated after the build has finished
+    # Build = App config + compute template build and session start
+    timeout_time = time.time() + build_timeout
+
+    result = {}
+    while process.is_alive():
+        try:
+            state: State = result_queue.get(timeout=1)
+        except (Empty, TimeoutError):
+            if time.time() > timeout_time:
+                stop_event.set()
+                logger.warning("Process timed out.")
+
+                if not is_long_running:
+                    logger.warning("Terminating process in 10 seconds.")
+                    time.sleep(10)
+                    logger.warning("Terminating process now.")
+                    process.terminate()
+                else:
+                    logger.info("Process is long running. Giving it 2 minutes "
+                                "to fetch the result and terminate.")
+                    start_terminate = time.time()
+                    while (time.time() < start_terminate + 120
+                           and process.is_alive()):
+                        time.sleep(1)
+                    if process.is_alive():
+                        logger.warning("Terminating forcefully now.")
+                        process.terminate()
+                    else:
+                        logger.info("Long running results collected.")
+                    break
+            continue
+
+        if not isinstance(state, State):
+            raise RuntimeError(f"Expected `State` object, got {state}")
+
+        if state.state == "CMD_PREPARE":
+            # Reset timeout after build finished
+            timeout_time = state.timestamp + timeout
+
+        if state.state == "CMD_RUN":
+            # Reset timeout after prepare command or build finished
+            timeout_time = state.timestamp + timeout
+
+        elif state.state == "END":
+            result = state.data
+            break
+
+    while not result_queue.empty():
+        state = result_queue.get_nowait()
+        result = state.data
+
+    logger.info("Final check if everything worked.")
+    try:
+        result.setdefault("status", "error (status not found)")
+    except (TimeoutError, Empty):
+        result = {"status": "timeout", "last_logs": "Test timed out."}
+
+    logger.info(f"Final results: {result}")
+
+    shutil.rmtree(temp_dir)
+
+    return result
+
+
+def run_test(test_config_file: str,
+             test_name: str,
+             project_id: str,
+             category: str = "unspecified",
+             smoke_test: bool = False,
+             no_terminate: bool = False,
+             kick_off_only: bool = False,
+             check_progress=False,
+             report=True):
+    with open(test_config_file, "rt") as f:
+        test_configs = yaml.load(f, Loader=yaml.FullLoader)
+
+    test_config_dict = {}
+    for test_config in test_configs:
+        name = test_config.pop("name")
+        test_config_dict[name] = test_config
+
+    if test_name not in test_config_dict:
+        raise ValueError(
+            f"Test with name `{test_name}` not found in test config file "
+            f"at `{test_config_file}`.")
+
+    test_config = test_config_dict[test_name]
+
+    if smoke_test and "smoke_test" in test_config:
+        smoke_test_config = test_config.pop("smoke_test")
+        test_config = _deep_update(test_config, smoke_test_config)
+
+    local_dir = os.path.dirname(test_config_file)
+    if "local_dir" in test_config:
+        # local_dir is relative to test_config_file
+        local_dir = os.path.join(local_dir, test_config["local_dir"])
+
+    if test_config["run"].get("use_connect"):
+        assert not kick_off_only, \
+            "--kick-off-only is unsupported when running with " \
+            "Anyscale connect."
+        assert not check_progress, \
+            "--check is unsupported when running with Anyscale connect."
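+        # With Anyscale connect the test script runs as a local client
+        # process (see `_process_finished_client_command` above), so there
+        # is no detached session command that could be kicked off now and
+        # polled later with `--check`.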
+        if test_config.get("artifacts", {}):
+            logger.error(
+                "Saving artifacts is not yet supported when running with "
+                "Anyscale connect.")
+
+    result = run_test_config(
+        local_dir,
+        project_id,
+        test_name,
+        test_config,
+        smoke_test=smoke_test,
+        no_terminate=no_terminate,
+        kick_off_only=kick_off_only,
+        check_progress=check_progress,
+        upload_artifacts=report)
+
+    status = result.get("status", "invalid")
+
+    if kick_off_only:
+        if status != "kickoff":
+            raise RuntimeError("Error kicking off test.")
+
+        logger.info("Kicked off test. It's now up to the `--check` "
+                    "part of the script to track its progress.")
+        return
+    else:
+        # Either a `--check` run or a regular run (not kick off only)
+
+        if status == "nosession":
+            logger.info(f"No running session found for test {test_name}, so "
+                        f"assuming everything is fine.")
+            return
+
+        if status == "kickoff":
+            logger.info(f"Test {test_name} is still running.")
+            return
+
+        last_logs = result.get("last_logs", "No logs.")
+
+        test_suite = os.path.basename(test_config_file).replace(".yaml", "")
+
+        report_kwargs = dict(
+            test_suite=test_suite,
+            test_name=test_name,
+            status=status,
+            logs=last_logs,
+            results=result.get("results", {}),
+            artifacts=result.get("artifacts", {}),
+            category=category,
+        )
+
+        if report:
+            report_result(**report_kwargs)
+        else:
+            logger.info(f"Usually I would now report the following results:\n"
+                        f"{report_kwargs}")
+
+        if has_errored(result):
+            raise RuntimeError(last_logs)
+
+    return
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--test-config", type=str, required=True, help="Test config file")
+    parser.add_argument("--test-name", type=str, help="Test name in config")
+    parser.add_argument(
+        "--ray-wheels", required=False, type=str, help="URL to ray wheels")
+    parser.add_argument(
+        "--no-terminate",
+        action="store_true",
+        default=False,
+        help="Don't terminate session after failure")
+    parser.add_argument(
+        "--no-report",
+        action="store_true",
+        default=False,
+        help="Do not report any results or upload to S3")
+    parser.add_argument(
+        "--kick-off-only",
+        action="store_true",
+        default=False,
+        help="Kick off only (don't wait for command to finish)")
+    parser.add_argument(
+        "--check",
+        action="store_true",
+        default=False,
+        help="Check (long running) status")
+    parser.add_argument(
+        "--category",
+        type=str,
+        default="unspecified",
+        help="Category name, e.g. 
`release-1.3.0` (will be saved in database)") + parser.add_argument( + "--smoke-test", action="store_true", help="Finish quickly for testing") + args, _ = parser.parse_known_args() + + if not GLOBAL_CONFIG["ANYSCALE_PROJECT"]: + raise RuntimeError( + "You have to set the ANYSCALE_PROJECT environment variable!") + + maybe_fetch_api_token() + + if args.ray_wheels: + os.environ["RAY_WHEELS"] = str(args.ray_wheels) + elif not args.check: + url = find_ray_wheels( + GLOBAL_CONFIG["RAY_REPO"], + GLOBAL_CONFIG["RAY_BRANCH"], + GLOBAL_CONFIG["RAY_VERSION"], + ) + if not url: + raise RuntimeError(f"Could not find wheels for " + f"Ray {GLOBAL_CONFIG['RAY_VERSION']}, " + f"branch {GLOBAL_CONFIG['RAY_BRANCH']}") + + test_config_file = os.path.abspath(os.path.expanduser(args.test_config)) + + run_test( + test_config_file=test_config_file, + test_name=args.test_name, + project_id=GLOBAL_CONFIG["ANYSCALE_PROJECT"], + category=args.category, + smoke_test=args.smoke_test, + no_terminate=args.no_terminate or args.kick_off_only, + kick_off_only=args.kick_off_only, + check_progress=args.check, + report=not args.no_report, + ) diff --git a/release/requirements.txt b/release/requirements.txt new file mode 100644 index 0000000000000..e5e41aefb3114 --- /dev/null +++ b/release/requirements.txt @@ -0,0 +1,15 @@ +ray +click +anyscale +slackclient +boto3 +PyGithub +pydantic +pyyaml +typer[all] +toml +python-dotenv +expiringdict +requests +pytz +git+https://github.com/ray-project/xgboost_ray.git#xgboost_ray