diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index b61bff3b7bace..da02cc8e3002e 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -3,6 +3,7 @@ import fnmatch import functools import io +import json import logging import math import os @@ -1801,3 +1802,19 @@ def wandb_populate_run_location_hook(): os.environ[WANDB_PROJECT_ENV_VAR] = "test_project" os.environ[WANDB_GROUP_ENV_VAR] = "test_group" + + +def safe_write_to_results_json( + result: dict, + default_file_name: str = "/tmp/release_test_output.json", + env_var: str = "TEST_OUTPUT_JSON", +): + """ + Safe (atomic) write to file to guard against malforming the json + if the job gets interrupted in the middle of writing. + """ + test_output_json = os.environ.get(env_var, default_file_name) + test_output_json_tmp = test_output_json + ".tmp" + with open(test_output_json_tmp, "wt") as f: + json.dump(result, f) + os.replace(test_output_json_tmp, test_output_json) diff --git a/python/ray/tune/utils/release_test_util.py b/python/ray/tune/utils/release_test_util.py index cb57ff10dd70a..6106567e9c56d 100644 --- a/python/ray/tune/utils/release_test_util.py +++ b/python/ray/tune/utils/release_test_util.py @@ -8,6 +8,7 @@ from ray import tune from ray.tune.callback import Callback +from ray._private.test_utils import safe_write_to_results_json class ProgressCallback(Callback): @@ -23,11 +24,7 @@ def on_step_end(self, iteration, trials, **kwargs): "iteration": iteration, "trial_states": dict(Counter([trial.status for trial in trials])), } - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result, "/tmp/release_test.json") self.last_update = now diff --git a/release/long_running_tests/workloads/actor_deaths.py b/release/long_running_tests/workloads/actor_deaths.py index 281435ba11c03..a9b2bd39fd410 100644 --- 
a/release/long_running_tests/workloads/actor_deaths.py +++ b/release/long_running_tests/workloads/actor_deaths.py @@ -1,23 +1,16 @@ # This workload tests repeatedly killing actors and submitting tasks to them. - -import json import numpy as np -import os import sys import time import ray from ray.cluster_utils import Cluster -from ray._private.test_utils import monitor_memory_usage +from ray._private.test_utils import monitor_memory_usage, safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) num_redis_shards = 1 diff --git a/release/long_running_tests/workloads/long_running_many_jobs.py b/release/long_running_tests/workloads/long_running_many_jobs.py index 69061dc2eb3ed..11b7867b42cea 100644 --- a/release/long_running_tests/workloads/long_running_many_jobs.py +++ b/release/long_running_tests/workloads/long_running_many_jobs.py @@ -8,7 +8,6 @@ """ import argparse -import json import os import time import random @@ -17,6 +16,7 @@ from ray.dashboard.modules.job.pydantic_models import JobDetails import ray from ray.job_submission import JobSubmissionClient +from ray._private.test_utils import safe_write_to_results_json NUM_CLIENTS = 4 NUM_JOBS_PER_BATCH = 4 @@ -130,8 +130,6 @@ def submit_batch_jobs( result = { "time_taken": time_taken, } - test_output_json = os.environ.get("TEST_OUTPUT_JSON", "/tmp/jobs_basic.json") - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result, "/tmp/jobs_basic.json") print("PASSED") diff --git a/release/long_running_tests/workloads/many_actor_tasks.py b/release/long_running_tests/workloads/many_actor_tasks.py index 037f70fd8a3d7..df7b997c7a3d6 100644 --- a/release/long_running_tests/workloads/many_actor_tasks.py +++ 
b/release/long_running_tests/workloads/many_actor_tasks.py @@ -1,21 +1,16 @@ # This workload tests submitting many actor methods. -import json -import os import time import numpy as np import ray from ray.cluster_utils import Cluster +from ray._private.test_utils import safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) num_redis_shards = 5 diff --git a/release/long_running_tests/workloads/many_drivers.py b/release/long_running_tests/workloads/many_drivers.py index a15d58fa4ca12..f14420062f700 100644 --- a/release/long_running_tests/workloads/many_drivers.py +++ b/release/long_running_tests/workloads/many_drivers.py @@ -1,21 +1,15 @@ # This workload tests many drivers using the same cluster. -import json -import os import time import argparse import ray from ray.cluster_utils import Cluster -from ray._private.test_utils import run_string_as_driver +from ray._private.test_utils import run_string_as_driver, safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) num_redis_shards = 5 diff --git a/release/long_running_tests/workloads/many_tasks.py b/release/long_running_tests/workloads/many_tasks.py index 22271cd5a7494..380ba951ee335 100644 --- a/release/long_running_tests/workloads/many_tasks.py +++ b/release/long_running_tests/workloads/many_tasks.py @@ -1,21 +1,16 @@ # This workload tests submitting and getting many tasks over and over. 
-import json -import os import time import numpy as np import ray from ray.cluster_utils import Cluster +from ray._private.test_utils import safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) num_redis_shards = 5 diff --git a/release/long_running_tests/workloads/many_tasks_serialized_ids.py b/release/long_running_tests/workloads/many_tasks_serialized_ids.py index 172bbf442fe54..27dcf59952844 100644 --- a/release/long_running_tests/workloads/many_tasks_serialized_ids.py +++ b/release/long_running_tests/workloads/many_tasks_serialized_ids.py @@ -1,7 +1,5 @@ # This workload stresses distributed reference counting by passing and # returning serialized ObjectRefs. -import json -import os import time import random @@ -9,15 +7,12 @@ import ray from ray.cluster_utils import Cluster +from ray._private.test_utils import safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) num_redis_shards = 5 diff --git a/release/long_running_tests/workloads/node_failures.py b/release/long_running_tests/workloads/node_failures.py index 1d77e2c2e33b6..1aff8b86193a5 100644 --- a/release/long_running_tests/workloads/node_failures.py +++ b/release/long_running_tests/workloads/node_failures.py @@ -1,20 +1,14 @@ # This workload tests repeatedly killing a node and adding a new node. 
-import json -import os import time import ray from ray.cluster_utils import Cluster -from ray._private.test_utils import get_other_nodes +from ray._private.test_utils import get_other_nodes, safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) num_redis_shards = 5 diff --git a/release/long_running_tests/workloads/serve.py b/release/long_running_tests/workloads/serve.py index 4327aae4d5607..bc3598fe3e878 100644 --- a/release/long_running_tests/workloads/serve.py +++ b/release/long_running_tests/workloads/serve.py @@ -1,5 +1,3 @@ -import json -import os import re import time import subprocess @@ -10,6 +8,7 @@ import ray from ray import serve from ray.cluster_utils import Cluster +from ray._private.test_utils import safe_write_to_results_json # Global variables / constants appear only right after imports. 
# Ray serve deployment setup constants @@ -36,11 +35,7 @@ def update_progress(result): anyscale product runs in each releaser test """ result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) cluster = Cluster() diff --git a/release/long_running_tests/workloads/serve_failure.py b/release/long_running_tests/workloads/serve_failure.py index d9301cd383fcc..fa94bfeef31cc 100644 --- a/release/long_running_tests/workloads/serve_failure.py +++ b/release/long_running_tests/workloads/serve_failure.py @@ -1,4 +1,3 @@ -import json import os import random import string @@ -9,6 +8,7 @@ import ray from ray import serve from ray.cluster_utils import Cluster +from ray._private.test_utils import safe_write_to_results_json # Global variables / constants appear only right after imports. # Ray serve deployment setup constants @@ -33,11 +33,7 @@ def update_progress(result): anyscale product runs in each releaser test """ result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) cluster = Cluster() diff --git a/release/runtime_env_tests/workloads/rte_many_tasks_actors.py b/release/runtime_env_tests/workloads/rte_many_tasks_actors.py index 0e7caf1306c93..a83507a5da439 100644 --- a/release/runtime_env_tests/workloads/rte_many_tasks_actors.py +++ b/release/runtime_env_tests/workloads/rte_many_tasks_actors.py @@ -11,17 +11,13 @@ import ray import random import os -import json import time +from ray._private.test_utils import safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as 
f: - json.dump(result, f) + safe_write_to_results_json(result) if __name__ == "__main__": diff --git a/release/runtime_env_tests/workloads/wheel_urls.py b/release/runtime_env_tests/workloads/wheel_urls.py index 0b0a4ae6953c4..c32c7646a51f6 100644 --- a/release/runtime_env_tests/workloads/wheel_urls.py +++ b/release/runtime_env_tests/workloads/wheel_urls.py @@ -15,23 +15,18 @@ """ import ray -import os -import json import time import requests import pprint import ray._private.ray_constants as ray_constants from ray._private.utils import get_master_wheel_url, get_release_wheel_url +from ray._private.test_utils import safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) if __name__ == "__main__":