[workflow] Major refactoring - new async workflow executor (ray-project#25618)

* major workflow refactoring
suquark committed Jun 30, 2022
1 parent 636a9c1 commit ddd63ab
Showing 41 changed files with 1,992 additions and 2,915 deletions.
77 changes: 0 additions & 77 deletions doc/source/workflows/advanced.rst
@@ -1,83 +1,6 @@
Advanced Topics
===============

Inplace Execution
-----------------

When executing a workflow task inside another workflow task, it is usually executed in another Ray worker process. This is good for resource and performance isolation, but at the cost of lower efficiency due to non-locality, scheduling and data transfer.

For example, the recursive workflow below computes ``k * 2 ** n``. Writing it as a workflow lets us recover from any task, but scheduling each task in a different worker is inefficient.

.. code-block:: python
    :caption: Workflow without inplace execution:

    import ray
    from ray import workflow

    @ray.remote
    def exp_remote(k, n):
        if n == 0:
            return k
        return workflow.continuation(exp_remote.bind(2 * k, n - 1))

We could optimize it with the ``allow_inplace`` option:

.. code-block:: python
    :caption: Workflow with inplace execution:

    import ray
    from ray import workflow

    @ray.remote
    def exp_inplace(k, n):
        if n == 0:
            return k
        return workflow.continuation(
            exp_inplace.options(**workflow.options(allow_inplace=True)).bind(2 * k, n - 1))

    assert workflow.create(exp_inplace.bind(3, 7)).run() == 3 * 2 ** 7

With ``allow_inplace=True``, the task created by ``.bind()`` executes in place, inside the process of the calling task, instead of in a separate worker. Ray options are ignored because they apply only to remote execution. Also, you cannot retrieve the output of an inplace task using ``workflow.get_output()`` before it finishes execution.

Inplace is also useful when you need to pass something that is only valid in the current process/physical machine to another task. For example:

.. code-block:: python

    # ``bar`` is another ``@ray.remote`` task defined elsewhere.
    @ray.remote
    def foo():
        x = "<something that is only valid in the current process>"
        return workflow.continuation(bar.options(**workflow.options(allow_inplace=True)).bind(x))

Wait for Partial Results
------------------------

By default, a workflow task will only execute after the completion of all of its dependencies. This blocking behavior prevents certain types of workflows from being expressed (e.g., wait for two of the three tasks to finish).

Analogous to ``ray.wait()``, Ray Workflow provides ``workflow.wait(*tasks: List[Workflow[T]], num_returns: int = 1, timeout: float = None) -> (List[T], List[Workflow[T]])``. Calling ``workflow.wait`` generates a logical task. Its output is a tuple of the workflow results that are ready and the workflows whose results have not yet been computed. For example, you can use it to print out workflow results as they are computed in the following dynamic workflow:

.. code-block:: python

    import random
    import time
    from typing import List, Tuple

    import ray
    from ray import workflow
    from ray.workflow.common import Workflow

    @ray.remote
    def do_task(i):
        time.sleep(random.random())
        return "task {}".format(i)

    @ray.remote
    def report_results(wait_result: Tuple[List[str], List[Workflow[str]]]):
        ready, remaining = wait_result
        for result in ready:
            print("Completed", result)
        if not remaining:
            return "All done"
        else:
            # Wait again on whatever has not finished yet.
            return workflow.continuation(report_results.bind(workflow.wait(remaining)))

    tasks = [do_task.bind(i) for i in range(100)]
    workflow.create(report_results.bind(workflow.wait(tasks))).run()

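The ready/remaining split described for ``workflow.wait`` can be sketched without Ray at all. The block below is only an analogy built on the standard library's ``concurrent.futures.wait``; it is not the Ray Workflow API, and the helper name ``report_results`` and the short sleep times are assumptions made for illustration.

```python
import concurrent.futures as cf
import random
import time


def do_task(i):
    # Simulate work of varying duration.
    time.sleep(random.random() * 0.05)
    return f"task {i}"


def report_results(pending):
    """Collect results as they become ready, waiting again on the rest."""
    completed = []
    while pending:
        # Split the pending futures into those that are done and those that are not.
        ready, pending = cf.wait(pending, return_when=cf.FIRST_COMPLETED)
        for fut in ready:
            completed.append(fut.result())
    return completed


with cf.ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(do_task, i) for i in range(10)]
    results = report_results(futures)

print(len(results), "tasks completed")
```

The loop plays the role of the recursive continuation in the workflow version: each pass handles whatever is ready and waits again on the remainder.
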
Workflow task Checkpointing
---------------------------

54 changes: 25 additions & 29 deletions doc/source/workflows/basics.rst
@@ -117,7 +117,7 @@ To retrieve a workflow result, you can assign ``workflow_id`` when running a wor
assert workflow.create(ret).run(workflow_id="add_example") == 30
Then workflow results can be retrieved with ``workflow.get_output(workflow_id) -> ObjectRef[T]``.
Then workflow results can be retrieved with ``workflow.get_output(workflow_id) -> ObjectRef[T]``.
If a workflow is not given a ``workflow_id``, a random string is used as the ``workflow_id``. To look up the ``workflow_id`` in that case, call ``ray.workflow.list_all()``.

If the workflow has not yet completed, calling ``ray.get()`` on the returned reference will block until the result is computed. For example:
@@ -137,13 +137,13 @@ Once a task is given a name, the result of the task will be retrievable via ``wo

.. code-block:: python
# TODO(suquark): Fix this example
import ray
from ray import workflow
workflow_id = "double"
try:
# cleanup previous workflows
workflow.delete("double")
workflow.delete(workflow_id)
except Exception:
pass
@@ -155,60 +155,56 @@ Once a task is given a name, the result of the task will be retrievable via ``wo
outer_task = double.options(**workflow.options(name="outer")).bind(inner_task)
result = workflow.create(outer_task).run_async("double")
inner = workflow.get_output("double", name="inner")
outer = workflow.get_output("double", name="outer")
inner = workflow.get_output(workflow_id, name="inner")
outer = workflow.get_output(workflow_id, name="outer")
assert ray.get(inner) == 2
assert ray.get(outer) == 4
assert ray.get(result) == 4
If there are multiple tasks with the same name, a suffix with a counter ``_n`` will be added automatically.
If there are multiple tasks with the same name, a suffix with a counter ``_n`` will be added automatically.

The suffix with a counter ``_n`` is a sequential number (1,2,3,...) of the tasks to be executed.
(Note that the first task does not have the suffix.)

# TODO(suquark): make sure Ray DAG does not depend on Ray Serve and PyArrow.

For example,
(Note that before trying the following, install Ray Serve and PyArrow ``pip install "ray[serve]" pyarrow``.)

.. code-block:: python
# TODO(suquark): Fix this example
import ray
from ray import workflow
workflow_id = "_test_task_id_generation"
try:
# cleanup previous workflows
workflow.delete("double")
workflow.delete(workflow_id)
except Exception:
pass
@workflow.options(name="double")
@ray.remote
def double(s):
return s * 2
inner_task = double.bind(1)
middle_task = double.bind(inner_task)
outer_task = double.bind(middle_task)
result = workflow.create(outer_task).run_async("double")
ray.workflow.list_all() == [('double', <WorkflowStatus.SUCCESSFUL: 'SUCCESSFUL'>)]
def simple(x):
return x + 1
outer = workflow.get_output("double", name="double")
inner = workflow.get_output("double", name="double_1")
middle = workflow.get_output("double", name="double_2")
x = simple.options(**workflow.options(name="step")).bind(-1)
n = 20
for i in range(1, n):
x = simple.options(**workflow.options(name="step")).bind(x)
assert ray.get(outer) == 8
assert ray.get(inner) == 4
assert ray.get(middle) == 2
assert result == 8
ret = workflow.create(x).run_async(workflow_id=workflow_id)
outputs = [workflow.get_output(workflow_id, name="step")]
for i in range(1, n):
outputs.append(workflow.get_output(workflow_id, name=f"step_{i}"))
assert ray.get(ret) == n - 1
assert ray.get(outputs) == list(range(n))
By default, each task will be given a name generated by the library, ``<WORKFLOW_ID>.<MODULE_NAME>.<FUNC_NAME>``.
ray.workflow.list_all() == [(workflow_id, workflow.WorkflowStatus.SUCCESSFUL)]
In the above sample, since the tasks are not given ``task_name``, the function name ``double`` is set as the ``task_name``.
In addition, there are multiple tasks with ``double`` as names.
They are executed in the order ``outer_task`` -> ``middle_task`` -> ``inner_task``, so the name "double" is added to ``outer_task``, "double_1" to ``middle_task`` and "double_2" to ``inner_task``.
By default, each task will be given a name generated by the library, ``<MODULE_NAME>.<FUNC_NAME>``. In the example above, ``step`` is given as the task name for function ``simple``.

When task names are duplicated, we append ``_n`` to the name, in order of execution, to form the task ID. So the first task gets the ID ``step``, the second gets ``step_1``, and so on for all later tasks.
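The naming rule can be illustrated with a small standalone sketch. This is not the library's implementation; the function name ``assign_task_ids`` is hypothetical, and the code only mirrors the rule as stated: the first occurrence keeps the bare name and later occurrences receive ``_1``, ``_2``, and so on.

```python
from collections import defaultdict
from typing import Dict, List


def assign_task_ids(names: List[str]) -> List[str]:
    """Return unique task IDs for task names given in order of execution."""
    seen: Dict[str, int] = defaultdict(int)
    task_ids = []
    for name in names:
        count = seen[name]
        # The first occurrence keeps the bare name; later ones get a counter suffix.
        task_ids.append(name if count == 0 else f"{name}_{count}")
        seen[name] += 1
    return task_ids


print(assign_task_ids(["step", "step", "step", "other"]))
# prints ['step', 'step_1', 'step_2', 'other']
```
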

Error handling
--------------
5 changes: 0 additions & 5 deletions doc/source/workflows/package-ref.rst
@@ -1,11 +1,6 @@
Ray Workflows API
=================

Core API
---------
.. autoclass:: ray.workflow.common.Workflow
    :members:

Management API
--------------
.. autofunction:: ray.workflow.resume_all
12 changes: 7 additions & 5 deletions python/ray/workflow/__init__.py
@@ -1,5 +1,4 @@
from ray.workflow.api import (
step,
init,
get_output,
get_status,
@@ -11,20 +10,24 @@
wait_for_event,
sleep,
delete,
wait,
create,
continuation,
options,
)
from ray.workflow.workflow_access import WorkflowExecutionError
from ray.workflow.exceptions import (
WorkflowError,
WorkflowExecutionError,
WorkflowCancellationError,
)
from ray.workflow.common import WorkflowStatus
from ray.workflow.event_listener import EventListener

__all__ = [
"step",
"resume",
"get_output",
"WorkflowError",
"WorkflowExecutionError",
"WorkflowCancellationError",
"resume_all",
"cancel",
"get_status",
@@ -35,7 +38,6 @@
"sleep",
"EventListener",
"delete",
"wait",
"create",
"continuation",
"options",