[core][experimental] Fix experimental microbenchmark and add asyncio microbenchmarks (ray-project#44330)

Fix the microbenchmark after the changes made in ray-project#43128 (the new read protocol is sketched below):

    must call begin_read() on a channel before calling end_read()
    multi-output DAGs now return a single channel instead of multiple channels
    add asyncio versions of the DAG benchmarks
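
For reference, a minimal sketch of the read protocol these fixes target. The
compiled_dag handle and the b"x" payload mirror the benchmark code below; the
exact channel API is an assumption based on ray-project#43128:

    # Sketch only: synchronous compiled-DAG execution after ray-project#43128.
    # execute() returns a single output channel, even for MultiOutputNode DAGs.
    output_channel = compiled_dag.execute(b"x")
    output_channel.begin_read()  # must be called before end_read()
    output_channel.end_read()    # releases the result for the next execution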

Signed-off-by: Stephanie Wang <[email protected]>
stephanie-wang committed Mar 28, 2024
1 parent c1b003f commit c89eec0
Showing 3 changed files with 97 additions and 22 deletions.
82 changes: 60 additions & 22 deletions python/ray/_private/ray_experimental_perf.py
@@ -1,12 +1,15 @@
"""This is the script for `ray microbenchmark`."""

import logging
from ray._private.ray_microbenchmark_helpers import timeit
from ray._private.ray_microbenchmark_helpers import timeit, asyncio_timeit
import multiprocessing
import ray

import ray.experimental.channel as ray_channel
from ray.dag import InputNode, MultiOutputNode
from ray._private.utils import (
get_or_create_event_loop,
)

logger = logging.getLogger(__name__)

@@ -33,6 +36,7 @@ def check_optimized_build():

def main(results=None):
results = results or []
loop = get_or_create_event_loop()

check_optimized_build()

@@ -63,10 +67,6 @@ def read(self, chans):
chan.end_read()

chans = [ray_channel.Channel(1000)]
results += timeit(
"[unstable] local put, single channel calls",
lambda: put_channel_small(chans, do_release=True),
)
results += timeit(
"[unstable] local put:local get, single channel calls",
lambda: put_channel_small(chans, do_get=True, do_release=True),
@@ -126,12 +126,17 @@ def _exec(dag):
output_channel.begin_read()
output_channel.end_read()

def _exec_multi_output(dag):
output_channels = dag.execute(b"x")
for output_channel in output_channels:
output_channel.begin_read()
for output_channel in output_channels:
output_channel.end_read()
async def exec_async(tag):
async def _exec_async():
output_channel = await compiled_dag.execute_async(b"x")
# Using context manager.
async with output_channel as _:
pass

return await asyncio_timeit(
tag,
_exec_async,
)

a = DAGActor.remote()
with InputNode() as inp:
@@ -140,23 +145,49 @@ def _exec_multi_output(dag):
results += timeit(
"[unstable] single-actor DAG calls", lambda: ray.get(dag.execute(b"x"))
)
dag = dag.experimental_compile()
results += timeit("[unstable] compiled single-actor DAG calls", lambda: _exec(dag))
compiled_dag = dag.experimental_compile()
results += timeit(
"[unstable] compiled single-actor DAG calls", lambda: _exec(compiled_dag)
)
compiled_dag.teardown()

compiled_dag = dag.experimental_compile(enable_asyncio=True)
results += loop.run_until_complete(
exec_async(
"[unstable] compiled single-actor asyncio DAG calls",
)
)
# TODO: Need to explicitly tear down DAGs with enable_asyncio=True because
# these DAGs create a background thread that can segfault if the CoreWorker
# is torn down first.
compiled_dag.teardown()

del a
n_cpu = multiprocessing.cpu_count() // 2
actors = [DAGActor.remote() for _ in range(n_cpu)]
with InputNode() as inp:
dag = MultiOutputNode([a.echo.bind(inp) for a in actors])
results += timeit(
"[unstable] scatter-gather DAG calls, n={n_cpu} actors",
f"[unstable] scatter-gather DAG calls, n={n_cpu} actors",
lambda: ray.get(dag.execute(b"x")),
)
dag = dag.experimental_compile()
compiled_dag = dag.experimental_compile()
results += timeit(
f"[unstable] compiled scatter-gather DAG calls, n={n_cpu} actors",
lambda: _exec_multi_output(dag),
lambda: _exec(compiled_dag),
)
compiled_dag.teardown()

compiled_dag = dag.experimental_compile(enable_asyncio=True)
results += loop.run_until_complete(
exec_async(
f"[unstable] compiled scatter-gather asyncio DAG calls, n={n_cpu} actors",
)
)
# TODO: Need to explicitly tear down DAGs with enable_asyncio=True because
# these DAGs create a background thread that can segfault if the CoreWorker
# is torn down first.
compiled_dag.teardown()

actors = [DAGActor.remote() for _ in range(n_cpu)]
with InputNode() as inp:
@@ -167,16 +198,23 @@ def _exec_multi_output(dag):
f"[unstable] chain DAG calls, n={n_cpu} actors",
lambda: ray.get(dag.execute(b"x")),
)
dag = dag.experimental_compile()
compiled_dag = dag.experimental_compile()
results += timeit(
f"[unstable] compiled chain DAG calls, n={n_cpu} actors", lambda: _exec(dag)
f"[unstable] compiled chain DAG calls, n={n_cpu} actors",
lambda: _exec(compiled_dag),
)
compiled_dag.teardown()

ray.shutdown()
compiled_dag = dag.experimental_compile(enable_asyncio=True)
results += loop.run_until_complete(
exec_async(f"[unstable] compiled chain asyncio DAG calls, n={n_cpu} actors")
)
# TODO: Need to explicitly tear down DAGs with enable_asyncio=True because
# these DAGs create a background thread that can segfault if the CoreWorker
# is torn down first.
compiled_dag.teardown()

############################
# End of channel perf tests.
############################
ray.shutdown()

return results

35 changes: 35 additions & 0 deletions python/ray/_private/ray_microbenchmark_helpers.py
@@ -47,6 +47,41 @@ def timeit(
return [(name, mean, sd)]


async def asyncio_timeit(
name, async_fn, multiplier=1, warmup_time_sec=10
) -> List[Optional[Tuple[str, float, float]]]:
if filter_pattern and filter_pattern not in name:
return [None]
if skip_pattern and skip_pattern in name:
return [None]
# sleep for a while to avoid noisy neighbors.
# related issue: https://github.com/ray-project/ray/issues/22045
time.sleep(warmup_time_sec)
# warmup
start = time.perf_counter()
count = 0
while time.perf_counter() - start < 1:
await async_fn()
count += 1
# real run
step = count // 10 + 1
stats = []
for _ in range(4):
start = time.perf_counter()
count = 0
while time.perf_counter() - start < 2:
for _ in range(step):
await async_fn()
count += step
end = time.perf_counter()
stats.append(multiplier * count / (end - start))

mean = np.mean(stats)
sd = np.std(stats)
print(name, "per second", round(mean, 2), "+-", round(sd, 2))
return [(name, mean, sd)]


@contextmanager
def ray_setup_and_teardown(**init_args):
ray.init(**init_args)
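
For illustration, a hedged usage sketch of the new asyncio_timeit helper. It is
not part of the diff; the no-op coroutine and the shortened warmup are
assumptions made for the example:

    import asyncio

    from ray._private.ray_microbenchmark_helpers import asyncio_timeit

    async def _noop():
        # Stand-in for a real async benchmark body, e.g. an execute_async() call.
        await asyncio.sleep(0)

    # Unlike timeit, asyncio_timeit is a coroutine and must be driven by an event
    # loop; warmup_time_sec=0 skips the noisy-neighbor sleep for this demo.
    stats = asyncio.new_event_loop().run_until_complete(
        asyncio_timeit("[unstable] noop asyncio calls", _noop, warmup_time_sec=0)
    )
    print(stats)  # [(name, mean_calls_per_second, std)] or [None] if filtered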
2 changes: 2 additions & 0 deletions python/ray/dag/compiled_dag_node.py
@@ -499,6 +499,8 @@ def teardown(self):
self.in_teardown = True
for actor in outer.actor_refs:
logger.info(f"Cancelling compiled worker on actor: {actor}")
# TODO(swang): Suppress exceptions from actors trying to
# read closed channels when DAG is being torn down.
try:
ray.get(actor.__ray_call__.remote(do_cancel_compiled_task))
except Exception:
