forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_tracing.py
208 lines (155 loc) · 5.86 KB
/
test_tracing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import asyncio
import glob
import json
import os
import pytest
import shutil
import ray
from ray.test_utils import check_call_ray
from ray.util.tracing.setup_local_tmp_tracing import spans_dir
# Location of the tracing setup hook used by this module. The fixtures pass
# this string to `ray start --tracing-startup-hook` and to
# `ray.init(_tracing_startup_hook=...)`.
setup_tracing_path = "ray.util.tracing.setup_local_tmp_tracing:setup_tracing"
@pytest.fixture()
def cleanup_dirs():
    """Give the test an empty spans_dir and remove it again afterwards."""

    def _remove_spans_dir():
        # Delete the spans folder, but only if it is actually present.
        if os.path.exists(spans_dir):
            shutil.rmtree(spans_dir)

    # Setup: start every test from a freshly created, empty folder.
    _remove_spans_dir()
    os.makedirs(spans_dir)

    yield

    # Teardown. Tracing is only set up once per driver process, so
    # ray.__traced__ is reset to False here to make each test set tracing
    # up again.
    ray.__traced__ = False
    _remove_spans_dir()
@pytest.fixture(scope="function")
def ray_start_cli_tracing():
    """Start a head node via the CLI with tracing enabled and connect to it.

    Setup stops any running cluster, runs ``ray start --head`` with
    ``--tracing-startup-hook`` and then connects this driver with
    ``ray.init(address="auto")``. Teardown shuts the driver down and stops
    the cluster again.
    """
    check_call_ray(["stop", "--force"])
    check_call_ray(
        ["start", "--head", "--tracing-startup-hook", setup_tracing_path])
    try:
        ray.init(address="auto")
        yield
    finally:
        # Also runs when ray.init() raises, so a failed setup does not leave
        # the head node started above running for the following tests.
        ray.shutdown()
        check_call_ray(["stop", "--force"])
@pytest.fixture(scope="function")
def ray_start_cli_predefined_actor_tracing():
    """Start a head node via the CLI with tracing enabled, without connecting.

    Setup stops any running cluster and runs ``ray start --head`` with
    ``--tracing-startup-hook``. This fixture does not call ``ray.init()``
    itself; the test using it is expected to connect to the cluster.
    Teardown shuts the driver down and stops the cluster.
    """
    check_call_ray(["stop", "--force"])
    check_call_ray(
        ["start", "--head", "--tracing-startup-hook", setup_tracing_path])
    yield
    ray.shutdown()
    check_call_ray(["stop", "--force"])
@pytest.fixture(scope="function")
def ray_start_init_tracing():
    """Call ray.init with the tracing startup hook; shut Ray down afterwards."""
    ray.init(_tracing_startup_hook=setup_tracing_path)
    yield
    ray.shutdown()
def get_span_list():
    """Read every span file under spans_dir and return the decoded spans.

    Each ``*.txt`` file found recursively below ``spans_dir`` is read line by
    line, and every non-blank line is decoded with ``json.loads``.

    Returns:
        A list with one decoded JSON value per non-blank line.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    span_list = []
    for entry in glob.glob(f"{spans_dir}/**/*.txt", recursive=True):
        with open(entry) as f:
            # Iterate the file object directly instead of materializing
            # every line up front with readlines().
            for line in f:
                # A blank line carries no span; skip it rather than letting
                # json.loads fail on it.
                if line.strip():
                    span_list.append(json.loads(line))
    return span_list
def get_span_dict(span_list):
    """Count how often each span name occurs in span_list.

    Args:
        span_list: Iterable of span mappings, each with a "name" entry.

    Returns:
        A plain dict mapping each span name to its number of occurrences,
        with keys in order of first appearance.

    Raises:
        KeyError: If a span has no "name" entry.
    """
    # Imported locally so this helper does not depend on the module's import
    # block being changed.
    from collections import Counter

    # Convert back to a plain dict so callers get the same type as before.
    return dict(Counter(span["name"] for span in span_list))
def task_helper():
    """Submit one remote task and compare the recorded spans to expectations.

    Asserts that exactly two spans were written, then returns whether the
    per-name span counts equal the expected mapping.
    """

    @ray.remote
    def f(value):
        return value + 1

    ray.get(f.remote(2))

    recorded = get_span_list()
    assert len(recorded) == 2

    # Span order is not fixed, so only the per-name counts are compared.
    observed = get_span_dict(recorded)
    expected = {
        "test_tracing.f ray.remote": 1,
        "test_tracing.f ray.remote_worker": 1,
    }
    return observed == expected
def sync_actor_helper(connect_to_cluster: bool = False):
    """Drive a synchronous actor and compare the recorded spans to expectations.

    Args:
        connect_to_cluster: When true, call ``ray.init(address="auto")``
            before the actor is created.

    Asserts that the actor call returned 1 and that exactly four spans were
    written, then returns whether the per-name span counts equal the
    expected mapping.
    """

    @ray.remote
    class Counter:
        def __init__(self):
            self.value = 0

        def increment(self):
            self.value = self.value + 1
            return self.value

    if connect_to_cluster:
        ray.init(address="auto")

    # Instantiate the actor and invoke its method once.
    actor = Counter.remote()
    assert ray.get(actor.increment.remote()) == 1

    recorded = get_span_list()
    assert len(recorded) == 4

    # Span order is not fixed, so only the per-name counts are compared.
    observed = get_span_dict(recorded)
    expected = {
        "sync_actor_helper.<locals>.Counter.__init__ ray.remote": 1,
        "sync_actor_helper.<locals>.Counter.increment ray.remote": 1,
        "Counter.__init__ ray.remote_worker": 1,
        "Counter.increment ray.remote_worker": 1,
    }
    return observed == expected
def async_actor_helper():
    """Drive an async actor and compare the recorded spans to expectations.

    Asserts that exactly ten spans were written, then returns whether the
    per-name span counts equal the expected mapping.
    """

    @ray.remote
    class AsyncActor:
        # Several invocations of this coroutine may be in flight on the
        # event loop at the same time.
        async def run_concurrent(self):
            await asyncio.sleep(2)  # stand-in for a concurrent workload

    actor = AsyncActor.remote()
    pending = [actor.run_concurrent.remote() for _ in range(4)]
    ray.get(pending)

    recorded = get_span_list()
    assert len(recorded) == 10

    # Span order is not fixed, so only the per-name counts are compared.
    observed = get_span_dict(recorded)
    expected = {
        "async_actor_helper.<locals>.AsyncActor.__init__ ray.remote": 1,
        "async_actor_helper.<locals>.AsyncActor.run_concurrent ray.remote": 4,
        "AsyncActor.__init__ ray.remote_worker": 1,
        "AsyncActor.run_concurrent ray.remote_worker": 4,
    }
    return observed == expected
def test_tracing_task_init_workflow(cleanup_dirs, ray_start_init_tracing):
    """Task spans match expectations when Ray is started through ray.init."""
    spans_ok = task_helper()
    assert spans_ok
def test_tracing_task_start_workflow(cleanup_dirs, ray_start_cli_tracing):
    """Task spans match expectations when Ray is started through the CLI."""
    spans_ok = task_helper()
    assert spans_ok
def test_tracing_sync_actor_init_workflow(cleanup_dirs,
                                          ray_start_init_tracing):
    """Sync-actor spans match expectations when Ray is started via ray.init."""
    spans_ok = sync_actor_helper()
    assert spans_ok
def test_tracing_sync_actor_start_workflow(cleanup_dirs,
                                           ray_start_cli_tracing):
    """Sync-actor spans match expectations when Ray is started via the CLI."""
    spans_ok = sync_actor_helper()
    assert spans_ok
def test_tracing_async_actor_init_workflow(cleanup_dirs,
                                           ray_start_init_tracing):
    """Async-actor spans match expectations when Ray is started via ray.init."""
    spans_ok = async_actor_helper()
    assert spans_ok
def test_tracing_async_actor_start_workflow(cleanup_dirs,
                                            ray_start_cli_tracing):
    """Async-actor spans match expectations when Ray is started via the CLI."""
    spans_ok = async_actor_helper()
    assert spans_ok
def test_tracing_predefined_actor(cleanup_dirs,
                                  ray_start_cli_predefined_actor_tracing):
    """Sync-actor spans match when the actor class is defined before the
    driver connects to the cluster."""
    spans_ok = sync_actor_helper(connect_to_cluster=True)
    assert spans_ok
def test_wrapping(ray_start_init_tracing):
    """Decorating a function that accepts only **kwargs must not raise."""

    @ray.remote
    def f(**_kwargs):
        return None
if __name__ == "__main__":
    # Running the file directly hands it to pytest in verbose mode and uses
    # pytest's return value as the process exit status.
    import sys

    exit_code = pytest.main(["-v", __file__])
    sys.exit(exit_code)