forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
/
serve_failure.py
159 lines (130 loc) · 4.57 KB
/
serve_failure.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import os
import random
import string
import time
import requests
import ray
from ray import serve
from ray.cluster_utils import Cluster
from ray._private.test_utils import safe_write_to_results_json
# Module-level configuration; every constant is declared directly below the
# imports.

# Ray Serve deployment settings
NUM_REPLICAS = 7
MAX_BATCH_SIZE = 16

# Local cluster settings
NUM_REDIS_SHARDS = 1
REDIS_MAX_MEMORY = 100_000_000
OBJECT_STORE_MEMORY = 100_000_000
NUM_NODES = 4

# RandomTest settings
CPUS_PER_NODE = 10

# True whenever the RAY_UNIT_TEST environment variable is set (to any value).
RAY_UNIT_TEST = os.environ.get("RAY_UNIT_TEST") is not None
def update_progress(result):
    """Timestamp ``result`` and hand it to ``safe_write_to_results_json``.

    The dictionary is modified in place: a ``"last_update"`` key holding the
    current ``time.time()`` value is added (or overwritten) before the write.

    NOTE(review): the original note says the JSON lands under /tmp/ and is
    read by the release-test runs; the actual location is decided by
    ``safe_write_to_results_json`` -- confirm there.

    Args:
        result: Mutable mapping with the progress fields to persist.
    """
    timestamp = time.time()
    result["last_update"] = timestamp
    safe_write_to_results_json(result)
# Bring up a local cluster of NUM_NODES nodes. Only the first node added
# receives the Redis port and shard count; every other node passes None.
cluster = Cluster()
for node_index in range(NUM_NODES):
    is_first_node = node_index == 0
    cluster.add_node(
        redis_port=6379 if is_first_node else None,
        num_redis_shards=NUM_REDIS_SHARDS if is_first_node else None,
        num_cpus=16,
        num_gpus=0,
        # Each node advertises a custom resource named after its own index.
        resources={str(node_index): 2},
        object_store_memory=OBJECT_STORE_MEMORY,
        redis_max_memory=REDIS_MAX_MEMORY,
        dashboard_host="0.0.0.0",
    )

# Attach this driver to the cluster, then start a detached Serve instance.
ray.init(
    address=cluster.address,
    namespace="serve_failure_test",
    dashboard_host="0.0.0.0",
    log_to_driver=True,
)
serve.start(detached=True)
@ray.remote
class RandomKiller:
    """Actor that keeps killing one randomly chosen Serve actor per period.

    Args:
        kill_period_s: Seconds to sleep after each kill.
    """

    def __init__(self, kill_period_s=1):
        self.kill_period_s = kill_period_s

    def _get_all_serve_actors(self):
        """Return handles for the HTTP proxies, the controller and all replicas.

        The list is rebuilt from the Serve controller on every call.
        """
        controller = serve.context.get_global_client()._controller
        http_proxies = ray.get(controller.get_http_proxies.remote())
        running_replicas = ray.get(controller._all_running_replicas.remote())

        handles = [*http_proxies.values(), controller]
        handles.extend(
            replica.actor_handle
            for replica_group in running_replicas.values()
            for replica in replica_group
        )
        return handles

    def run(self):
        """Loop forever: pick a random Serve actor, kill it, then sleep."""
        while True:
            victim = random.choice(self._get_all_serve_actors())
            print(f"Killing {victim}")
            # no_restart=False is passed so the kill does not forbid a restart.
            ray.kill(victim, no_restart=False)
            time.sleep(self.kill_period_s)
class RandomTest:
    """Drive weighted-random create/verify actions against Serve deployments.

    On construction ``max_deployments`` deployments are created. ``run`` then
    repeatedly executes batches of randomly chosen actions and reports
    progress after every batch.

    Args:
        max_deployments: Maximum number of deployments tracked at once.
            Creating a deployment while at the limit first deletes the most
            recently added one.
    """

    # HTTP endpoint under which each deployment is queried by its name.
    BASE_URL = "http://127.0.0.1:8000/"
    # Upper bound (seconds) for a single verification request, so a request
    # cannot block the test loop indefinitely.
    REQUEST_TIMEOUT_S = 30

    def __init__(self, max_deployments=1):
        self.max_deployments = max_deployments
        # (action, weight) pairs; verification is picked 4x as often as
        # creation.
        self.weighted_actions = [
            (self.create_deployment, 1),
            (self.verify_deployment, 4),
        ]
        self.deployments = []
        for _ in range(max_deployments):
            self.create_deployment()

    def _deployment_url(self, deployment):
        """Return the URL used to query the deployment named ``deployment``."""
        return self.BASE_URL + deployment

    def create_deployment(self):
        """Deploy a handler under a fresh random name and track that name.

        If the tracked list is already at ``max_deployments``, the most
        recently added deployment is deleted first.
        """
        if len(self.deployments) == self.max_deployments:
            deployment_to_delete = self.deployments.pop()
            serve.get_deployment(deployment_to_delete).delete()

        new_name = "".join(random.choices(string.ascii_letters, k=10))

        @serve.deployment(name=new_name)
        def handler(_request, *args):
            # Whatever arguments are passed in are ignored; the handler simply
            # echoes its own deployment name so callers can verify routing.
            return new_name

        handler.deploy()

        self.deployments.append(new_name)

    def verify_deployment(self):
        """Query one random tracked deployment 100 times.

        A request counts as failed when it raises or when the response body is
        not the deployment name. Failures are reported on stdout and followed
        by a short sleep; they never abort the test.
        """
        deployment = random.choice(self.deployments)
        url = self._deployment_url(deployment)
        for _ in range(100):
            try:
                response = requests.get(url, timeout=self.REQUEST_TIMEOUT_S)
                succeeded = response.text == deployment
            except Exception:
                # Best effort by design: errors are expected while actors are
                # being killed, so any exception is just a failed attempt.
                succeeded = False
            if not succeeded:
                print("Request to {} failed.".format(deployment))
                time.sleep(0.01)

    def run(self):
        """Execute batches of 20 random actions and report progress per batch.

        Loops forever unless ``RAY_UNIT_TEST`` is truthy, in which case it
        stops after the first batch.
        """
        iteration = 0
        start_time = time.time()
        previous_time = start_time
        # The action table is not modified while running, so split it once.
        actions, weights = zip(*self.weighted_actions)
        while True:
            for _ in range(20):
                action_chosen = random.choices(actions, weights=weights)[0]
                print(f"Executing {action_chosen}")
                action_chosen()

            new_time = time.time()
            print(
                "Iteration {}:\n"
                "  - Iteration time: {}.\n"
                "  - Absolute time: {}.\n"
                "  - Total elapsed time: {}.".format(
                    iteration, new_time - previous_time, new_time, new_time - start_time
                )
            )
            update_progress(
                {
                    "iteration": iteration,
                    "iteration_time": new_time - previous_time,
                    "absolute_time": new_time,
                    "elapsed_time": new_time - start_time,
                }
            )
            previous_time = new_time
            iteration += 1

            if RAY_UNIT_TEST:
                break
# Size the deployment pool from the cluster constants and create the tester
# (which deploys its initial deployments) before any killing starts.
deployment_limit = NUM_NODES * CPUS_PER_NODE
tester = RandomTest(max_deployments=deployment_limit)
# Launch the killer actor's loop in the background; the returned reference is
# never waited on.
random_killer = RandomKiller.remote()
random_killer.run.remote()
# Blocks in the test loop (a single batch when RAY_UNIT_TEST is set).
tester.run()