forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rte_many_tasks_actors.py
124 lines (105 loc) · 3.81 KB
/
rte_many_tasks_actors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""Runtime env test with many tasks and actors
This test runs on four nodes and schedules many tasks and actors with
different runtime environments.
Test owner: architkulkarni
Acceptance criteria: Should run through and print "PASSED"
"""
import ray
import random
import os
import time
from ray._private.test_utils import safe_write_to_results_json
def update_progress(result):
result["last_update"] = time.time()
safe_write_to_results_json(result)
if __name__ == "__main__":
ray.init(address="auto", runtime_env={"pip": ["requests==2.18.0"]})
versions = ["2.16.0", "2.17.0", "2.18.0"]
envs = [{"pip": [f"requests=={versions[i]}"]} for i in range(len(versions) - 1)]
# If a task's env is {}, we should have requests==2.18.0 from the job's env
envs.append({})
NUM_TASK_ITERATIONS = 10
NUM_ACTOR_ITERATIONS = 10
NUM_CALLS_PER_ITERATION = 100
NUM_ENVS_PER_ITERATION = 4
if os.environ.get("IS_SMOKE_TEST") == "1":
NUM_TASK_ITERATIONS = 10
NUM_ACTOR_ITERATIONS = 10
NUM_CALLS_PER_ITERATION = 1
NUM_ENVS_PER_ITERATION = 1
print("Testing Tasks...")
start_time = time.time()
previous_time = start_time
@ray.remote
def check_version_task(expected_version: str):
import requests
assert requests.__version__ == expected_version, (
requests.__version__,
expected_version,
)
for i in range(NUM_TASK_ITERATIONS):
results = []
for j in range(NUM_ENVS_PER_ITERATION):
(env, expected_version) = random.choice(list(zip(envs, versions)))
remote_task = check_version_task.options(runtime_env=env)
results.extend(
[
remote_task.remote(expected_version)
for _ in range(NUM_CALLS_PER_ITERATION)
]
)
ray.get(results)
print(f"Finished tasks iteration {i+1}/{NUM_TASK_ITERATIONS}")
new_time = time.time()
update_progress(
{
"phase": "Tasks",
"iteration": i + 1,
"iteration_time": new_time - previous_time,
"absolute_time": new_time,
"elapsed_time": new_time - start_time,
}
)
previous_time = new_time
print("Testing Actors...")
@ray.remote
class TestActor:
def check_version(self, expected_version: str):
import requests
assert requests.__version__ == expected_version, (
requests.__version__,
expected_version,
)
def nested_check_version(self, expected_version: str):
ray.get(check_version_task.remote(expected_version))
for i in range(NUM_ACTOR_ITERATIONS):
results = []
for j in range(NUM_ENVS_PER_ITERATION):
env, expected_version = random.choice(list(zip(envs, versions)))
actor = TestActor.options(runtime_env=env).remote()
results.extend(
[
actor.check_version.remote(expected_version)
for _ in range(NUM_CALLS_PER_ITERATION)
]
)
results.extend(
[
actor.nested_check_version.remote(expected_version)
for _ in range(NUM_CALLS_PER_ITERATION)
]
)
ray.get(results)
print(f"Finished actors iteration {i+1}/{NUM_ACTOR_ITERATIONS}")
new_time = time.time()
update_progress(
{
"phase": "Actors",
"iteration": i + 1,
"iteration_time": new_time - previous_time,
"absolute_time": new_time,
"elapsed_time": new_time - start_time,
}
)
previous_time = new_time
print("PASSED")