[release] Guard against malformed jsons (ray-project#31727)
A long-running job can be interrupted in the middle of writing the results JSON, which leaves the file malformed and fails the test. This PR guards against that by first writing to a temporary file and then using os.replace to atomically swap it in as the target JSON.

Signed-off-by: Antoni Baum <[email protected]>
Yard1 committed Jan 19, 2023
1 parent b3a6a58 commit 3735ba4
Showing 13 changed files with 41 additions and 81 deletions.
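
The commit message above describes a standard atomic-write pattern: dump the payload to a sibling temporary file, then rename it over the target with os.replace, so an interrupted run leaves either the old file or the new one, never a truncated JSON. Below is a minimal, self-contained sketch of that pattern; the function name and path are illustrative only, the actual helper added by this PR is safe_write_to_results_json in python/ray/_private/test_utils.py (see the first diff).

import json
import os


def atomic_json_dump(obj, path):
    # Write next to the target first; if the process dies here, the file
    # at `path` is untouched.
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(obj, f)
    # os.replace is an atomic rename on POSIX (and overwrites the target on
    # Windows), so readers never observe a partially written file.
    os.replace(tmp_path, path)


if __name__ == "__main__":
    atomic_json_dump({"status": "ok"}, "/tmp/example_output.json")
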
17 changes: 17 additions & 0 deletions python/ray/_private/test_utils.py
@@ -3,6 +3,7 @@
import fnmatch
import functools
import io
import json
import logging
import math
import os
@@ -1801,3 +1802,19 @@ def wandb_populate_run_location_hook():

    os.environ[WANDB_PROJECT_ENV_VAR] = "test_project"
    os.environ[WANDB_GROUP_ENV_VAR] = "test_group"


def safe_write_to_results_json(
    result: str,
    default_file_name: str = "/tmp/release_test_output.json",
    env_var: Optional[str] = "TEST_OUTPUT_JSON",
):
    """
    Safe (atomic) write to file to guard against malforming the json
    if the job gets interrupted in the middle of writing.
    """
    test_output_json = os.environ.get(env_var, default_file_name)
    test_output_json_tmp = test_output_json + ".tmp"
    with open(test_output_json_tmp, "wt") as f:
        json.dump(result, f)
    os.replace(test_output_json_tmp, test_output_json)
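
For context, a hypothetical caller looks like the workload diffs that follow: the destination is taken from the TEST_OUTPUT_JSON environment variable when set, otherwise from default_file_name. The payload below is made up for illustration.

import time

from ray._private.test_utils import safe_write_to_results_json


def update_progress(result):
    # Stamp the progress record, then persist it atomically so an
    # interrupted run cannot leave a truncated results JSON behind.
    result["last_update"] = time.time()
    safe_write_to_results_json(result)


update_progress({"iteration": 42, "success": True})
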
7 changes: 2 additions & 5 deletions python/ray/tune/utils/release_test_util.py
@@ -8,6 +8,7 @@

from ray import tune
from ray.tune.callback import Callback
from ray._private.test_utils import safe_write_to_results_json


class ProgressCallback(Callback):
@@ -23,11 +24,7 @@ def on_step_end(self, iteration, trials, **kwargs):
            "iteration": iteration,
            "trial_states": dict(Counter([trial.status for trial in trials])),
        }
        test_output_json = os.environ.get(
            "TEST_OUTPUT_JSON", "/tmp/release_test.json"
        )
        with open(test_output_json, "wt") as f:
            json.dump(result, f)
        safe_write_to_results_json(result, "/tmp/release_test_out.json")

        self.last_update = now

11 changes: 2 additions & 9 deletions release/long_running_tests/workloads/actor_deaths.py
@@ -1,23 +1,16 @@
# This workload tests repeatedly killing actors and submitting tasks to them.

import json
import numpy as np
import os
import sys
import time

import ray
from ray.cluster_utils import Cluster
from ray._private.test_utils import monitor_memory_usage
from ray._private.test_utils import monitor_memory_usage, safe_write_to_results_json


def update_progress(result):
    result["last_update"] = time.time()
    test_output_json = os.environ.get(
        "TEST_OUTPUT_JSON", "/tmp/release_test_output.json"
    )
    with open(test_output_json, "wt") as f:
        json.dump(result, f)
    safe_write_to_results_json(result)


num_redis_shards = 1
@@ -8,7 +8,6 @@
"""

import argparse
import json
import os
import time
import random
@@ -17,6 +16,7 @@
from ray.dashboard.modules.job.pydantic_models import JobDetails
import ray
from ray.job_submission import JobSubmissionClient
from ray._private.test_utils import safe_write_to_results_json

NUM_CLIENTS = 4
NUM_JOBS_PER_BATCH = 4
@@ -130,8 +130,6 @@ def submit_batch_jobs(
    result = {
        "time_taken": time_taken,
    }
    test_output_json = os.environ.get("TEST_OUTPUT_JSON", "/tmp/jobs_basic.json")
    with open(test_output_json, "wt") as f:
        json.dump(result, f)
    safe_write_to_results_json(result, "/tmp/jobs_basic.json")

    print("PASSED")
9 changes: 2 additions & 7 deletions release/long_running_tests/workloads/many_actor_tasks.py
@@ -1,21 +1,16 @@
# This workload tests submitting many actor methods.
import json
import os
import time

import numpy as np

import ray
from ray.cluster_utils import Cluster
from ray._private.test_utils import safe_write_to_results_json


def update_progress(result):
    result["last_update"] = time.time()
    test_output_json = os.environ.get(
        "TEST_OUTPUT_JSON", "/tmp/release_test_output.json"
    )
    with open(test_output_json, "wt") as f:
        json.dump(result, f)
    safe_write_to_results_json(result)


num_redis_shards = 5
10 changes: 2 additions & 8 deletions release/long_running_tests/workloads/many_drivers.py
@@ -1,21 +1,15 @@
# This workload tests many drivers using the same cluster.
import json
import os
import time
import argparse

import ray
from ray.cluster_utils import Cluster
from ray._private.test_utils import run_string_as_driver
from ray._private.test_utils import run_string_as_driver, safe_write_to_results_json


def update_progress(result):
    result["last_update"] = time.time()
    test_output_json = os.environ.get(
        "TEST_OUTPUT_JSON", "/tmp/release_test_output.json"
    )
    with open(test_output_json, "wt") as f:
        json.dump(result, f)
    safe_write_to_results_json(result)


num_redis_shards = 5
9 changes: 2 additions & 7 deletions release/long_running_tests/workloads/many_tasks.py
@@ -1,21 +1,16 @@
# This workload tests submitting and getting many tasks over and over.
import json
import os
import time

import numpy as np

import ray
from ray.cluster_utils import Cluster
from ray._private.test_utils import safe_write_to_results_json


def update_progress(result):
    result["last_update"] = time.time()
    test_output_json = os.environ.get(
        "TEST_OUTPUT_JSON", "/tmp/release_test_output.json"
    )
    with open(test_output_json, "wt") as f:
        json.dump(result, f)
    safe_write_to_results_json(result)


num_redis_shards = 5
@@ -1,23 +1,18 @@
# This workload stresses distributed reference counting by passing and
# returning serialized ObjectRefs.
import json
import os
import time
import random

import numpy as np

import ray
from ray.cluster_utils import Cluster
from ray._private.test_utils import safe_write_to_results_json


def update_progress(result):
    result["last_update"] = time.time()
    test_output_json = os.environ.get(
        "TEST_OUTPUT_JSON", "/tmp/release_test_output.json"
    )
    with open(test_output_json, "wt") as f:
        json.dump(result, f)
    safe_write_to_results_json(result)


num_redis_shards = 5
10 changes: 2 additions & 8 deletions release/long_running_tests/workloads/node_failures.py
@@ -1,20 +1,14 @@
# This workload tests repeatedly killing a node and adding a new node.
import json
import os
import time

import ray
from ray.cluster_utils import Cluster
from ray._private.test_utils import get_other_nodes
from ray._private.test_utils import get_other_nodes, safe_write_to_results_json


def update_progress(result):
    result["last_update"] = time.time()
    test_output_json = os.environ.get(
        "TEST_OUTPUT_JSON", "/tmp/release_test_output.json"
    )
    with open(test_output_json, "wt") as f:
        json.dump(result, f)
    safe_write_to_results_json(result)


num_redis_shards = 5
9 changes: 2 additions & 7 deletions release/long_running_tests/workloads/serve.py
@@ -1,5 +1,3 @@
import json
import os
import re
import time
import subprocess
@@ -10,6 +8,7 @@
import ray
from ray import serve
from ray.cluster_utils import Cluster
from ray._private.test_utils import safe_write_to_results_json

# Global variables / constants appear only right after imports.
# Ray serve deployment setup constants
@@ -36,11 +35,7 @@ def update_progress(result):
    anyscale product runs in each releaser test
    """
    result["last_update"] = time.time()
    test_output_json = os.environ.get(
        "TEST_OUTPUT_JSON", "/tmp/release_test_output.json"
    )
    with open(test_output_json, "wt") as f:
        json.dump(result, f)
    safe_write_to_results_json(result)


cluster = Cluster()
8 changes: 2 additions & 6 deletions release/long_running_tests/workloads/serve_failure.py
@@ -1,4 +1,3 @@
import json
import os
import random
import string
@@ -9,6 +8,7 @@
import ray
from ray import serve
from ray.cluster_utils import Cluster
from ray._private.test_utils import safe_write_to_results_json

# Global variables / constants appear only right after imports.
# Ray serve deployment setup constants
@@ -33,11 +33,7 @@ def update_progress(result):
    anyscale product runs in each releaser test
    """
    result["last_update"] = time.time()
    test_output_json = os.environ.get(
        "TEST_OUTPUT_JSON", "/tmp/release_test_output.json"
    )
    with open(test_output_json, "wt") as f:
        json.dump(result, f)
    safe_write_to_results_json(result)


cluster = Cluster()
8 changes: 2 additions & 6 deletions release/runtime_env_tests/workloads/rte_many_tasks_actors.py
@@ -11,17 +11,13 @@
import ray
import random
import os
import json
import time
from ray._private.test_utils import safe_write_to_results_json


def update_progress(result):
    result["last_update"] = time.time()
    test_output_json = os.environ.get(
        "TEST_OUTPUT_JSON", "/tmp/release_test_output.json"
    )
    with open(test_output_json, "wt") as f:
        json.dump(result, f)
    safe_write_to_results_json(result)


if __name__ == "__main__":
9 changes: 2 additions & 7 deletions release/runtime_env_tests/workloads/wheel_urls.py
@@ -15,23 +15,18 @@
"""

import ray
import os
import json
import time
import requests
import pprint

import ray._private.ray_constants as ray_constants
from ray._private.utils import get_master_wheel_url, get_release_wheel_url
from ray._private.test_utils import safe_write_to_results_json


def update_progress(result):
    result["last_update"] = time.time()
    test_output_json = os.environ.get(
        "TEST_OUTPUT_JSON", "/tmp/release_test_output.json"
    )
    with open(test_output_json, "wt") as f:
        json.dump(result, f)
    safe_write_to_results_json(result)


if __name__ == "__main__":
