Skip to content

Commit

Permalink
[Doc] Streaming generator alpha doc (ray-project#39914)
Browse files Browse the repository at this point in the history
This is a PR to write a streaming generator documentation. Unlike existing generator doc, streaming generator doc is under "advanced" usage, and I will add proper links from each task/actor/pattern pages to this doc.

The doc is written with the assumption is is an "alpha" feature. It is important to merge this doc PR before proceeding with ray-project#38784 as it will need to refer docs here and there. Some of the contents (such as "API is subject to change") will be removed after we merge these PRs.
  • Loading branch information
rkooo567 authored Oct 16, 2023
1 parent 8d286f0 commit 306c714
Show file tree
Hide file tree
Showing 7 changed files with 442 additions and 7 deletions.
6 changes: 3 additions & 3 deletions doc/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@
# Special mocking of packaging.version.Version is required when using sphinx;
# we can't just add this to autodoc_mock_imports, as packaging is imported by
# sphinx even before it can be mocked. Instead, we patch it here.
import packaging
import packaging.version as packaging_version

Version = packaging.version.Version
Version = packaging_version.Version


class MockVersion(Version):
Expand All @@ -137,7 +137,7 @@ def __init__(self, version: str):
super().__init__("0")


packaging.version.Version = MockVersion
packaging_version.Version = MockVersion

# This is used to suppress warnings about explicit "toctree" directives.
suppress_warnings = ["etoc.toctree"]
Expand Down
5 changes: 5 additions & 0 deletions doc/source/ray-core/actors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,11 @@ If we instantiate an actor, we can pass the handle around to various tasks.
}
Generators
----------
Ray is compatible with Python generator syntax. See :ref:`Ray Generators <generators>` for more details.
Cancelling Actor Tasks
----------------------
Expand Down
1 change: 1 addition & 0 deletions doc/source/ray-core/advanced-topics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ This section covers extended topics on how to use Ray.

tips-for-first-time
starting-ray
ray-generator
namespaces
cross-language
using-ray-with-jupyter
Expand Down
209 changes: 209 additions & 0 deletions doc/source/ray-core/doc_code/streaming_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
# flake8: noqa

# fmt: off

# __streaming_generator_define_start__
import ray
import time

@ray.remote(num_returns="streaming")
def task():
for i in range(5):
time.sleep(5)
yield i

# __streaming_generator_define_end__

# __streaming_generator_execute_start__
gen = task.remote()
# Blocks for 5 seconds.
ref = next(gen)
# return 0
ray.get(ref)
# Blocks for 5 seconds.
ref = next(gen)
# Return 1
ray.get(ref)

# Returns 2~4 every 5 seconds.
for ref in gen:
print(ray.get(ref))

# __streaming_generator_execute_end__

# __streaming_generator_exception_start__
@ray.remote(num_returns="streaming")
def task():
for i in range(5):
time.sleep(1)
if i == 1:
raise ValueError
yield i

gen = task.remote()
# it's okay.
ray.get(next(gen))

# Raises an exception
try:
ray.get(next(gen))
except ValueError as e:
print(f"Exception is raised when i == 1 as expected {e}")

# __streaming_generator_exception_end__

# __streaming_generator_actor_model_start__
@ray.remote
class Actor:
def f(self):
for i in range(5):
yield i

@ray.remote
class AsyncActor:
async def f(self):
for i in range(5):
yield i

@ray.remote(num_concurrency=5)
class ThreadedActor:
def f(self):
for i in range(5):
yield i

actor = Actor.remote()
for ref in actor.f.options(num_returns="streaming").remote():
print(ray.get(ref))

actor = AsyncActor.remote()
for ref in actor.f.options(num_returns="streaming").remote():
print(ray.get(ref))

actor = ThreadedActor.remote()
for ref in actor.f.options(num_returns="streaming").remote():
print(ray.get(ref))

# __streaming_generator_actor_model_end__

# __streaming_generator_asyncio_start__
import asyncio

@ray.remote(num_returns="streaming")
def task():
for i in range(5):
time.sleep(1)
yield i


async def main():
async for ref in task.remote():
print(await ref)

asyncio.run(main())

# __streaming_generator_asyncio_end__

# __streaming_generator_gc_start__
@ray.remote(num_returns="streaming")
def task():
for i in range(5):
time.sleep(1)
yield i

gen = task.remote()
ref1 = next(gen)
del gen

# __streaming_generator_gc_end__

# __streaming_generator_concurrency_asyncio_start__
import asyncio

@ray.remote(num_returns="streaming")
def task():
for i in range(5):
time.sleep(1)
yield i


async def async_task():
async for ref in task.remote():
print(await ref)

async def main():
t1 = async_task()
t2 = async_task()
await asyncio.gather(t1, t2)

asyncio.run(main())
# __streaming_generator_concurrency_asyncio_end__

# __streaming_generator_wait_simple_start__
@ray.remote(num_returns="streaming")
def task():
for i in range(5):
time.sleep(5)
yield i

gen = task.remote()

# Because it takes 5 seconds to make the first yield,
# with 0 timeout, the generator is unready.
ready, unready = ray.wait([gen], timeout=0)
print("timeout 0, nothing is ready.")
print(ready)
assert len(ready) == 0
assert len(unready) == 1

# Without a timeout argument, ray.wait waits until the given argument
# is ready. When a next item is ready, it returns.
ready, unready = ray.wait([gen])
print("Wait for 5 seconds. The next item is ready.")
assert len(ready) == 1
assert len(unready) == 0
next(gen)

# Because the second yield hasn't happened yet,
ready, unready = ray.wait([gen], timeout=0)
print("Wait for 0 seconds. The next item is not ready.")
print(ready, unready)
assert len(ready) == 0
assert len(unready) == 1

# __streaming_generator_wait_simple_end__

# __streaming_generator_wait_complex_start__
from ray._raylet import StreamingObjectRefGenerator

@ray.remote(num_returns="streaming")
def generator_task():
for i in range(5):
time.sleep(5)
yield i

@ray.remote
def regular_task():
for i in range(5):
time.sleep(5)
return

gen = [generator_task.remote()]
ref = [regular_task.remote()]
ready, unready = [], [*gen, *ref]
result = []

while unready:
ready, unready = ray.wait(unready)
for r in ready:
if isinstance(r, StreamingObjectRefGenerator):
try:
ref = next(r)
result.append(ray.get(ref))
except StopIteration:
pass
else:
unready.append(r)
else:
result.append(ray.get(r))

# __streaming_generator_wait_complex_end__
Loading

0 comments on commit 306c714

Please sign in to comment.