Skip to content

Commit

Permalink
[2/N] Streaming Generator. Support core worker APIs + cython generato…
Browse files Browse the repository at this point in the history
…r interface. (#35324) (#35682)

This is the second PR to support streaming generator.

The detailed design and API proposal can be found from https://docs.google.com/document/d/1hAASLe2sCoay23raqxqwJdSDiJWNMcNhlTwWJXsJOU4/edit#heading=h.w91y1fgnpu0m.
The Execution plan can be found from https://docs.google.com/document/d/1hAASLe2sCoay23raqxqwJdSDiJWNMcNhlTwWJXsJOU4/edit#heading=h.kxktymq5ihf7.
There will be 4 PRs to enable streaming generator for Ray Serve (phase 1).

 This PR -> introduce cpp interfaces to handle intermediate task return [1/N] Streaming Generator. Cpp interfaces and implementation #35291
 Support core worker APIs + cython generator interface. [2/N] Streaming Generator. Support core worker APIs + cython generator interface. #35324 < --- this PR
 E2e integration [3/N] Streaming Generator. E2e integration #35325 (review)
 Support async actors
This PR implements the Cython generator interface that users can use to obtain a next available object reference.
---------

Signed-off-by: SangBin Cho <[email protected]>
  • Loading branch information
rkooo567 committed May 24, 2023
1 parent 5c5fdfd commit 819c6f1
Show file tree
Hide file tree
Showing 8 changed files with 379 additions and 1 deletion.
172 changes: 172 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,149 @@ class ObjectRefGenerator:
return len(self._refs)


class ObjectRefStreamEoFError(RayError):
pass


class StreamingObjectRefGenerator:
def __init__(self, generator_ref: ObjectRef, worker: "Worker"):
# The reference to a generator task.
self._generator_ref = generator_ref
# The last time generator task has completed.
self._generator_task_completed_time = None
# The exception raised from a generator task.
self._generator_task_exception = None
# Ray's worker class. ray._private.worker.global_worker
self.worker = worker
assert hasattr(worker, "core_worker")
self.worker.core_worker.create_object_ref_stream(self._generator_ref)

def __iter__(self) -> "StreamingObjectRefGenerator":
return self

def __next__(self) -> ObjectRef:
"""Waits until a next ref is available and returns the object ref.

Raises StopIteration if there's no more objects
to generate.

The object ref will contain an exception if the task fails.
When the generator task returns N objects, it can return
up to N + 1 objects (if there's a system failure, the
last object will contain a system level exception).
"""
return self._next()

def _next(
self,
timeout_s: float = -1,
sleep_interval_s: float = 0.0001,
unexpected_network_failure_timeout_s: float = 30) -> ObjectRef:
"""Waits for timeout_s and returns the object ref if available.

If an object is not available within the given timeout, it
returns a nil object reference.

If -1 timeout is provided, it means it waits infinitely.

Waiting is implemented as busy waiting. You can control
the busy waiting interval via sleep_interval_s.

Raises StopIteration if there's no more objects
to generate.

The object ref will contain an exception if the task fails.
When the generator task returns N objects, it can return
up to N + 1 objects (if there's a system failure, the
last object will contain a system level exception).

Args:
timeout_s: If the next object is not ready within
this timeout, it returns the nil object ref.
sleep_interval_s: busy waiting interval.
unexpected_network_failure_timeout_s: If the
task is finished, but the next ref is not
available within this time, it will hard fail
the generator.
"""
obj = self._handle_next()
last_time = time.time()

# The generator ref will be None if the task succeeds.
# It will contain an exception if the task fails by
# a system error.
while obj.is_nil():
if self._generator_task_exception:
# The generator task has failed already.
# We raise StopIteration
# to conform the next interface in Python.
raise StopIteration from None
else:
# Otherwise, we should ray.get on the generator
# ref to find if the task has a system failure.
# Return the generator ref that contains the system
# error as soon as possible.
r, _ = ray.wait([self._generator_ref], timeout=0)
if len(r) > 0:
try:
ray.get(r)
except Exception as e:
# If it has failed, return the generator task ref
# so that the ref will raise an exception.
self._generator_task_exception = e
return self._generator_ref
finally:
if self._generator_task_completed_time is None:
self._generator_task_completed_time = time.time()

# Currently, since the ordering of intermediate result report
# is not guaranteed, it is possible that althoug the task
# has succeeded, all of the object references are not reported
# (e.g., when there are network failures).
# If all the object refs are not reported to the generator
# within 30 seconds, we consider is as an unreconverable error.
if self._generator_task_completed_time:
if (time.time() - self._generator_task_completed_time
> unexpected_network_failure_timeout_s):
# It means the next wasn't reported although the task
# has been terminated 30 seconds ago.
self._generator_task_exception = AssertionError
assert False, "Unexpected network failure occured."

if timeout_s != -1 and time.time() - last_time > timeout_s:
return ObjectRef.nil()

# 100us busy waiting
time.sleep(sleep_interval_s)
obj = self._handle_next()
return obj

def _handle_next(self) -> ObjectRef:
try:
if hasattr(self.worker, "core_worker"):
obj = self.worker.core_worker.try_read_next_object_ref_stream(
self._generator_ref)
return obj
else:
raise ValueError(
"Cannot access the core worker. "
"Did you already shutdown Ray via ray.shutdown()?")
except ObjectRefStreamEoFError:
raise StopIteration from None

def __del__(self):
if hasattr(self.worker, "core_worker"):
# NOTE: This can be called multiple times
# because python doesn't guarantee __del__ is called
# only once.
self.worker.core_worker.delete_object_ref_stream(self._generator_ref)

def __getstate__(self):
raise TypeError(
"You cannot return or pass a generator to other task. "
"Serializing a StreamingObjectRefGenerator is not allowed.")


cdef int check_status(const CRayStatus& status) nogil except -1:
if status.ok():
return 0
Expand All @@ -209,6 +352,8 @@ cdef int check_status(const CRayStatus& status) nogil except -1:
raise ObjectStoreFullError(message)
elif status.IsOutOfDisk():
raise OutOfDiskError(message)
elif status.IsObjectRefStreamEoF():
raise ObjectRefStreamEoFError(message)
elif status.IsInterrupted():
raise KeyboardInterrupt()
elif status.IsTimedOut():
Expand Down Expand Up @@ -3139,6 +3284,33 @@ cdef class CoreWorker:
CCoreWorkerProcess.GetCoreWorker() \
.RecordTaskLogEnd(out_end_offset, err_end_offset)

def create_object_ref_stream(self, ObjectRef generator_id):
cdef:
CObjectID c_generator_id = generator_id.native()

CCoreWorkerProcess.GetCoreWorker().CreateObjectRefStream(c_generator_id)

def delete_object_ref_stream(self, ObjectRef generator_id):
cdef:
CObjectID c_generator_id = generator_id.native()

CCoreWorkerProcess.GetCoreWorker().DelObjectRefStream(c_generator_id)

def try_read_next_object_ref_stream(self, ObjectRef generator_id):
cdef:
CObjectID c_generator_id = generator_id.native()
CObjectReference c_object_ref

check_status(
CCoreWorkerProcess.GetCoreWorker().TryReadObjectRefStream(
c_generator_id, &c_object_ref))
return ObjectRef(
c_object_ref.object_id(),
c_object_ref.owner_address().SerializeAsString(),
"",
# Already added when the ref is updated.
skip_adding_local_ref=True)

cdef void async_callback(shared_ptr[CRayObject] obj,
CObjectID object_ref,
void *user_callback) with gil:
Expand Down
4 changes: 4 additions & 0 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
@staticmethod
CRayStatus NotFound()

@staticmethod
CRayStatus ObjectRefStreamEoF()

c_bool ok()
c_bool IsOutOfMemory()
c_bool IsKeyError()
Expand All @@ -118,6 +121,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
c_bool IsObjectUnknownOwner()
c_bool IsRpcError()
c_bool IsOutOfResource()
c_bool IsObjectRefStreamEoF()

c_string ToString()
c_string CodeAsString()
Expand Down
5 changes: 5 additions & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const CObjectID& return_id,
shared_ptr[CRayObject] *return_object,
const CObjectID& generator_id)
void DelObjectRefStream(const CObjectID &generator_id)
void CreateObjectRefStream(const CObjectID &generator_id)
CRayStatus TryReadObjectRefStream(
const CObjectID &generator_id,
CObjectReference *object_ref_out)
CObjectID AllocateDynamicReturnId()

CJobID GetCurrentJobId()
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ py_test_module_list(
"test_gcs_fault_tolerance.py",
"test_gcs_utils.py",
"test_generators.py",
"test_streaming_generator.py",
"test_metrics_agent.py",
"test_metrics_head.py",
"test_component_failures_2.py",
Expand Down
143 changes: 143 additions & 0 deletions python/ray/tests/test_streaming_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import pytest
import sys
import time

from unittest.mock import patch, Mock

import ray
from ray._raylet import StreamingObjectRefGenerator, ObjectRefStreamEoFError
from ray.cloudpickle import dumps
from ray.exceptions import WorkerCrashedError


class MockedWorker:
def __init__(self, mocked_core_worker):
self.core_worker = mocked_core_worker

def reset_core_worker(self):
"""Emulate the case ray.shutdown is called
and the core_worker instance is GC'ed.
"""
self.core_worker = None


@pytest.fixture
def mocked_worker():
mocked_core_worker = Mock()
mocked_core_worker.try_read_next_object_ref_stream.return_value = None
mocked_core_worker.delete_object_ref_stream.return_value = None
mocked_core_worker.create_object_ref_stream.return_value = None
worker = MockedWorker(mocked_core_worker)
yield worker


def test_streaming_object_ref_generator_basic_unit(mocked_worker):
"""
Verify the basic case:
create a generator -> read values -> nothing more to read -> delete.
"""
with patch("ray.wait") as mocked_ray_wait:
c = mocked_worker.core_worker
generator_ref = ray.ObjectRef.from_random()
generator = StreamingObjectRefGenerator(generator_ref, mocked_worker)
c.try_read_next_object_ref_stream.return_value = ray.ObjectRef.nil()
c.create_object_ref_stream.assert_called()

# Test when there's no new ref, it returns a nil.
mocked_ray_wait.return_value = [], [generator_ref]
ref = generator._next(timeout_s=0)
assert ref.is_nil()

# When the new ref is available, next should return it.
for _ in range(3):
new_ref = ray.ObjectRef.from_random()
c.try_read_next_object_ref_stream.return_value = new_ref
ref = generator._next(timeout_s=0)
assert new_ref == ref

# When try_read_next_object_ref_stream raises a
# ObjectRefStreamEoFError, it should raise a stop iteration.
c.try_read_next_object_ref_stream.side_effect = ObjectRefStreamEoFError(
""
) # noqa
with pytest.raises(StopIteration):
ref = generator._next(timeout_s=0)

# Make sure we cannot serialize the generator.
with pytest.raises(TypeError):
dumps(generator)

del generator
c.delete_object_ref_stream.assert_called()


def test_streaming_object_ref_generator_task_failed_unit(mocked_worker):
"""
Verify when a task is failed by a system error,
the generator ref is returned.
"""
with patch("ray.get") as mocked_ray_get:
with patch("ray.wait") as mocked_ray_wait:
c = mocked_worker.core_worker
generator_ref = ray.ObjectRef.from_random()
generator = StreamingObjectRefGenerator(generator_ref, mocked_worker)

# Simulate the worker failure happens.
mocked_ray_wait.return_value = [generator_ref], []
mocked_ray_get.side_effect = WorkerCrashedError()

c.try_read_next_object_ref_stream.return_value = ray.ObjectRef.nil()
ref = generator._next(timeout_s=0)
# If the generator task fails by a systsem error,
# meaning the ref will raise an exception
# it should be returned.
print(ref)
print(generator_ref)
assert ref == generator_ref

# Once exception is raised, it should always
# raise stopIteration regardless of what
# the ref contains now.
with pytest.raises(StopIteration):
ref = generator._next(timeout_s=0)


def test_streaming_object_ref_generator_network_failed_unit(mocked_worker):
"""
Verify when a task is finished, but if the next ref is not available
on time, it raises an assertion error.
TODO(sang): Once we move the task subimssion path to use pubsub
to guarantee the ordering, we don't need this test anymore.
"""
with patch("ray.get") as mocked_ray_get:
with patch("ray.wait") as mocked_ray_wait:
c = mocked_worker.core_worker
generator_ref = ray.ObjectRef.from_random()
generator = StreamingObjectRefGenerator(generator_ref, mocked_worker)

# Simulate the task has finished.
mocked_ray_wait.return_value = [generator_ref], []
mocked_ray_get.return_value = None

# If StopIteration is not raised within
# unexpected_network_failure_timeout_s second,
# it should fail.
c.try_read_next_object_ref_stream.return_value = ray.ObjectRef.nil()
ref = generator._next(timeout_s=0, unexpected_network_failure_timeout_s=1)
assert ref == ray.ObjectRef.nil()
time.sleep(1)
with pytest.raises(AssertionError):
generator._next(timeout_s=0, unexpected_network_failure_timeout_s=1)
# After that StopIteration should be raised.
with pytest.raises(StopIteration):
generator._next(timeout_s=0, unexpected_network_failure_timeout_s=1)


if __name__ == "__main__":
import os

if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
else:
sys.exit(pytest.main(["-sv", __file__]))
Loading

0 comments on commit 819c6f1

Please sign in to comment.