From 3735ba410ddfa63cc8516f69fa8b3997259a3f11 Mon Sep 17 00:00:00 2001 From: Antoni Baum Date: Thu, 19 Jan 2023 12:42:17 -0800 Subject: [PATCH] [release] Guard against malformed jsons (#31727) It is possible for a long running job to be interrupted in the middle of writing to results json. That will cause the file to be malformed and fail the test. This PR tries to guard against this by first saving to a temporary file and then using os.replace to change the target json. Signed-off-by: Antoni Baum --- python/ray/_private/test_utils.py | 17 +++++++++++++++++ python/ray/tune/utils/release_test_util.py | 7 ++----- .../workloads/actor_deaths.py | 11 ++--------- .../workloads/long_running_many_jobs.py | 6 ++---- .../workloads/many_actor_tasks.py | 9 ++------- .../workloads/many_drivers.py | 10 ++-------- .../long_running_tests/workloads/many_tasks.py | 9 ++------- .../workloads/many_tasks_serialized_ids.py | 9 ++------- .../workloads/node_failures.py | 10 ++-------- release/long_running_tests/workloads/serve.py | 9 ++------- .../workloads/serve_failure.py | 8 ++------ .../workloads/rte_many_tasks_actors.py | 8 ++------ .../runtime_env_tests/workloads/wheel_urls.py | 9 ++------- 13 files changed, 41 insertions(+), 81 deletions(-) diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index b61bff3b7bace..da02cc8e3002e 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -3,6 +3,7 @@ import fnmatch import functools import io +import json import logging import math import os @@ -1801,3 +1802,19 @@ def wandb_populate_run_location_hook(): os.environ[WANDB_PROJECT_ENV_VAR] = "test_project" os.environ[WANDB_GROUP_ENV_VAR] = "test_group" + + +def safe_write_to_results_json( + result: str, + default_file_name: str = "/tmp/release_test_output.json", + env_var: Optional[str] = "TEST_OUTPUT_JSON", +): + """ + Safe (atomic) write to file to guard against malforming the json + if the job gets interrupted in the middle of writing. + """ + test_output_json = os.environ.get(env_var, default_file_name) + test_output_json_tmp = test_output_json + ".tmp" + with open(test_output_json_tmp, "wt") as f: + json.dump(result, f) + os.replace(test_output_json_tmp, test_output_json) diff --git a/python/ray/tune/utils/release_test_util.py b/python/ray/tune/utils/release_test_util.py index cb57ff10dd70a..6106567e9c56d 100644 --- a/python/ray/tune/utils/release_test_util.py +++ b/python/ray/tune/utils/release_test_util.py @@ -8,6 +8,7 @@ from ray import tune from ray.tune.callback import Callback +from ray._private.test_utils import safe_write_to_results_json class ProgressCallback(Callback): @@ -23,11 +24,7 @@ def on_step_end(self, iteration, trials, **kwargs): "iteration": iteration, "trial_states": dict(Counter([trial.status for trial in trials])), } - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result, "/tmp/release_test_out.json") self.last_update = now diff --git a/release/long_running_tests/workloads/actor_deaths.py b/release/long_running_tests/workloads/actor_deaths.py index 281435ba11c03..a9b2bd39fd410 100644 --- a/release/long_running_tests/workloads/actor_deaths.py +++ b/release/long_running_tests/workloads/actor_deaths.py @@ -1,23 +1,16 @@ # This workload tests repeatedly killing actors and submitting tasks to them. - -import json import numpy as np -import os import sys import time import ray from ray.cluster_utils import Cluster -from ray._private.test_utils import monitor_memory_usage +from ray._private.test_utils import monitor_memory_usage, safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) num_redis_shards = 1 diff --git a/release/long_running_tests/workloads/long_running_many_jobs.py b/release/long_running_tests/workloads/long_running_many_jobs.py index 69061dc2eb3ed..11b7867b42cea 100644 --- a/release/long_running_tests/workloads/long_running_many_jobs.py +++ b/release/long_running_tests/workloads/long_running_many_jobs.py @@ -8,7 +8,6 @@ """ import argparse -import json import os import time import random @@ -17,6 +16,7 @@ from ray.dashboard.modules.job.pydantic_models import JobDetails import ray from ray.job_submission import JobSubmissionClient +from ray._private.test_utils import safe_write_to_results_json NUM_CLIENTS = 4 NUM_JOBS_PER_BATCH = 4 @@ -130,8 +130,6 @@ def submit_batch_jobs( result = { "time_taken": time_taken, } - test_output_json = os.environ.get("TEST_OUTPUT_JSON", "/tmp/jobs_basic.json") - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result, "/tmp/jobs_basic.json") print("PASSED") diff --git a/release/long_running_tests/workloads/many_actor_tasks.py b/release/long_running_tests/workloads/many_actor_tasks.py index 037f70fd8a3d7..df7b997c7a3d6 100644 --- a/release/long_running_tests/workloads/many_actor_tasks.py +++ b/release/long_running_tests/workloads/many_actor_tasks.py @@ -1,21 +1,16 @@ # This workload tests submitting many actor methods. -import json -import os import time import numpy as np import ray from ray.cluster_utils import Cluster +from ray._private.test_utils import safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) num_redis_shards = 5 diff --git a/release/long_running_tests/workloads/many_drivers.py b/release/long_running_tests/workloads/many_drivers.py index a15d58fa4ca12..f14420062f700 100644 --- a/release/long_running_tests/workloads/many_drivers.py +++ b/release/long_running_tests/workloads/many_drivers.py @@ -1,21 +1,15 @@ # This workload tests many drivers using the same cluster. -import json -import os import time import argparse import ray from ray.cluster_utils import Cluster -from ray._private.test_utils import run_string_as_driver +from ray._private.test_utils import run_string_as_driver, safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) num_redis_shards = 5 diff --git a/release/long_running_tests/workloads/many_tasks.py b/release/long_running_tests/workloads/many_tasks.py index 22271cd5a7494..380ba951ee335 100644 --- a/release/long_running_tests/workloads/many_tasks.py +++ b/release/long_running_tests/workloads/many_tasks.py @@ -1,21 +1,16 @@ # This workload tests submitting and getting many tasks over and over. -import json -import os import time import numpy as np import ray from ray.cluster_utils import Cluster +from ray._private.test_utils import safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) num_redis_shards = 5 diff --git a/release/long_running_tests/workloads/many_tasks_serialized_ids.py b/release/long_running_tests/workloads/many_tasks_serialized_ids.py index 172bbf442fe54..27dcf59952844 100644 --- a/release/long_running_tests/workloads/many_tasks_serialized_ids.py +++ b/release/long_running_tests/workloads/many_tasks_serialized_ids.py @@ -1,7 +1,5 @@ # This workload stresses distributed reference counting by passing and # returning serialized ObjectRefs. -import json -import os import time import random @@ -9,15 +7,12 @@ import ray from ray.cluster_utils import Cluster +from ray._private.test_utils import safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) num_redis_shards = 5 diff --git a/release/long_running_tests/workloads/node_failures.py b/release/long_running_tests/workloads/node_failures.py index 1d77e2c2e33b6..1aff8b86193a5 100644 --- a/release/long_running_tests/workloads/node_failures.py +++ b/release/long_running_tests/workloads/node_failures.py @@ -1,20 +1,14 @@ # This workload tests repeatedly killing a node and adding a new node. -import json -import os import time import ray from ray.cluster_utils import Cluster -from ray._private.test_utils import get_other_nodes +from ray._private.test_utils import get_other_nodes, safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) num_redis_shards = 5 diff --git a/release/long_running_tests/workloads/serve.py b/release/long_running_tests/workloads/serve.py index 4327aae4d5607..bc3598fe3e878 100644 --- a/release/long_running_tests/workloads/serve.py +++ b/release/long_running_tests/workloads/serve.py @@ -1,5 +1,3 @@ -import json -import os import re import time import subprocess @@ -10,6 +8,7 @@ import ray from ray import serve from ray.cluster_utils import Cluster +from ray._private.test_utils import safe_write_to_results_json # Global variables / constants appear only right after imports. # Ray serve deployment setup constants @@ -36,11 +35,7 @@ def update_progress(result): anyscale product runs in each releaser test """ result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) cluster = Cluster() diff --git a/release/long_running_tests/workloads/serve_failure.py b/release/long_running_tests/workloads/serve_failure.py index d9301cd383fcc..fa94bfeef31cc 100644 --- a/release/long_running_tests/workloads/serve_failure.py +++ b/release/long_running_tests/workloads/serve_failure.py @@ -1,4 +1,3 @@ -import json import os import random import string @@ -9,6 +8,7 @@ import ray from ray import serve from ray.cluster_utils import Cluster +from ray._private.test_utils import safe_write_to_results_json # Global variables / constants appear only right after imports. # Ray serve deployment setup constants @@ -33,11 +33,7 @@ def update_progress(result): anyscale product runs in each releaser test """ result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) cluster = Cluster() diff --git a/release/runtime_env_tests/workloads/rte_many_tasks_actors.py b/release/runtime_env_tests/workloads/rte_many_tasks_actors.py index 0e7caf1306c93..a83507a5da439 100644 --- a/release/runtime_env_tests/workloads/rte_many_tasks_actors.py +++ b/release/runtime_env_tests/workloads/rte_many_tasks_actors.py @@ -11,17 +11,13 @@ import ray import random import os -import json import time +from ray._private.test_utils import safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) if __name__ == "__main__": diff --git a/release/runtime_env_tests/workloads/wheel_urls.py b/release/runtime_env_tests/workloads/wheel_urls.py index 0b0a4ae6953c4..c32c7646a51f6 100644 --- a/release/runtime_env_tests/workloads/wheel_urls.py +++ b/release/runtime_env_tests/workloads/wheel_urls.py @@ -15,23 +15,18 @@ """ import ray -import os -import json import time import requests import pprint import ray._private.ray_constants as ray_constants from ray._private.utils import get_master_wheel_url, get_release_wheel_url +from ray._private.test_utils import safe_write_to_results_json def update_progress(result): result["last_update"] = time.time() - test_output_json = os.environ.get( - "TEST_OUTPUT_JSON", "/tmp/release_test_output.json" - ) - with open(test_output_json, "wt") as f: - json.dump(result, f) + safe_write_to_results_json(result) if __name__ == "__main__":