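"""Basic chaos test: run Ray task/actor workloads under constant node failures."""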
import argparse
import json
import logging
import os
import random
import string
import time

import numpy as np

import ray
from ray._private.test_utils import monitor_memory_usage, wait_for_condition
from ray.data._internal.progress_bar import ProgressBar
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


def run_task_workload(total_num_cpus, smoke):
    """Run task-based workload that doesn't require object reconstruction."""

    @ray.remote(num_cpus=1, max_retries=-1)
    def task():
        def generate_data(size_in_kb=10):
            return np.zeros(1024 * size_in_kb, dtype=np.uint8)

        # Burn some CPU time, then return ~50 KB of data to exercise
        # the object store.
        a = ""
        for _ in range(100000):
            a = a + random.choice(string.ascii_letters)
        return generate_data(size_in_kb=50)

    @ray.remote(num_cpus=1, max_retries=-1)
    def invoke_nested_task():
        time.sleep(0.8)
        return ray.get(task.remote())

    multiplier = 75
    # For smoke mode, run fewer tasks.
    if smoke:
        multiplier = 1
    TOTAL_TASKS = int(total_num_cpus * 2 * multiplier)

    pb = ProgressBar("Chaos test", TOTAL_TASKS)
    results = [invoke_nested_task.remote() for _ in range(TOTAL_TASKS)]
    pb.block_until_complete(results)
    pb.close()

    # Consistency check: once the workload finishes, every CPU in the
    # cluster should be available again.
    wait_for_condition(
        lambda: (
            ray.cluster_resources().get("CPU", 0)
            == ray.available_resources().get("CPU", 0)
        ),
        timeout=60,
    )


def run_actor_workload(total_num_cpus, smoke):
    """Run actor-based workload.

    The test checks that `max_restarts=-1` (infinite actor restarts)
    and `max_task_retries=-1` (infinite task retries) work as expected.
    It requires many actors to report sequence numbers to centralized
    DB actors while there are failures. If at-least-once execution is
    guaranteed upon failures, this test shouldn't fail.
    """

    # Stores every reported value. Pinned to the head node (below) so
    # it survives worker-node failures.
    @ray.remote(num_cpus=0)
    class DBActor:
        def __init__(self):
            self.letter_dict = set()

        def add(self, letter):
            self.letter_dict.add(letter)

        def get(self):
            return self.letter_dict

    # Restarted infinitely with its tasks retried infinitely, so every
    # `add` call should eventually reach its DB actor.
    @ray.remote(num_cpus=1, max_restarts=-1, max_task_retries=-1)
    class ReportActor:
        def __init__(self, db_actor):
            self.db_actor = db_actor

        def add(self, letter):
            ray.get(self.db_actor.add.remote(letter))

    NUM_CPUS = int(total_num_cpus)
    multiplier = 2
    # For smoke mode, run fewer tasks.
    if smoke:
        multiplier = 1
    TOTAL_TASKS = int(300 * multiplier)

    # Pin the DB actors to the head node so they survive worker-node
    # failures.
    head_node_id = ray.get_runtime_context().get_node_id()
    db_actors = [
        DBActor.options(
            scheduling_strategy=NodeAffinitySchedulingStrategy(
                node_id=head_node_id, soft=False
            )
        ).remote()
        for _ in range(NUM_CPUS)
    ]

    pb = ProgressBar("Chaos test", TOTAL_TASKS * NUM_CPUS)
    actors = []
    for db_actor in db_actors:
        actors.append(ReportActor.remote(db_actor))
    results = []
    highest_reported_num = 0
    for a in actors:
        for _ in range(TOTAL_TASKS):
            results.append(a.add.remote(str(highest_reported_num)))
            highest_reported_num += 1
    pb.fetch_until_complete(results)
    pb.close()
    for actor in actors:
        ray.kill(actor)

    # Consistency check: every CPU should be available again.
    wait_for_condition(
        lambda: (
            ray.cluster_resources().get("CPU", 0)
            == ray.available_resources().get("CPU", 0)
        ),
        timeout=60,
    )

    # Make sure the DB actors didn't lose any report. If this assert
    # fails, the at-least-once actor task semantics were not guaranteed.
    letter_set = set()
    for db_actor in db_actors:
        letter_set.update(ray.get(db_actor.get.remote()))
    for i in range(highest_reported_num):
        assert str(i) in letter_set, i


def run_placement_group_workload(total_num_cpus, smoke):
    raise NotImplementedError


def parse_script_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--node-kill-interval", type=int, default=60)
    parser.add_argument("--workload", type=str)
    parser.add_argument("--smoke", action="store_true")
    return parser.parse_known_args()
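

# For reference, invoking the script with `--workload actors --smoke` makes
# `parse_script_args()` above return roughly
# (Namespace(node_kill_interval=60, workload="actors", smoke=True), []),
# since `parse_known_args()` returns unrecognized arguments separately.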


def main():
    """Run the basic chaos test for task/actor/placement group workloads.

    Currently, it only tests the node failure scenario. Node failures
    are implemented by an actor that keeps calling the Raylet's
    KillRaylet RPC. Ideally, we should set up the infra to cause
    machine failures/network partitions/etc., but we don't do that for
    now. In the short term, we will only test gRPC network delay +
    node failures.

    The test runs 3 steps. Each step records the peak memory usage to
    observe the memory usage while there are node failures.

    Step 1: Warm up the cluster. It is needed to pre-start workers
        if necessary.
    Step 2: Run the test without failures.
    Step 3: Run the test with constant node failures.
    """
    args, _ = parse_script_args()
    logging.info("Received arguments: %s", args)
    ray.init(address="auto")
    total_num_cpus = ray.cluster_resources()["CPU"]
    total_nodes = 0
    for n in ray.nodes():
        if n["Alive"]:
            total_nodes += 1
    monitor_actor = monitor_memory_usage()

    workload = None
    if args.workload == "tasks":
        workload = run_task_workload
    elif args.workload == "actors":
        workload = run_actor_workload
    elif args.workload == "pg":
        workload = run_placement_group_workload
    else:
        assert False, f"Unknown workload: {args.workload}"

    # Step 1: Warm up the cluster, pre-starting workers if necessary.
    print("Warm up... Prestarting workers if necessary.")
    start = time.time()
    workload(total_num_cpus, args.smoke)
    print(f"Warm-up runtime: {time.time() - start}")

    # Step 2: Run the workload without failures and record peak memory.
    print("Running without failures")
    start = time.time()
    workload(total_num_cpus, args.smoke)
    print(f"Runtime when there are no failures: {time.time() - start}")
    used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
    print("Memory usage without failures.")
    print(f"Peak memory usage: {round(used_gb, 2)}GB")
    print(f"Peak memory usage per process:\n {usage}")

    # Step 3: Run the workload while the node killer constantly kills nodes.
    print("Running with failures")
    start = time.time()
    node_killer = ray.get_actor("RayletKiller", namespace="release_test_namespace")
    node_killer.run.remote()
    workload(total_num_cpus, args.smoke)
    print(f"Runtime when there are many failures: {time.time() - start}")
    print(f"Total node failures: {ray.get(node_killer.get_total_killed.remote())}")
    node_killer.stop_run.remote()
    used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
    print("Memory usage with failures.")
    print(f"Peak memory usage: {round(used_gb, 2)}GB")
    print(f"Peak memory usage per process:\n {usage}")

    # Report the result.
    ray.get(monitor_actor.stop_run.remote())
    print(
        "Total number of killed nodes: "
        f"{ray.get(node_killer.get_total_killed.remote())}"
    )
    with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
        f.write(
            json.dumps(
                {
                    "success": 1,
                    "_peak_memory": round(used_gb, 2),
                    "_peak_process_memory": usage,
                }
            )
        )


if __name__ == "__main__":
    main()
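
# Example invocation (hypothetical paths; the assumptions below are read
# from the code above): the script expects a running Ray cluster reachable
# via `address="auto"`, a detached "RayletKiller" actor registered in the
# "release_test_namespace" namespace by the release-test harness, and a
# TEST_OUTPUT_JSON environment variable pointing at the result file:
#
#   TEST_OUTPUT_JSON=/tmp/result.json python test_chaos_basic.py \
#       --workload tasks --smoke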