[workflow] Deprecate workflow.create #26106

Merged
merged 4 commits into from
Jul 3, 2022
Changes from 2 commits
94 changes: 64 additions & 30 deletions doc/source/workflows/basics.rst
@@ -43,22 +43,21 @@ The Ray DAG will not be executed until further actions are taken on it.
Your first workflow
-------------------

-A single line is all you need to turn the previous DAG into a workflow:
+A single line is all you need to run the workflow DAG:

.. code-block:: python

# <follow the previous code>
from ray import workflow

-# Create the workflow from the DAG.
-wf = workflow.create(output)
 # Execute the workflow and print the result.
-print(wf.run())
+print(workflow.run(output))

-# The workflow can also be executed asynchronously.
-# print(ray.get(output.run_async()))
+# You can also run the workflow asynchronously and fetch the output via 'ray.get'.
+output_ref = workflow.run_async(output)
+print(ray.get(output_ref))

-Here is the workflow we created:
+This figure visualizes the workflow we created:

.. image:: basic.png
:width: 500px
@@ -115,7 +114,7 @@ To retrieve a workflow result, you can assign ``workflow_id`` when running a workflow:

ret = add.bind(get_val.bind(), 20)

-assert workflow.create(ret).run(workflow_id="add_example") == 30
+assert workflow.run(ret, workflow_id="add_example") == 30

Then workflow results can be retrieved with ``workflow.get_output(workflow_id) -> ObjectRef[T]``.
If a workflow is not given a ``workflow_id``, a random string is generated and used as the ``workflow_id``. To look up the ``workflow_id`` in that case, call ``ray.workflow.list_all()``.
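
For instance, the result of the ``add_example`` run above can be fetched later by its id. A minimal sketch, assuming the workflow above has already completed:

.. code-block:: python

    import ray
    from ray import workflow

    # Fetch the stored result of the earlier run by its workflow id.
    result_ref = workflow.get_output("add_example")
    assert ray.get(result_ref) == 30

    # Workflows started without an explicit id can be discovered here;
    # list_all() returns (workflow_id, status) pairs.
    print(workflow.list_all())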
@@ -153,14 +152,14 @@ Once a task is given a name, the result of the task will be retrievable via ``workflow.get_output(workflow_id, name=...)``.

inner_task = double.options(**workflow.options(name="inner")).bind(1)
outer_task = double.options(**workflow.options(name="outer")).bind(inner_task)
-result = workflow.create(outer_task).run_async("double")
+result_ref = workflow.run_async(outer_task, workflow_id="double")

inner = workflow.get_output(workflow_id, name="inner")
outer = workflow.get_output(workflow_id, name="outer")

assert ray.get(inner) == 2
assert ray.get(outer) == 4
-assert ray.get(result) == 4
+assert ray.get(result_ref) == 4

If there are multiple tasks with the same name, a suffix with a counter ``_n`` will be added automatically.

@@ -193,7 +192,7 @@ For example,
for i in range(1, n):
x = simple.options(**workflow.options(name="step")).bind(x)

-ret = workflow.create(x).run_async(workflow_id=workflow_id)
+ret = workflow.run_async(x, workflow_id=workflow_id)
outputs = [workflow.get_output(workflow_id, name="step")]
for i in range(1, n):
outputs.append(workflow.get_output(workflow_id, name=f"step_{i}"))
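
Because each repeated ``step`` task is stored under a counter suffix (``step``, ``step_1``, ..., ``step_{n-1}``), ``outputs`` holds one reference per iteration; a small sketch of resolving them:

.. code-block:: python

    # Each entry of 'outputs' is an ObjectRef returned by get_output.
    print(ray.get(outputs))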
@@ -229,7 +228,7 @@ The following error handling flags can be either set in the task decorator or via ``.options()``:

# Tries up to five times before giving up.
r1 = faulty_function.options(**workflow.options(max_retries=5)).bind()
-workflow.create(r1).run()
+workflow.run(r1)

@ray.remote
def handle_errors(result: Tuple[str, Exception]):
@@ -242,7 +241,7 @@ The following error handling flags can be either set in the task decorator or via ``.options()``:

# `handle_errors` receives a tuple of (result, exception).
r2 = faulty_function.options(**workflow.options(catch_exceptions=True)).bind()
-workflow.create(handle_errors.bind(r2)).run()
+workflow.run(handle_errors.bind(r2))

- If ``max_retries`` is given, the task will be retried up to that many times when it raises an exception. This applies only to application-level errors; retries for system errors are controlled by Ray. By default, ``max_retries`` is 3.
- If ``catch_exceptions`` is True, the return value of the function will be converted to ``Tuple[Optional[T], Optional[Exception]]``. This can be combined with ``max_retries`` to try a given number of times before returning the result tuple, as sketched below.
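
As a sketch of what ``catch_exceptions`` does to the return type, the result tuple can also be unpacked directly, without a separate handler task (reusing the ``faulty_function`` from above):

.. code-block:: python

    # With catch_exceptions=True, the workflow returns (result, exception)
    # instead of raising; exactly one of the two is None.
    r3 = faulty_function.options(**workflow.options(catch_exceptions=True)).bind()
    result, err = workflow.run(r3)
    if err is None:
        print("succeeded:", result)
    else:
        print("failed with:", err)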
@@ -278,7 +277,7 @@ Note that tasks that have side-effects still need to be idempotent. This is because…
return ticket

# UNSAFE: we could book multiple flight tickets
-workflow.create(book_flight_unsafe.bind()).run()
+workflow.run(book_flight_unsafe.bind())

.. code-block:: python
:caption: Idempotent workflow:
@@ -297,15 +296,50 @@ Note that tasks that have side-effects still need to be idempotent. This is because…

# SAFE: book_flight is written to be idempotent
request_id = generate_id.bind()
-workflow.create(book_flight_idempotent.bind(request_id)).run()
+workflow.run(book_flight_idempotent.bind(request_id))

Dynamic workflows
-----------------

Additional tasks can be dynamically created and inserted into the workflow DAG during execution.

This is achieved by returning a continuation of a DAG.
A continuation is a computation returned by a function that runs after the function returns.

In our context, a continuation is basically a tail function call returned by a function.

Review thread:

> Contributor: Maybe also add a sentence about how the continuation behaves in the Ray core engine.
>
> Member (author): Hmm, I do not want to mention how a DAG is executed by default in the Ray core engine. I feel this complicates the topic.

For example:

.. code-block:: python

def bar(): ...

def foo_1():
    # Here we say 'foo_1()' returns a continuation:
    # the continuation is the tail call 'bar()'.
    return bar()

def foo_2():
    # This is NOT a continuation because we do not return it.
    bar()

def foo_3():
    # This is NOT a continuation because it is not a function call.
    return 42

Continuations can be used to implement something more complex, for example, recursion:

.. code-block:: python

def factorial(n: int) -> int:
    if n == 1:
        return 1
    else:
        # The tail call 'multiply(...)' is the continuation of 'factorial'.
        return multiply(n, factorial(n - 1))

def multiply(a: int, b: int) -> int:
    return a * b

The continuation feature enables nesting, looping, and recursion within workflows.

The following example shows how to implement the recursive ``factorial`` program using dynamically generated tasks:
@@ -323,10 +357,11 @@ The following example shows how to implement the recursive ``factorial`` program…
def multiply(a: int, b: int) -> int:
return a * b

-ret = workflow.create(factorial.bind(10))
-assert ret.run() == 3628800
+assert workflow.run(factorial.bind(10)) == 3628800

-The key behavior to note is that when a task returns a ``Workflow`` output instead of a concrete value, that workflow's output will be substituted for the task's return.
+The key behavior to note is that when a task returns a continuation instead of a concrete value, that continuation will be substituted for the task's return.
To better understand dynamic workflows, let's look at a more realistic example of booking a trip:

.. code-block:: python

@@ -342,18 +377,16 @@ The key behavior to note is that when a task returns a ``Workflow`` output instead…
hotels: List[Hotel]) -> Receipt: ...

@ray.remote
-def book_trip(origin: str, dest: str, dates) ->
-    "Workflow[Receipt]":
+def book_trip(origin: str, dest: str, dates) -> Receipt:
# Note that the workflow engine will not begin executing
# child workflows until the parent task returns.
# This avoids task overlap and ensures recoverability.
-f1: Workflow = book_flight.bind(origin, dest, dates[0])
-f2: Workflow = book_flight.bind(dest, origin, dates[1])
-hotel: Workflow = book_hotel.bind(dest, dates)
+f1 = book_flight.bind(origin, dest, dates[0])
+f2 = book_flight.bind(dest, origin, dates[1])
+hotel = book_hotel.bind(dest, dates)
return workflow.continuation(finalize_or_cancel.bind([f1, f2], [hotel]))

-fut = workflow.create(book_trip.bind("OAK", "SAN", ["6/12", "7/5"]))
-fut.run() # returns Receipt(...)
+receipt: Receipt = workflow.run(book_trip.bind("OAK", "SAN", ["6/12", "7/5"]))

Here the workflow initially just consists of the ``book_trip`` task. Once executed, ``book_trip`` generates tasks to book flights and hotels in parallel, which feed into a task that decides whether to cancel the trip or finalize it. The DAG can be visualized as follows (note the dynamically generated nested workflows within ``book_trip``):

@@ -379,7 +412,8 @@ Workflows are compatible with Ray tasks and actors. There are two methods of using them together:

Passing nested arguments
~~~~~~~~~~~~~~~~~~~~~~~~
-Like Ray tasks, when you pass a list of ``Workflow`` outputs to a task, the values are not resolved. But we ensure that all ancestors of a task are fully executed prior to the task starting:
+Like Ray tasks, when you pass a list of task outputs to a task, the values are not resolved.
+But we ensure that all ancestors of a task are fully executed prior to the task starting:

.. code-block:: python

@@ -395,7 +429,7 @@ Like Ray tasks, when you pass a list of ``Workflow`` outputs to a task, the values are not resolved…
return 10

ret = add.bind([get_val.bind() for _ in range(3)])
-assert workflow.create(ret).run() == 30
+assert workflow.run(ret) == 30

Passing object references between tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -412,7 +446,7 @@ Ray object references and data structures composed of them (e.g., ``ray.Dataset``)…
def add(a, b):
return do_add.remote(a, b)

-workflow.create(add.bind(ray.put(10), ray.put(20))).run() == 30
+assert workflow.run(add.bind(ray.put(10), ray.put(20))) == 30


Ray actor handles are not allowed to be passed between tasks.
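
A hypothetical sketch of that restriction (``Counter`` is an illustrative actor, not part of the docs):

.. code-block:: python

    import ray
    from ray import workflow

    @ray.remote
    class Counter:
        def __init__(self) -> None:
            self.count = 0

        def increment(self) -> int:
            self.count += 1
            return self.count

    @ray.remote
    def use_counter(counter) -> int:
        return ray.get(counter.increment.remote())

    counter = Counter.remote()
    # NOT allowed: the workflow engine rejects actor handles as task arguments.
    # workflow.run(use_counter.bind(counter))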
@@ -428,4 +462,4 @@ You can assign resources (e.g., CPUs, GPUs) to tasks via the same ``num_cpus``, ``num_gpus``…
def train_model() -> Model:
pass # This task is assigned a GPU by Ray.

-workflow.create(train_model.bind()).run()
+workflow.run(train_model.bind())
5 changes: 4 additions & 1 deletion doc/source/workflows/comparison.rst
@@ -7,7 +7,10 @@ Workflows is built on top of Ray, and offers a mostly consistent subset of its API.

``func.remote`` vs ``func.bind``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-With Ray tasks, ``func.remote`` will submit a remote task to run eagerly. In Ray workflows, ``func.bind`` is used to create a DAG, and the DAG is converted into a workflow. Execution of the workflow is deferred until ``.run(workflow_id="id")`` or ``.run_async(workflow_id="id")`` is called on the ``Workflow``. Specifying the workflow id allows for resuming of the workflow by its id in case of cluster failure.
+With Ray tasks, ``func.remote`` will submit a remote task to run eagerly.
+In Ray workflows, ``func.bind`` is used to create a DAG, and the DAG is converted into a workflow.
+Execution of the workflow is deferred until ``workflow.run(dag, workflow_id=...)`` or ``workflow.run_async(dag, workflow_id=...)`` is called on the DAG.
+Specifying the workflow id allows for resuming of the workflow by its id in case of cluster failure.
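
For instance, a minimal sketch of recovery by id, assuming ``workflow.resume`` returns a reference to the output the way ``run_async`` does:

.. code-block:: python

    import ray
    from ray import workflow

    # Start a workflow under a fixed id; its progress is durably logged.
    workflow.run_async(dag, workflow_id="my_dag_run")

    # After a failure, resume by the same id on any cluster that can
    # reach the same workflow storage.
    output_ref = workflow.resume("my_dag_run")
    print(ray.get(output_ref))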

Other Workflow Engines
----------------------
Expand Down
22 changes: 14 additions & 8 deletions doc/source/workflows/concepts.rst
@@ -61,18 +61,24 @@ Unlike Ray tasks, you are not allowed to call ``ray.get()`` or ``ray.wait()`` on…
Workflows
~~~~~~~~~

-It takes a single line of code to turn a DAG into a workflow DAG:
+It takes a single line of code to run a workflow DAG:

.. code-block:: python
-    :caption: Turning the DAG into a workflow DAG:
+    :caption: Run a workflow DAG:

from ray import workflow

-output: "Workflow[int]" = workflow.create(dag)
+# Run the workflow until it completes and returns the output.
+assert workflow.run(dag) == 101
+
+# Or you can run it asynchronously and fetch the output via 'ray.get'.
+output_ref = workflow.run_async(dag)
+assert ray.get(output_ref) == 101

-Execute the workflow DAG by ``<workflow>.run()`` or ``<workflow>.run_async()``. Once started, a workflow's execution is durably logged to storage. On system failure, workflows can be resumed on any Ray cluster with access to the storage.
-When executing the workflow DAG, remote functions are retried on failure, but once they finish successfully and the results are persisted by the workflow engine, they will never be run again.
+Once started, a workflow's execution is durably logged to storage. On system failure, workflows can be resumed on any Ray cluster with access to the storage.
+When executing the workflow DAG, workflow tasks are retried on failure, but once they finish successfully and the results are persisted by the workflow engine, they will never be run again.

.. code-block:: python
:caption: Run the workflow:
@@ -108,7 +114,7 @@ Large data objects can be stored in the Ray object store. References to these objects…
def concat(words: List[ray.ObjectRef]) -> str:
return " ".join([ray.get(w) for w in words])

-assert workflow.create(concat.bind(words.bind())).run() == "hello world"
+assert workflow.run(concat.bind(words.bind())) == "hello world"

Dynamic Workflows
~~~~~~~~~~~~~~~~~
@@ -130,7 +136,7 @@ The continuation feature enables nesting, looping, and recursion within workflows.
# return a continuation of a DAG
return workflow.continuation(add.bind(fib.bind(n - 1), fib.bind(n - 2)))

-assert workflow.create(fib.bind(10)).run() == 55
+assert workflow.run(fib.bind(10)) == 55


Events
@@ -151,4 +157,4 @@ Workflows can be efficiently triggered by timers or external events using the event system.
return args

 # If a task's arguments include events, the task won't be executed until all of the events have occurred.
-workflow.create(gather.bind(sleep_task, event_task, "hello world")).run()
+workflow.run(gather.bind(sleep_task, event_task, "hello world"))
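
The ``sleep_task`` and ``event_task`` in this snippet come from context hidden by the diff; a minimal sketch of how such event tasks are typically constructed (``MyEventListener`` is an assumed custom listener, see the events page):

.. code-block:: python

    from ray import workflow

    # An event that completes after 100 seconds.
    sleep_task = workflow.sleep(100)

    # An event that completes when the (assumed) custom listener fires.
    event_task = workflow.wait_for_event(MyEventListener)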
2 changes: 1 addition & 1 deletion doc/source/workflows/events.rst
@@ -34,7 +34,7 @@ Workflow events are a special type of workflow task. They "finish" when the event occurs.
return args

# Gather will run after 60 seconds, when both event1 and event2 are done.
-workflow.create(gather.bind(event1_task, event_2_task)).run()
+workflow.run(gather.bind(event1_task, event_2_task))


Custom event listeners
12 changes: 6 additions & 6 deletions doc/source/workflows/metadata.rst
@@ -19,7 +19,7 @@ For example:
def add(left: int, right: int) -> int:
return left + right

-workflow.create(add.bind(10, 20)).run("add_example")
+workflow.run(add.bind(10, 20), workflow_id="add_example")

workflow_metadata = workflow.get_metadata("add_example")
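
The returned metadata is a plain dictionary; a small sketch that only touches fields used elsewhere on this page (``status`` and ``user_metadata``):

.. code-block:: python

    # 'status' and 'user_metadata' are the fields exercised later on this page.
    print(workflow_metadata["status"])         # e.g. the run's current status
    print(workflow_metadata["user_metadata"])  # {} unless metadata was attached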

@@ -32,7 +32,7 @@ providing the task name:

.. code-block:: python

-workflow.create(add.options(**workflow.options(name="add_task")).bind(10, 20)).run("add_example_2")
+workflow.run(add.options(**workflow.options(name="add_task")).bind(10, 20), workflow_id="add_example_2")

task_metadata = workflow.get_metadata("add_example_2", name="add_task")

@@ -51,8 +51,8 @@ workflow or workflow task.

.. code-block:: python

-workflow.create(add.options(**workflow.options(name="add_task", metadata={"task_k": "task_v"})).bind(10, 20))\
-    .run("add_example_3", metadata={"workflow_k": "workflow_v"})
+workflow.run(add.options(**workflow.options(name="add_task", metadata={"task_k": "task_v"})).bind(10, 20),
+             workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"})

assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"}
assert workflow.get_metadata("add_example_3", name="add_task")["user_metadata"] == {"task_k": "task_v"}
@@ -92,7 +92,7 @@ is completed).
time.sleep(1000)
return 0

-workflow.create(simple.bind()).run_async(workflow_id)
+workflow.run_async(simple.bind(), workflow_id=workflow_id)

# make sure workflow task starts running
while not flag.exists():
@@ -126,7 +126,7 @@ be updated whenever a workflow is resumed.
return 0

with pytest.raises(ray.exceptions.RaySystemError):
-    workflow.create(simple.bind()).run(workflow_id)
+    workflow.run(simple.bind(), workflow_id=workflow_id)

workflow_metadata_failed = workflow.get_metadata(workflow_id)
assert workflow_metadata_failed["status"] == "FAILED"
7 changes: 7 additions & 0 deletions doc/source/workflows/package-ref.rst
@@ -1,6 +1,13 @@
Ray Workflows API
=================

Workflow Execution API
----------------------

.. autofunction:: ray.workflow.run
.. autofunction:: ray.workflow.run_async


Management API
--------------
.. autofunction:: ray.workflow.resume_all
20 changes: 11 additions & 9 deletions python/ray/workflow/__init__.py
@@ -1,17 +1,18 @@
 from ray.workflow.api import (
     init,
-    get_output,
-    get_status,
-    get_metadata,
+    run,
+    run_async,
+    continuation,
     resume,
+    resume_all,
     cancel,
     list_all,
-    resume_all,
-    wait_for_event,
-    sleep,
     delete,
-    create,
-    continuation,
+    get_output,
+    get_status,
+    get_metadata,
+    sleep,
+    wait_for_event,
     options,
 )
from ray.workflow.exceptions import (
@@ -23,6 +24,8 @@
from ray.workflow.event_listener import EventListener

__all__ = [
"run",
"run_async",
"resume",
"get_output",
"WorkflowError",
@@ -38,7 +41,6 @@
"sleep",
"EventListener",
"delete",
"create",
"continuation",
"options",
]