From b87ba3d352c07f6e3e477ee85f4248ba4544660c Mon Sep 17 00:00:00 2001 From: Siyuan Zhuang Date: Sat, 25 Jun 2022 17:14:51 -0700 Subject: [PATCH 1/4] deprecate workflow.create --- python/ray/workflow/__init__.py | 20 +- python/ray/workflow/api.py | 172 +++++++++--------- .../comparisons/airflow/etl_workflow.py | 2 +- .../comparisons/argo/conditionals_workflow.py | 3 +- .../examples/comparisons/argo/dag_workflow.py | 2 +- .../comparisons/argo/exit_handler_workflow.py | 2 +- .../comparisons/argo/hello_world_workflow.py | 2 +- .../comparisons/argo/loops_workflow.py | 2 +- .../comparisons/argo/multi_step_workflow.py | 2 +- .../comparisons/argo/recursion_workflow.py | 2 +- .../comparisons/argo/retry_workflow.py | 9 +- .../cadence/file_processing_workflow.py | 2 +- .../cadence/sub_workflow_workflow.py | 3 +- .../cadence/trip_booking_workflow.py | 2 +- .../concat_array_workflow.py | 2 +- .../data_cond_workflow.py | 2 +- .../sub_workflows_workflow.py | 2 +- .../comparisons/metaflow/foreach_workflow.py | 2 +- .../prefect/compute_fib_workflow.py | 2 +- .../ray/workflow/examples/function_chain.py | 5 +- .../workflow/tests/test_basic_workflows.py | 76 ++++---- .../workflow/tests/test_basic_workflows_2.py | 52 +++--- .../workflow/tests/test_basic_workflows_3.py | 20 +- .../ray/workflow/tests/test_cancellation.py | 3 +- python/ray/workflow/tests/test_checkpoint.py | 21 ++- .../ray/workflow/tests/test_checkpoint_2.py | 15 +- .../workflow/tests/test_complex_workflow.py | 5 +- .../workflow/tests/test_dag_to_workflow.py | 22 ++- python/ray/workflow/tests/test_dataset.py | 6 +- .../tests/test_dynamic_workflow_ref.py | 11 +- python/ray/workflow/tests/test_events.py | 20 +- .../workflow/tests/test_large_intermediate.py | 2 +- python/ray/workflow/tests/test_lifetime.py | 2 +- python/ray/workflow/tests/test_metadata.py | 28 +-- .../ray/workflow/tests/test_object_deref.py | 14 +- python/ray/workflow/tests/test_recovery.py | 24 ++- .../ray/workflow/tests/test_serialization.py | 10 +- .../workflow/tests/test_signature_check.py | 16 +- python/ray/workflow/tests/test_storage.py | 8 +- .../workflow/tests/test_storage_failure.py | 2 +- .../workflow/tests/test_variable_mutable.py | 2 +- .../workflow/tests/test_workflow_manager.py | 3 +- 42 files changed, 299 insertions(+), 303 deletions(-) diff --git a/python/ray/workflow/__init__.py b/python/ray/workflow/__init__.py index 4e1a684678121..448ce2f881ae2 100644 --- a/python/ray/workflow/__init__.py +++ b/python/ray/workflow/__init__.py @@ -1,17 +1,18 @@ from ray.workflow.api import ( init, - get_output, - get_status, - get_metadata, + run, + run_async, + continuation, resume, + resume_all, cancel, list_all, - resume_all, - wait_for_event, - sleep, delete, - create, - continuation, + get_output, + get_status, + get_metadata, + sleep, + wait_for_event, options, ) from ray.workflow.exceptions import ( @@ -23,6 +24,8 @@ from ray.workflow.event_listener import EventListener __all__ = [ + "run", + "run_async", "resume", "get_output", "WorkflowError", @@ -38,7 +41,6 @@ "sleep", "EventListener", "delete", - "create", "continuation", "options", ] diff --git a/python/ray/workflow/api.py b/python/ray/workflow/api.py index 0faa9dfe4d512..c88fdde94eed3 100644 --- a/python/ray/workflow/api.py +++ b/python/ray/workflow/api.py @@ -58,6 +58,81 @@ def _ensure_workflow_initialized() -> None: init() +@PublicAPI(stability="beta") +def run( + dag_node: DAGNode, + *args, + workflow_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + **kwargs, +) -> Any: + """Run a workflow. + + If the workflow with the given id already exists, it will be resumed. + + Examples: + >>> import ray + >>> from ray import workflow + >>> Flight, Reservation, Trip = ... # doctest: +SKIP + >>> @ray.remote # doctest: +SKIP + ... def book_flight(origin: str, dest: str) -> Flight: # doctest: +SKIP + ... return Flight(...) # doctest: +SKIP + >>> @ray.remote # doctest: +SKIP + ... def book_hotel(location: str) -> Reservation: # doctest: +SKIP + ... return Reservation(...) # doctest: +SKIP + >>> @ray.remote # doctest: +SKIP + ... def finalize_trip(bookings: List[Any]) -> Trip: # doctest: +SKIP + ... return Trip(...) # doctest: +SKIP + + >>> flight1 = book_flight.bind("OAK", "SAN") # doctest: +SKIP + >>> flight2 = book_flight.bind("SAN", "OAK") # doctest: +SKIP + >>> hotel = book_hotel.bind("SAN") # doctest: +SKIP + >>> trip = finalize_trip.bind([flight1, flight2, hotel]) # doctest: +SKIP + >>> result = workflow.run(trip) # doctest: +SKIP + + Args: + workflow_id: A unique identifier that can be used to resume the + workflow. If not specified, a random id will be generated. + metadata: The metadata to add to the workflow. It has to be able + to serialize to json. + + Returns: + The running result. + """ + return ray.get( + run_async(dag_node, *args, workflow_id=workflow_id, metadata=metadata, **kwargs) + ) + + +@PublicAPI(stability="beta") +def run_async( + dag_node: DAGNode, + *args, + workflow_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + **kwargs, +) -> ray.ObjectRef: + """Run a workflow asynchronously. + + If the workflow with the given id already exists, it will be resumed. + + Args: + workflow_id: A unique identifier that can be used to resume the + workflow. If not specified, a random id will be generated. + metadata: The metadata to add to the workflow. It has to be able + to serialize to json. + + Returns: + The running result as ray.ObjectRef. + + """ + _ensure_workflow_initialized() + if not isinstance(dag_node, DAGNode): + raise TypeError("Input should be a DAG.") + input_data = DAGInputData(*args, **kwargs) + return execution.run(dag_node, input_data, workflow_id, metadata) + + @PublicAPI(stability="beta") def resume(workflow_id: str) -> ray.ObjectRef: """Resume a workflow. @@ -174,7 +249,7 @@ def resume_all(include_failed: bool = False) -> Dict[str, ray.ObjectRef]: This can be used after cluster restart to resume all tasks. Args: - with_failed: Whether to resume FAILED workflows. + include_failed: Whether to resume FAILED workflows. Examples: >>> from ray import workflow @@ -378,87 +453,6 @@ def delete(workflow_id: str) -> None: wf_storage.delete_workflow() -@PublicAPI(stability="beta") -def create(dag_node: "DAGNode", *args, **kwargs) -> "DAGNode": - """Converts a DAG into a workflow. - - TODO(suquark): deprecate this API. - - Examples: - >>> import ray - >>> from ray import workflow - >>> Flight, Reservation, Trip = ... # doctest: +SKIP - >>> @ray.remote # doctest: +SKIP - ... def book_flight(origin: str, dest: str) -> Flight: # doctest: +SKIP - ... return Flight(...) # doctest: +SKIP - >>> @ray.remote # doctest: +SKIP - ... def book_hotel(location: str) -> Reservation: # doctest: +SKIP - ... return Reservation(...) # doctest: +SKIP - >>> @ray.remote # doctest: +SKIP - ... def finalize_trip(bookings: List[Any]) -> Trip: # doctest: +SKIP - ... return Trip(...) # doctest: +SKIP - - >>> flight1 = book_flight.bind("OAK", "SAN") # doctest: +SKIP - >>> flight2 = book_flight.bind("SAN", "OAK") # doctest: +SKIP - >>> hotel = book_hotel.bind("SAN") # doctest: +SKIP - >>> trip = finalize_trip.bind([flight1, flight2, hotel]) # doctest: +SKIP - >>> result = workflow.create(trip).run() # doctest: +SKIP - - Args: - dag_node: The DAG to be converted. - args: Positional arguments of the DAG input node. - kwargs: Keyword arguments of the DAG input node. - """ - if not isinstance(dag_node, DAGNode): - raise TypeError("Input should be a DAG.") - input_data = DAGInputData(*args, **kwargs) - - def run_async( - workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None - ): - """Run a workflow asynchronously. - - If the workflow with the given id already exists, it will be resumed. - - Args: - workflow_id: A unique identifier that can be used to resume the - workflow. If not specified, a random id will be generated. - metadata: The metadata to add to the workflow. It has to be able - to serialize to json. - - Returns: - The running result as ray.ObjectRef. - - """ - _ensure_workflow_initialized() - return execution.run(dag_node, input_data, workflow_id, metadata) - - def run( - workflow_id: Optional[str] = None, - metadata: Optional[Dict[str, Any]] = None, - ) -> Any: - """Run a workflow. - - If the workflow with the given id already exists, it will be resumed. - - - - Args: - workflow_id: A unique identifier that can be used to resume the - workflow. If not specified, a random id will be generated. - metadata: The metadata to add to the workflow. It has to be able - to serialize to json. - - Returns: - The running result. - """ - return ray.get(run_async(workflow_id, metadata)) - - dag_node.run_async = run_async - dag_node.run = run - return dag_node - - @PublicAPI(stability="beta") def continuation(dag_node: "DAGNode") -> Union["DAGNode", Any]: """Converts a DAG into a continuation. @@ -476,7 +470,7 @@ def continuation(dag_node: "DAGNode") -> Union["DAGNode", Any]: raise TypeError("Input should be a DAG.") if in_workflow_execution(): - return create(dag_node) + return dag_node return ray.get(dag_node.execute()) @@ -535,11 +529,19 @@ def __call__(self, f: RemoteFunction) -> RemoteFunction: __all__ = ( + "init", + "run", + "run_async", + "continuation", "resume", - "get_output", "resume_all", + "cancel", + "list_all", + "delete", + "get_output", "get_status", "get_metadata", - "cancel", + "sleep", + "wait_for_event", "options", ) diff --git a/python/ray/workflow/examples/comparisons/airflow/etl_workflow.py b/python/ray/workflow/examples/comparisons/airflow/etl_workflow.py index 5c4168db6bde0..4e1c99e474216 100644 --- a/python/ray/workflow/examples/comparisons/airflow/etl_workflow.py +++ b/python/ray/workflow/examples/comparisons/airflow/etl_workflow.py @@ -29,4 +29,4 @@ def load(data_dict: dict) -> str: order_data = extract.bind() order_summary = transform.bind(order_data) etl = load.bind(order_summary) - print(workflow.create(etl).run()) + print(workflow.run(etl)) diff --git a/python/ray/workflow/examples/comparisons/argo/conditionals_workflow.py b/python/ray/workflow/examples/comparisons/argo/conditionals_workflow.py index f4d7833393c83..d53b414af85bf 100644 --- a/python/ray/workflow/examples/comparisons/argo/conditionals_workflow.py +++ b/python/ray/workflow/examples/comparisons/argo/conditionals_workflow.py @@ -26,5 +26,4 @@ def decide(heads: bool) -> str: if __name__ == "__main__": - workflow.init() - print(workflow.create(flip_coin.bind()).run()) + print(workflow.run(flip_coin.bind())) diff --git a/python/ray/workflow/examples/comparisons/argo/dag_workflow.py b/python/ray/workflow/examples/comparisons/argo/dag_workflow.py index d3c49f7b6f1a0..a8e28634c6c30 100644 --- a/python/ray/workflow/examples/comparisons/argo/dag_workflow.py +++ b/python/ray/workflow/examples/comparisons/argo/dag_workflow.py @@ -12,4 +12,4 @@ def echo(msg: str, *deps) -> None: B = echo.options(**workflow.options(name="B")).bind("B", A) C = echo.options(**workflow.options(name="C")).bind("C", A) D = echo.options(**workflow.options(name="D")).bind("D", A, B) - workflow.create(D).run() + workflow.run(D) diff --git a/python/ray/workflow/examples/comparisons/argo/exit_handler_workflow.py b/python/ray/workflow/examples/comparisons/argo/exit_handler_workflow.py index 847909d2044f7..f21b2bd3d71f4 100644 --- a/python/ray/workflow/examples/comparisons/argo/exit_handler_workflow.py +++ b/python/ray/workflow/examples/comparisons/argo/exit_handler_workflow.py @@ -42,4 +42,4 @@ def wait_all(*deps): if __name__ == "__main__": res = intentional_fail.options(**workflow.options(catch_exceptions=True)).bind() - print(workflow.create(exit_handler.bind(res)).run()) + print(workflow.run(exit_handler.bind(res))) diff --git a/python/ray/workflow/examples/comparisons/argo/hello_world_workflow.py b/python/ray/workflow/examples/comparisons/argo/hello_world_workflow.py index a40da6e95e343..9593a3a38a544 100644 --- a/python/ray/workflow/examples/comparisons/argo/hello_world_workflow.py +++ b/python/ray/workflow/examples/comparisons/argo/hello_world_workflow.py @@ -9,4 +9,4 @@ def hello(msg: str) -> None: if __name__ == "__main__": - workflow.create(hello.bind("hello world")).run() + workflow.run(hello.bind("hello world")) diff --git a/python/ray/workflow/examples/comparisons/argo/loops_workflow.py b/python/ray/workflow/examples/comparisons/argo/loops_workflow.py index 97171512f4d87..d68ace20a40db 100644 --- a/python/ray/workflow/examples/comparisons/argo/loops_workflow.py +++ b/python/ray/workflow/examples/comparisons/argo/loops_workflow.py @@ -16,4 +16,4 @@ def wait_all(*args) -> None: children = [] for msg in ["hello world", "goodbye world"]: children.append(hello.bind(msg)) - workflow.create(wait_all.bind(*children)).run() + workflow.run(wait_all.bind(*children)) diff --git a/python/ray/workflow/examples/comparisons/argo/multi_step_workflow.py b/python/ray/workflow/examples/comparisons/argo/multi_step_workflow.py index 3c8761188ae1d..b7d969a335e63 100644 --- a/python/ray/workflow/examples/comparisons/argo/multi_step_workflow.py +++ b/python/ray/workflow/examples/comparisons/argo/multi_step_workflow.py @@ -16,4 +16,4 @@ def wait_all(*args) -> None: h1 = hello.options(**workflow.options(name="hello1")).bind("hello1") h2a = hello.options(**workflow.options(name="hello2a")).bind("hello2a") h2b = hello.options(**workflow.options(name="hello2b")).bind("hello2b", h2a) - workflow.create(wait_all.bind(h1, h2b)).run() + workflow.run(wait_all.bind(h1, h2b)) diff --git a/python/ray/workflow/examples/comparisons/argo/recursion_workflow.py b/python/ray/workflow/examples/comparisons/argo/recursion_workflow.py index 67cc9da81c2ba..efb057588aa6c 100644 --- a/python/ray/workflow/examples/comparisons/argo/recursion_workflow.py +++ b/python/ray/workflow/examples/comparisons/argo/recursion_workflow.py @@ -28,4 +28,4 @@ def decide(heads: bool) -> str: if __name__ == "__main__": - print(workflow.create(flip_coin.bind()).run()) + print(workflow.run(flip_coin.bind())) diff --git a/python/ray/workflow/examples/comparisons/argo/retry_workflow.py b/python/ray/workflow/examples/comparisons/argo/retry_workflow.py index e6ea6dc80b99e..603d9d6e98570 100644 --- a/python/ray/workflow/examples/comparisons/argo/retry_workflow.py +++ b/python/ray/workflow/examples/comparisons/argo/retry_workflow.py @@ -37,12 +37,7 @@ def handle_result(res: Tuple[Optional[str], Optional[Exception]]) -> str: if __name__ == "__main__": - workflow.init() # Default retry strategy. - print( - workflow.create( - flaky_step.options(**workflow.options(max_retries=10)).bind() - ).run() - ) + print(workflow.run(flaky_step.options(**workflow.options(max_retries=10)).bind())) # Custom strategy. - print(workflow.create(custom_retry_strategy.bind(flaky_step, 10, 1)).run()) + print(workflow.run(custom_retry_strategy.bind(flaky_step, 10, 1))) diff --git a/python/ray/workflow/examples/comparisons/cadence/file_processing_workflow.py b/python/ray/workflow/examples/comparisons/cadence/file_processing_workflow.py index 3dea0c7327688..865b6ff0f2c42 100644 --- a/python/ray/workflow/examples/comparisons/cadence/file_processing_workflow.py +++ b/python/ray/workflow/examples/comparisons/cadence/file_processing_workflow.py @@ -58,4 +58,4 @@ def download_one(url: str) -> str: if __name__ == "__main__": res = download_all.bind(FILES_TO_PROCESS) - workflow.create(res).run() + workflow.run(res) diff --git a/python/ray/workflow/examples/comparisons/cadence/sub_workflow_workflow.py b/python/ray/workflow/examples/comparisons/cadence/sub_workflow_workflow.py index 3ecf9d2105c27..ca82911f77127 100644 --- a/python/ray/workflow/examples/comparisons/cadence/sub_workflow_workflow.py +++ b/python/ray/workflow/examples/comparisons/cadence/sub_workflow_workflow.py @@ -13,5 +13,4 @@ def main_workflow(name: str) -> str: if __name__ == "__main__": - wf = workflow.create(main_workflow.bind("Alice")) - print(wf.run()) + print(workflow.run(main_workflow.bind("Alice"))) diff --git a/python/ray/workflow/examples/comparisons/cadence/trip_booking_workflow.py b/python/ray/workflow/examples/comparisons/cadence/trip_booking_workflow.py index 3998a2b89d752..68624a28dae65 100644 --- a/python/ray/workflow/examples/comparisons/cadence/trip_booking_workflow.py +++ b/python/ray/workflow/examples/comparisons/cadence/trip_booking_workflow.py @@ -89,4 +89,4 @@ def cancel(request_id: str) -> None: final_result = handle_errors.bind( car_req_id, hotel_req_id, flight_req_id, saga_result ) - print(workflow.create(final_result).run()) + print(workflow.run(final_result)) diff --git a/python/ray/workflow/examples/comparisons/google_cloud_workflows/concat_array_workflow.py b/python/ray/workflow/examples/comparisons/google_cloud_workflows/concat_array_workflow.py index e2dad6d629296..eab1f1693927d 100644 --- a/python/ray/workflow/examples/comparisons/google_cloud_workflows/concat_array_workflow.py +++ b/python/ray/workflow/examples/comparisons/google_cloud_workflows/concat_array_workflow.py @@ -12,4 +12,4 @@ def iterate(array: List[str], result: str, i: int) -> str: if __name__ == "__main__": - print(workflow.create(iterate.bind(["foo", "ba", "r"], "", 0)).run()) + print(workflow.run(iterate.bind(["foo", "ba", "r"], "", 0))) diff --git a/python/ray/workflow/examples/comparisons/google_cloud_workflows/data_cond_workflow.py b/python/ray/workflow/examples/comparisons/google_cloud_workflows/data_cond_workflow.py index 042ac3a2c5951..0ec7e77065b0a 100644 --- a/python/ray/workflow/examples/comparisons/google_cloud_workflows/data_cond_workflow.py +++ b/python/ray/workflow/examples/comparisons/google_cloud_workflows/data_cond_workflow.py @@ -38,4 +38,4 @@ def decide(result: int) -> str: if __name__ == "__main__": - print(workflow.create(decide.bind(get_size.bind())).run()) + print(workflow.run(decide.bind(get_size.bind()))) diff --git a/python/ray/workflow/examples/comparisons/google_cloud_workflows/sub_workflows_workflow.py b/python/ray/workflow/examples/comparisons/google_cloud_workflows/sub_workflows_workflow.py index 425b6d301328e..e1209100259e5 100644 --- a/python/ray/workflow/examples/comparisons/google_cloud_workflows/sub_workflows_workflow.py +++ b/python/ray/workflow/examples/comparisons/google_cloud_workflows/sub_workflows_workflow.py @@ -20,4 +20,4 @@ def report(msg: str) -> None: if __name__ == "__main__": r1 = hello.bind("Kristof") r2 = report.bind(r1) - workflow.create(r2).run() + workflow.run(r2) diff --git a/python/ray/workflow/examples/comparisons/metaflow/foreach_workflow.py b/python/ray/workflow/examples/comparisons/metaflow/foreach_workflow.py index 931f0d13a612b..2228c8366628c 100644 --- a/python/ray/workflow/examples/comparisons/metaflow/foreach_workflow.py +++ b/python/ray/workflow/examples/comparisons/metaflow/foreach_workflow.py @@ -22,4 +22,4 @@ def end(results: "List[ray.ObjectRef[str]]") -> str: if __name__ == "__main__": - workflow.create(start.bind()).run() + workflow.run(start.bind()) diff --git a/python/ray/workflow/examples/comparisons/prefect/compute_fib_workflow.py b/python/ray/workflow/examples/comparisons/prefect/compute_fib_workflow.py index 824f2cea958f9..5dfcea989a1c4 100644 --- a/python/ray/workflow/examples/comparisons/prefect/compute_fib_workflow.py +++ b/python/ray/workflow/examples/comparisons/prefect/compute_fib_workflow.py @@ -15,4 +15,4 @@ def compute_large_fib(M: int, n: int = 1, fib: int = 1): if __name__ == "__main__": - assert workflow.create(compute_large_fib.bind(100)).run() == 89 + assert workflow.run(compute_large_fib.bind(100)) == 89 diff --git a/python/ray/workflow/examples/function_chain.py b/python/ray/workflow/examples/function_chain.py index 04477715ed03f..9e8491887b745 100644 --- a/python/ray/workflow/examples/function_chain.py +++ b/python/ray/workflow/examples/function_chain.py @@ -79,7 +79,7 @@ def add(i: int, v: int): ray.workflow.delete(workflow_id) except Exception: pass - assert ray.workflow.create(pipeline(10)).run(workflow_id=workflow_id) == 20 + assert ray.workflow.run(pipeline(10), workflow_id=workflow_id) == 20 pipeline = function_compose( [ @@ -99,5 +99,4 @@ def add(i: int, v: int): ray.workflow.delete(workflow_id) except Exception: pass - wf = ray.workflow.create(pipeline(10)) - assert wf.run(workflow_id=workflow_id) == (14, 15, 15, 16) + assert ray.workflow.run(pipeline(10), workflow_id=workflow_id) == (14, 15, 15, 16) diff --git a/python/ray/workflow/tests/test_basic_workflows.py b/python/ray/workflow/tests/test_basic_workflows.py index 96a1bc6c83968..88c9f455ccc34 100644 --- a/python/ray/workflow/tests/test_basic_workflows.py +++ b/python/ray/workflow/tests/test_basic_workflows.py @@ -75,23 +75,21 @@ def factorial(n): return workflow.continuation(mul.bind(n, factorial.bind(n - 1))) # This test also shows different "style" of running workflows. - assert ( - workflow.create(simple_sequential.bind()).run() == "[source1][append1][append2]" - ) + assert workflow.run(simple_sequential.bind()) == "[source1][append1][append2]" wf = simple_sequential_with_input.bind("start:") - assert workflow.create(wf).run() == "start:[append1][append2]" + assert workflow.run(wf) == "start:[append1][append2]" wf = loop_sequential.bind(3) - assert workflow.create(wf).run() == "[source1]" + "[append1]" * 3 + "[append2]" + assert workflow.run(wf) == "[source1]" + "[append1]" * 3 + "[append2]" wf = nested.bind("nested:") - assert workflow.create(wf).run() == "nested:~[nested]~[append1][append2]" + assert workflow.run(wf) == "nested:~[nested]~[append1][append2]" wf = fork_join.bind() - assert workflow.create(wf).run() == "join([source1][append1], [source1][append2])" + assert workflow.run(wf) == "join([source1][append1], [source1][append2])" - assert workflow.create(factorial.bind(10)).run() == 3628800 + assert workflow.run(factorial.bind(10)) == 3628800 def test_async_execution(workflow_start_regular_shared): @@ -101,7 +99,7 @@ def blocking(): return 314 start = time.time() - output = workflow.create(blocking.bind()).run_async() + output = workflow.run_async(blocking.bind()) duration = time.time() - start assert duration < 5 # workflow.run is not blocked assert ray.get(output) == 314 @@ -134,7 +132,7 @@ def chain_func(*args, **kw_argv): wf_step = workflow.step(fs[i]).step(wf_step) return wf_step - assert workflow.create(chain_func.bind(1)).run() == 7 + assert workflow.run(chain_func.bind(1)) == 7 def test_run_or_resume_during_running(workflow_start_regular_shared): @@ -156,13 +154,11 @@ def simple_sequential(): y = append1.bind(x) return workflow.continuation(append2.bind(y)) - output = workflow.create(simple_sequential.bind()).run_async( - workflow_id="running_workflow" + output = workflow.run_async( + simple_sequential.bind(), workflow_id="running_workflow" ) with pytest.raises(RuntimeError): - workflow.create(simple_sequential.bind()).run_async( - workflow_id="running_workflow" - ) + workflow.run_async(simple_sequential.bind(), workflow_id="running_workflow") with pytest.raises(RuntimeError): workflow.resume(workflow_id="running_workflow") assert ray.get(output) == "[source1][append1][append2]" @@ -180,33 +176,28 @@ def unstable_step(): return v with pytest.raises(Exception): - workflow.create( + workflow.run_async( unstable_step.options(**workflow.options(max_retries=-2).bind()) ) with pytest.raises(Exception): - workflow.create( - unstable_step.options(**workflow.options(max_retries=2)).bind() - ).run() - assert ( - 10 - == workflow.create( - unstable_step.options(**workflow.options(max_retries=7)).bind() - ).run() + workflow.run(unstable_step.options(**workflow.options(max_retries=2)).bind()) + assert 10 == workflow.run( + unstable_step.options(**workflow.options(max_retries=7)).bind() ) (tmp_path / "test").write_text("0") - (ret, err) = workflow.create( + (ret, err) = workflow.run( unstable_step.options( **workflow.options(max_retries=2, catch_exceptions=True) ).bind() - ).run() + ) assert ret is None assert isinstance(err, ValueError) - (ret, err) = workflow.create( + (ret, err) = workflow.run( unstable_step.options( **workflow.options(max_retries=7, catch_exceptions=True) ).bind() - ).run() + ) assert ret == 10 assert err is None @@ -223,7 +214,7 @@ def unstable_step(): raise ValueError("Invalid") return v - assert workflow.create(unstable_step.bind()).run() == 10 + assert workflow.run(unstable_step.bind()) == 10 (tmp_path / "test").write_text("0") @@ -236,7 +227,7 @@ def unstable_step_exception(): raise ValueError("Invalid") return v - (ret, err) = workflow.create(unstable_step_exception.bind()).run() + (ret, err) = workflow.run(unstable_step_exception.bind()) assert ret is None assert err is not None @@ -251,7 +242,7 @@ def unstable_step_exception(): raise ValueError("Invalid") return v - (ret, err) = workflow.create(unstable_step_exception.bind()).run() + (ret, err) = workflow.run(unstable_step_exception.bind()) assert ret is None assert err is not None assert (tmp_path / "test").read_text() == "4" @@ -266,9 +257,9 @@ def f2(): def f1(): return workflow.continuation(f2.bind()) - assert (10, None) == workflow.create( + assert (10, None) == workflow.run( f1.options(**workflow.options(catch_exceptions=True)).bind() - ).run() + ) def test_nested_catch_exception_2(workflow_start_regular_shared): @@ -279,9 +270,9 @@ def f1(n): else: return workflow.continuation(f1.bind(n - 1)) - ret, err = workflow.create( + ret, err = workflow.run( f1.options(**workflow.options(catch_exceptions=True)).bind(5) - ).run() + ) assert ret is None assert isinstance(err, ValueError) @@ -309,15 +300,15 @@ def f1(exc): else: return workflow.continuation(f2.bind(f3.bind())) - ret, err = workflow.create( + ret, err = workflow.run( f1.options(**workflow.options(catch_exceptions=True)).bind(True) - ).run() + ) assert ret is None assert isinstance(err, ValueError) - assert (10, None) == workflow.create( + assert (10, None) == workflow.run( f1.options(**workflow.options(catch_exceptions=True)).bind(False) - ).run() + ) def test_dynamic_output(workflow_start_regular_shared): @@ -336,9 +327,10 @@ def exponential_fail(k, n): # When workflow fails, the dynamic output should points to the # latest successful step. try: - workflow.create( - exponential_fail.options(**workflow.options(name="step_0")).bind(3, 10) - ).run(workflow_id="dynamic_output") + workflow.run( + exponential_fail.options(**workflow.options(name="step_0")).bind(3, 10), + workflow_id="dynamic_output", + ) except Exception: pass from ray.workflow.workflow_storage import get_workflow_storage diff --git a/python/ray/workflow/tests/test_basic_workflows_2.py b/python/ray/workflow/tests/test_basic_workflows_2.py index 8b04fb64c4683..010cbfec02d2b 100644 --- a/python/ray/workflow/tests/test_basic_workflows_2.py +++ b/python/ray/workflow/tests/test_basic_workflows_2.py @@ -33,7 +33,7 @@ def remote_run(): lock = FileLock(lock_path) lock.acquire() - ret = workflow.create(step_run.options(num_cpus=2).bind()).run_async() + ret = workflow.run_async(step_run.options(num_cpus=2).bind()) ray.wait([signal_actor.wait.remote()]) obj = remote_run.remote() with pytest.raises(ray.exceptions.GetTimeoutError): @@ -48,7 +48,7 @@ def test_get_output_1(workflow_start_regular, tmp_path): def simple(v): return v - assert 0 == workflow.create(simple.bind(0)).run("simple") + assert 0 == workflow.run(simple.bind(0), workflow_id="simple") assert 0 == ray.get(workflow.get_output("simple")) @@ -62,7 +62,7 @@ def simple(v): return v lock.acquire() - obj = workflow.create(simple.bind(0)).run_async("simple") + obj = workflow.run(simple.bind(0), workflow_id="simple") obj2 = workflow.get_output("simple") lock.release() assert ray.get([obj, obj2]) == [0, 0] @@ -83,8 +83,8 @@ def incr(): return 10 with pytest.raises(workflow.WorkflowExecutionError): - workflow.create(incr.options(**workflow.options(max_retries=0)).bind()).run( - "incr" + workflow.run( + incr.options(**workflow.options(max_retries=0)).bind(), workflow_id="incr" ) assert cnt_file.read_text() == "1" @@ -121,9 +121,10 @@ def recursive(n): workflow_id = "test_get_output_4" lock.acquire() - obj = workflow.create( - recursive.options(**workflow.options(name="10")).bind(10) - ).run_async(workflow_id) + obj = workflow.run_async( + recursive.options(**workflow.options(name="10")).bind(10), + workflow_id=workflow_id, + ) outputs = [workflow.get_output(workflow_id, name=str(i)) for i in range(11)] outputs.append(obj) @@ -148,7 +149,7 @@ def simple(): outputs = [] for i in range(20): - workflow.create(simple.bind()).run_async(workflow_id.format(i)) + workflow.run_async(simple.bind(), workflow_id=workflow_id.format(i)) outputs.append(workflow.get_output(workflow_id.format(i))) assert ray.get(outputs) == [314] * len(outputs) @@ -161,7 +162,7 @@ def double(v): inner_task = double.options(**workflow.options(name="inner")).bind(1) outer_task = double.options(**workflow.options(name="outer")).bind(inner_task) - result = workflow.create(outer_task).run_async("double") + result = workflow.run_async(outer_task, workflow_id="double") inner = workflow.get_output("double", name="inner") outer = workflow.get_output("double", name="outer") @@ -177,7 +178,7 @@ def double_2(s): inner_task = double_2.bind(1) outer_task = double_2.bind(inner_task) workflow_id = "double_2" - result = workflow.create(outer_task).run_async(workflow_id) + result = workflow.run_async(outer_task, workflow_id=workflow_id) inner = workflow.get_output(workflow_id, name="double") outer = workflow.get_output(workflow_id, name="double_1") @@ -199,7 +200,7 @@ def simple(): with FileLock(lock_path): dag = simple.options(**workflow.options(name="simple")).bind() - ret = workflow.create(dag).run_async(workflow_id=workflow_id) + ret = workflow.run_async(dag, workflow_id=workflow_id) exist = workflow.get_output(workflow_id, name="simple") non_exist = workflow.get_output(workflow_id, name="non_exist") @@ -215,11 +216,12 @@ def double(v): return 2 * v # Get the result from named step after workflow finished - assert 4 == workflow.create( + assert 4 == workflow.run( double.options(**workflow.options(name="outer")).bind( double.options(**workflow.options(name="inner")).bind(1) - ) - ).run("double") + ), + workflow_id="double", + ) assert ray.get(workflow.get_output("double", name="inner")) == 2 assert ray.get(workflow.get_output("double", name="outer")) == 4 @@ -237,12 +239,13 @@ def double(v, lock=None): lock_path = str(tmp_path / "lock") lock = FileLock(lock_path) lock.acquire() - output = workflow.create( + output = workflow.run_async( double.options(**workflow.options(name="outer")).bind( double.options(**workflow.options(name="inner")).bind(1, lock_path), lock_path, - ) - ).run_async("double-2") + ), + workflow_id="double-2", + ) inner = workflow.get_output("double-2", name="inner") outer = workflow.get_output("double-2", name="outer") @@ -276,11 +279,12 @@ def double(v, error): # Force it to fail for the outer step with pytest.raises(Exception): - workflow.create( + workflow.run( double.options(**workflow.options(name="outer")).bind( double.options(**workflow.options(name="inner")).bind(1, False), True - ) - ).run("double") + ), + workflow_id="double", + ) # For the inner step, it should have already been executed. assert 2 == ray.get(workflow.get_output("double", name="inner")) @@ -298,7 +302,7 @@ def factorial(n, r=1): import math - assert math.factorial(5) == workflow.create(factorial.bind(5)).run("factorial") + assert math.factorial(5) == workflow.run(factorial.bind(5), workflow_id="factorial") for i in range(5): step_name = ( "test_basic_workflows_2.test_get_named_step_default.locals.factorial" @@ -319,7 +323,7 @@ def f(n, dep): inner = f.bind(10, None) outer = f.bind(20, inner) - assert 20 == workflow.create(outer).run("duplicate") + assert 20 == workflow.run(outer, workflow_id="duplicate") # The outer will be checkpointed first. So there is no suffix for the name assert ray.get(workflow.get_output("duplicate", name="f")) == 10 # The inner will be checkpointed after the outer. And there is a duplicate @@ -332,7 +336,7 @@ def test_no_init_run(shutdown_only): def f(): pass - workflow.create(f.bind()).run() + workflow.run(f.bind()) def test_no_init_api(shutdown_only): diff --git a/python/ray/workflow/tests/test_basic_workflows_3.py b/python/ray/workflow/tests/test_basic_workflows_3.py index 75b3e5090adec..dce601a50b80b 100644 --- a/python/ray/workflow/tests/test_basic_workflows_3.py +++ b/python/ray/workflow/tests/test_basic_workflows_3.py @@ -16,10 +16,10 @@ def f(): v = int(counter.read_text()) + 1 counter.write_text(str(v)) - workflow.create(f.bind()).run("abc") + workflow.run(f.bind(), workflow_id="abc") assert counter.read_text() == "1" # This will not rerun the job from beginning - workflow.create(f.bind()).run("abc") + workflow.run(f.bind(), workflow_id="abc") assert counter.read_text() == "1" @@ -37,8 +37,7 @@ def f1(): def f2(*w): pass - f = workflow.create(f2.bind(*[f1.bind() for _ in range(10)])) - f.run() + workflow.run(f2.bind(*[f1.bind() for _ in range(10)])) def test_dedupe_indirect(workflow_start_regular_shared, tmp_path): @@ -66,11 +65,11 @@ def join(*a): a = incr.bind() i1 = identity.bind(a) i2 = identity.bind(a) - assert "1" == workflow.create(join.bind(i1, i2)).run() - assert "2" == workflow.create(join.bind(i1, i2)).run() + assert "1" == workflow.run(join.bind(i1, i2)) + assert "2" == workflow.run(join.bind(i1, i2)) # pass a multiple times - assert "3" == workflow.create(join.bind(a, a, a, a)).run() - assert "4" == workflow.create(join.bind(a, a, a, a)).run() + assert "3" == workflow.run(join.bind(a, a, a, a)) + assert "4" == workflow.run(join.bind(a, a, a, a)) def test_run_off_main_thread(workflow_start_regular_shared): @@ -84,8 +83,7 @@ def fake_data(num: int): def run(): global succ # Setup the workflow. - data = workflow.create(fake_data.bind(10)) - assert data.run(workflow_id="run") == list(range(10)) + assert workflow.run(fake_data.bind(10), workflow_id="run") == list(range(10)) import threading @@ -106,7 +104,7 @@ def simple(x): x = simple.options(**workflow.options(name="simple")).bind(x) workflow_id = "test_task_id_generation" - ret = workflow.create(x).run_async(workflow_id=workflow_id) + ret = workflow.run_async(x, workflow_id=workflow_id) outputs = [workflow.get_output(workflow_id, name="simple")] for i in range(1, n): outputs.append(workflow.get_output(workflow_id, name=f"simple_{i}")) diff --git a/python/ray/workflow/tests/test_cancellation.py b/python/ray/workflow/tests/test_cancellation.py index d42c59526e7d6..195189bc6eaec 100644 --- a/python/ray/workflow/tests/test_cancellation.py +++ b/python/ray/workflow/tests/test_cancellation.py @@ -18,10 +18,9 @@ def simple(): pass workflow_id = "test_cancellation" - wf = workflow.create(simple.bind()) with filelock.FileLock(lock_b): - r = wf.run_async(workflow_id=workflow_id) + r = workflow.run_async(simple.bind(), workflow_id=workflow_id) try: ray.get(r, timeout=5) except GetTimeoutError: diff --git a/python/ray/workflow/tests/test_checkpoint.py b/python/ray/workflow/tests/test_checkpoint.py index 2cd1444126819..bfc72d667a71c 100644 --- a/python/ray/workflow/tests/test_checkpoint.py +++ b/python/ray/workflow/tests/test_checkpoint.py @@ -53,11 +53,12 @@ def _assert_step_checkpoints(wf_storage, step_id, mode): def test_checkpoint_dag_skip_all(workflow_start_regular_shared): - outputs = workflow.create( + outputs = workflow.run( checkpoint_dag.options( **workflow.options(name="checkpoint_dag", checkpoint=False) - ).bind(False) - ).run(workflow_id="checkpoint_skip") + ).bind(False), + workflow_id="checkpoint_skip", + ) assert np.isclose(outputs, 8388607.5) recovered = ray.get(workflow.resume("checkpoint_skip")) assert np.isclose(recovered, 8388607.5) @@ -70,9 +71,10 @@ def test_checkpoint_dag_skip_all(workflow_start_regular_shared): def test_checkpoint_dag_skip_partial(workflow_start_regular_shared): - outputs = workflow.create( - checkpoint_dag.options(**workflow.options(name="checkpoint_dag")).bind(False) - ).run(workflow_id="checkpoint_partial") + outputs = workflow.run( + checkpoint_dag.options(**workflow.options(name="checkpoint_dag")).bind(False), + workflow_id="checkpoint_partial", + ) assert np.isclose(outputs, 8388607.5) recovered = ray.get(workflow.resume("checkpoint_partial")) assert np.isclose(recovered, 8388607.5) @@ -85,9 +87,10 @@ def test_checkpoint_dag_skip_partial(workflow_start_regular_shared): def test_checkpoint_dag_full(workflow_start_regular_shared): - outputs = workflow.create( - checkpoint_dag.options(**workflow.options(name="checkpoint_dag")).bind(True) - ).run(workflow_id="checkpoint_whole") + outputs = workflow.run( + checkpoint_dag.options(**workflow.options(name="checkpoint_dag")).bind(True), + workflow_id="checkpoint_whole", + ) assert np.isclose(outputs, 8388607.5) recovered = ray.get(workflow.resume("checkpoint_whole")) assert np.isclose(recovered, 8388607.5) diff --git a/python/ray/workflow/tests/test_checkpoint_2.py b/python/ray/workflow/tests/test_checkpoint_2.py index 904818931d319..695d54cd169ce 100644 --- a/python/ray/workflow/tests/test_checkpoint_2.py +++ b/python/ray/workflow/tests/test_checkpoint_2.py @@ -39,9 +39,10 @@ def test_checkpoint_dag_recovery_skip(workflow_start_regular_shared): start = time.time() with pytest.raises(workflow.WorkflowExecutionError): - workflow.create( - checkpoint_dag.options(**workflow.options(checkpoint=False)).bind(False) - ).run(workflow_id="checkpoint_skip_recovery") + workflow.run( + checkpoint_dag.options(**workflow.options(checkpoint=False)).bind(False), + workflow_id="checkpoint_skip_recovery", + ) run_duration_skipped = time.time() - start utils.set_global_mark() @@ -62,8 +63,8 @@ def test_checkpoint_dag_recovery_partial(workflow_start_regular_shared): start = time.time() with pytest.raises(workflow.WorkflowExecutionError): - workflow.create(checkpoint_dag.bind(False)).run( - workflow_id="checkpoint_partial_recovery" + workflow.run( + checkpoint_dag.bind(False), workflow_id="checkpoint_partial_recovery" ) run_duration_partial = time.time() - start @@ -84,9 +85,7 @@ def test_checkpoint_dag_recovery_whole(workflow_start_regular_shared): start = time.time() with pytest.raises(workflow.WorkflowExecutionError): - workflow.create(checkpoint_dag.bind(True)).run( - workflow_id="checkpoint_whole_recovery" - ) + workflow.run(checkpoint_dag.bind(True), workflow_id="checkpoint_whole_recovery") run_duration_whole = time.time() - start utils.set_global_mark() diff --git a/python/ray/workflow/tests/test_complex_workflow.py b/python/ray/workflow/tests/test_complex_workflow.py index c48962343852b..4df9240683323 100644 --- a/python/ray/workflow/tests/test_complex_workflow.py +++ b/python/ray/workflow/tests/test_complex_workflow.py @@ -73,11 +73,10 @@ def test_workflow_with_pressure(workflow_start_regular_shared): ] ans = ray.get([d.execute() for d in dags]) - workflows = [workflow.create(d) for d in dags] outputs = [] for _ in range(pressure_level): - for w in workflows: - outputs.append(w.run_async()) + for w in dags: + outputs.append(workflow.run_async(w)) assert ray.get(outputs) == ans * pressure_level diff --git a/python/ray/workflow/tests/test_dag_to_workflow.py b/python/ray/workflow/tests/test_dag_to_workflow.py index 8a12c1744f1a9..f8f7fcdbcb75f 100644 --- a/python/ray/workflow/tests/test_dag_to_workflow.py +++ b/python/ray/workflow/tests/test_dag_to_workflow.py @@ -28,7 +28,7 @@ def end(lf, rt, b): return f"{lf},{rt};{b}" with pytest.raises(TypeError): - workflow.create(begin.remote(1, 2, 3)) + workflow.run_async(begin.remote(1, 2, 3)) with InputNode() as dag_input: f = begin.bind(2, dag_input[1], a=dag_input.a) @@ -36,8 +36,10 @@ def end(lf, rt, b): rt = right.bind(f, b=dag_input.b, pos=dag_input[0]) b = end.bind(lf, rt, b=dag_input.b) - wf = workflow.create(b, 2, 3.14, a=10, b="ok") - assert wf.run() == "left(23.14, hello, 10),right(23.14, ok, 2);ok" + assert ( + workflow.run(b, 2, 3.14, a=10, b="ok") + == "left(23.14, hello, 10),right(23.14, ok, 2);ok" + ) def test_dedupe_serialization_dag(workflow_start_regular_shared): @@ -64,7 +66,7 @@ def get_num_uploads(): single = identity.bind((ref,)) double = identity.bind(list_of_refs) - result_ref, result_list = workflow.create(gather.bind(single, double)).run() + result_ref, result_list = workflow.run(gather.bind(single, double)) for result in result_list: assert ray.get(*result_ref) == ray.get(result) @@ -84,10 +86,10 @@ def f(a): x = {0: ray.put(10)} - result1 = workflow.create(f.bind(x)).run() - result2 = workflow.create(f.bind(x)).run() + result1 = workflow.run(f.bind(x)) + result2 = workflow.run(f.bind(x)) with InputNode() as dag_input: - result3 = workflow.create(f.bind(dag_input.x), x=x).run() + result3 = workflow.run(f.bind(dag_input.x), x=x) assert ray.get(*result1) == 10 assert ray.get(*result2) == 10 @@ -116,7 +118,7 @@ def h(): dag = f.bind(g.bind(x=ray.put(314), y=[ray.put(2022)])) # Run with workflow and normal Ray engine. - workflow.create(dag).run() + workflow.run(dag) ray.get(dag.execute()) @@ -164,7 +166,7 @@ def nested_continuation(x): ) # Run with workflow and normal Ray engine. - assert workflow.create(dag).run() == "ok" + assert workflow.run(dag) == "ok" assert ray.get(dag.execute()) == "ok" @@ -189,7 +191,7 @@ def f(): dag = f.bind() assert ray.get(dag.execute()) == 43 - assert workflow.create(dag).run() == 43 + assert workflow.run(dag) == 43 if __name__ == "__main__": diff --git a/python/ray/workflow/tests/test_dataset.py b/python/ray/workflow/tests/test_dataset.py index 8af7a73576e0d..1862f83068979 100644 --- a/python/ray/workflow/tests/test_dataset.py +++ b/python/ray/workflow/tests/test_dataset.py @@ -41,7 +41,7 @@ def test_dataset(workflow_start_regular_shared): transformed_ref = transform_dataset.bind(ds_ref) output_ref = sum_dataset.bind(transformed_ref) - result = workflow.create(output_ref).run() + result = workflow.run(output_ref) assert result == 2 * sum(range(1000)) @@ -50,7 +50,7 @@ def test_dataset_1(workflow_start_regular_shared): transformed_ref = transform_dataset.bind(ds_ref) output_ref = sum_dataset.bind(transformed_ref) - result = workflow.create(output_ref).run() + result = workflow.run(output_ref) assert result == 2 * sum(range(1000)) @@ -59,7 +59,7 @@ def test_dataset_2(workflow_start_regular_shared): transformed_ref = transform_dataset_1.bind(ds_ref) output_ref = sum_dataset.bind(transformed_ref) - result = workflow.create(output_ref).run() + result = workflow.run(output_ref) assert result == 2 * sum(range(1000)) diff --git a/python/ray/workflow/tests/test_dynamic_workflow_ref.py b/python/ray/workflow/tests/test_dynamic_workflow_ref.py index 34bfe0c7b290a..128fe37c61fc1 100644 --- a/python/ray/workflow/tests/test_dynamic_workflow_ref.py +++ b/python/ray/workflow/tests/test_dynamic_workflow_ref.py @@ -13,11 +13,14 @@ def incr(x): return x + 1 # This test also shows different "style" of running workflows. - first_step = workflow.create(incr.bind(0)) - assert first_step.run("test_dynamic_workflow_ref") == 1 - second_step = workflow.create(incr.bind(WorkflowRef("incr"))) + assert workflow.run(incr.bind(0), workflow_id="test_dynamic_workflow_ref") == 1 # Without rerun, it'll just return the previous result - assert second_step.run("test_dynamic_workflow_ref") == 1 + assert ( + workflow.run( + incr.bind(WorkflowRef("incr")), workflow_id="test_dynamic_workflow_ref" + ) + == 1 + ) # TODO (yic) We need re-run to make this test work # assert second_step.run("test_dynamic_workflow_ref") == 2 diff --git a/python/ray/workflow/tests/test_events.py b/python/ray/workflow/tests/test_events.py index 05d615fb171ef..eaa7d3528c7e1 100644 --- a/python/ray/workflow/tests/test_events.py +++ b/python/ray/workflow/tests/test_events.py @@ -19,7 +19,7 @@ def after_sleep(sleep_start_time, _): def sleep_helper(): return workflow.continuation(after_sleep.bind(time.time(), workflow.sleep(2))) - start, end = workflow.create(sleep_helper.bind()).run() + start, end = workflow.run(sleep_helper.bind()) duration = end - start assert 1 < duration @@ -31,7 +31,7 @@ def test_sleep_checkpointing(workflow_start_regular_shared): sleep_step = workflow.sleep(2) time.sleep(2) start_time = time.time() - workflow.create(sleep_step).run() + workflow.run(sleep_step) end_time = time.time() duration = end_time - start_time assert 1 < duration @@ -73,9 +73,7 @@ def trivial_step(arg1, arg2): event1_promise = workflow.wait_for_event(EventListener1) event2_promise = workflow.wait_for_event(EventListener2) - promise = workflow.create( - trivial_step.bind(event1_promise, event2_promise) - ).run_async() + promise = workflow.run_async(trivial_step.bind(event1_promise, event2_promise)) while not ( utils.check_global_mark("listener1") and utils.check_global_mark("listener2") @@ -118,7 +116,7 @@ def gather(*args): event_promise = workflow.wait_for_event(MyEventListener) - assert workflow.create(gather.bind(event_promise, triggers_event.bind())).run() == ( + assert workflow.run(gather.bind(event_promise, triggers_event.bind())) == ( None, None, ) @@ -156,7 +154,7 @@ def gather(*args): return args event_promise = workflow.wait_for_event(MyEventListener) - assert workflow.create(gather.bind(event_promise, triggers_event.bind())).run() == ( + assert workflow.run(gather.bind(event_promise, triggers_event.bind())) == ( None, None, ) @@ -192,7 +190,7 @@ def wait_then_finish(arg): pass event_promise = workflow.wait_for_event(MyEventListener) - workflow.create(wait_then_finish.bind(event_promise)).run_async("workflow") + workflow.run_async(wait_then_finish.bind(event_promise), workflow_id="workflow") while not utils.check_global_mark("time_to_die"): time.sleep(0.1) @@ -247,7 +245,7 @@ async def event_checkpointed(self, event): await asyncio.sleep(1000000) event_promise = workflow.wait_for_event(MyEventListener) - workflow.create(event_promise).run_async("workflow") + workflow.run_async(event_promise, workflow_id="workflow") while not utils.check_global_mark("first"): time.sleep(0.1) @@ -280,7 +278,9 @@ async def poll_for_event(self): await asyncio.sleep(1) utils.unset_global_mark() - promise = workflow.create(workflow.wait_for_event(MyEventListener)).run_async("wf") + promise = workflow.run_async( + workflow.wait_for_event(MyEventListener), workflow_id="wf" + ) assert workflow.get_status("wf") == workflow.WorkflowStatus.RUNNING diff --git a/python/ray/workflow/tests/test_large_intermediate.py b/python/ray/workflow/tests/test_large_intermediate.py index 93e9dfb3a10c6..18ae8cf2b1a80 100644 --- a/python/ray/workflow/tests/test_large_intermediate.py +++ b/python/ray/workflow/tests/test_large_intermediate.py @@ -27,7 +27,7 @@ def simple_large_intermediate(): return workflow.continuation(average.bind(y)) start = time.time() - outputs = workflow.create(simple_large_intermediate.bind()).run() + outputs = workflow.run(simple_large_intermediate.bind()) print(f"duration = {time.time() - start}") assert np.isclose(outputs, 8388607.5) diff --git a/python/ray/workflow/tests/test_lifetime.py b/python/ray/workflow/tests/test_lifetime.py index f865ea7577b65..dad267bd304a2 100644 --- a/python/ray/workflow/tests/test_lifetime.py +++ b/python/ray/workflow/tests/test_lifetime.py @@ -27,7 +27,7 @@ def foo(x): if __name__ == "__main__": ray.init() - output = workflow.create(foo.bind(0)).run_async(workflow_id="driver_terminated") + output = workflow.run_async(foo.bind(0), workflow_id="driver_terminated") time.sleep({}) """ diff --git a/python/ray/workflow/tests/test_metadata.py b/python/ray/workflow/tests/test_metadata.py index ac000ab52acfc..63cad99106e72 100644 --- a/python/ray/workflow/tests/test_metadata.py +++ b/python/ray/workflow/tests/test_metadata.py @@ -19,7 +19,7 @@ def test_user_metadata(workflow_start_regular): def simple(): return 0 - workflow.create(simple.bind()).run(workflow_id, metadata=user_run_metadata) + workflow.run(simple.bind(), workflow_id=workflow_id, metadata=user_run_metadata) assert workflow.get_metadata("simple")["user_metadata"] == user_run_metadata assert ( @@ -38,7 +38,7 @@ def test_user_metadata_empty(workflow_start_regular): def simple(): return 0 - workflow.create(simple.bind()).run(workflow_id) + workflow.run(simple.bind(), workflow_id=workflow_id) assert workflow.get_metadata("simple")["user_metadata"] == {} assert workflow.get_metadata("simple", "simple_step")["user_metadata"] == {} @@ -50,10 +50,10 @@ def simple(): return 0 with pytest.raises(ValueError): - workflow.create(simple.options(**workflow.options(metadata="x")).bind()) + workflow.run_async(simple.options(**workflow.options(metadata="x")).bind()) with pytest.raises(ValueError): - workflow.create(simple.bind()).run(metadata="x") + workflow.run(simple.bind(), metadata="x") def test_user_metadata_not_json_serializable(workflow_start_regular): @@ -65,10 +65,12 @@ class X: pass with pytest.raises(ValueError): - workflow.create(simple.options(**workflow.options(metadata={"x": X()})).bind()) + workflow.run_async( + simple.options(**workflow.options(metadata={"x": X()})).bind() + ) with pytest.raises(ValueError): - workflow.create(simple.bind()).run(metadata={"x": X()}) + workflow.run(simple.bind(), metadata={"x": X()}) def test_runtime_metadata(workflow_start_regular): @@ -82,7 +84,7 @@ def simple(): time.sleep(2) return 0 - workflow.create(simple.bind()).run(workflow_id) + workflow.run(simple.bind(), workflow_id=workflow_id) workflow_metadata = workflow.get_metadata("simple") assert "start_time" in workflow_metadata["stats"] @@ -113,7 +115,7 @@ def simple(): time.sleep(2) return 0 - workflow.create(simple.bind()).run(workflow_id, metadata=user_run_metadata) + workflow.run(simple.bind(), workflow_id=workflow_id, metadata=user_run_metadata) workflow_metadata = workflow.get_metadata("simple") assert workflow_metadata["status"] == "SUCCESSFUL" @@ -145,7 +147,7 @@ def simple(): time.sleep(1000) return 0 - workflow.create(simple.bind()).run_async(workflow_id) + workflow.run_async(simple.bind(), workflow_id=workflow_id) # Wait until step runs to make sure pre-run metadata is written while not flag.exists(): @@ -177,7 +179,7 @@ def simple(): return 0 with pytest.raises(workflow.WorkflowExecutionError): - workflow.create(simple.bind()).run(workflow_id) + workflow.run(simple.bind(), workflow_id=workflow_id) workflow_metadata_failed = workflow.get_metadata(workflow_id) assert workflow_metadata_failed["status"] == "FAILED" @@ -213,7 +215,9 @@ def outer(): time.sleep(2) return workflow.continuation(inner.bind()) - workflow.create(outer.bind()).run("nested", metadata={"workflow_k": "workflow_v"}) + workflow.run( + outer.bind(), workflow_id="nested", metadata={"workflow_k": "workflow_v"} + ) workflow_metadata = workflow.get_metadata("nested") outer_step_metadata = workflow.get_metadata("nested", "outer") @@ -251,7 +255,7 @@ def test_no_workflow_found(workflow_start_regular): def simple(): return 0 - workflow.create(simple.bind()).run(workflow_id) + workflow.run(simple.bind(), workflow_id=workflow_id) with pytest.raises(ValueError) as excinfo: workflow.get_metadata("simple1") diff --git a/python/ray/workflow/tests/test_object_deref.py b/python/ray/workflow/tests/test_object_deref.py index 0c42efb58b319..6c2828aa7158d 100644 --- a/python/ray/workflow/tests/test_object_deref.py +++ b/python/ray/workflow/tests/test_object_deref.py @@ -33,14 +33,14 @@ def deref_check(u: int, x: str, y: List[str], z: List[Dict[str, str]]): except Exception as e: return False, str(e) - output, s = workflow.create( + output, s = workflow.run( deref_check.bind( ray.put(42), nested_workflow.bind(10), [nested_workflow.bind(9)], [{"output": nested_workflow.bind(7)}], ) - ).run() + ) assert output is True, s @@ -57,10 +57,10 @@ def nested_ref_workflow(): def return_objectrefs() -> List[ObjectRef]: return [ray.put(x) for x in range(5)] - single = workflow.create(nested_ref_workflow.bind()).run() + single = workflow.run(nested_ref_workflow.bind()) assert ray.get(ray.get(single)) == 42 - multi = workflow.create(return_objectrefs.bind()).run() + multi = workflow.run(return_objectrefs.bind()) assert ray.get(multi) == list(range(5)) @@ -77,7 +77,7 @@ def receive_workflow(workflow): @ray.remote def return_workflow(): - return workflow.create(empty_list.bind()) + return empty_list.bind() @ray.remote def return_data() -> ray.ObjectRef: @@ -88,7 +88,7 @@ def receive_data(data: "ray.ObjectRef[np.ndarray]"): return ray.get(data) # test we are forbidden from directly passing workflow to Ray. - x = workflow.create(empty_list.bind()) + x = empty_list.bind() with pytest.raises(ValueError): ray.put(x) with pytest.raises(ValueError): @@ -98,7 +98,7 @@ def receive_data(data: "ray.ObjectRef[np.ndarray]"): # test return object ref obj = return_data.bind() - arr: np.ndarray = workflow.create(receive_data.bind(obj)).run() + arr: np.ndarray = workflow.run(receive_data.bind(obj)) assert np.array_equal(arr, np.ones(4096)) diff --git a/python/ray/workflow/tests/test_recovery.py b/python/ray/workflow/tests/test_recovery.py index 09cf51b5e7571..d118f3d57eadf 100644 --- a/python/ray/workflow/tests/test_recovery.py +++ b/python/ray/workflow/tests/test_recovery.py @@ -42,7 +42,7 @@ def test_dedupe_downloads_list(workflow_start_regular): numbers = [ray.put(i) for i in range(5)] workflows = [identity.bind(numbers) for _ in range(100)] - workflow.create(gather.bind(*workflows)).run() + workflow.run(gather.bind(*workflows)) ops = debug_store._logged_storage.get_op_counter() get_objects_count = 0 @@ -70,7 +70,7 @@ def test_dedupe_download_raw_ref(workflow_start_regular): ref = ray.put("hello") workflows = [identity.bind(ref) for _ in range(100)] - workflow.create(gather.bind(*workflows)).run() + workflow.run(gather.bind(*workflows)) ops = debug_store._logged_storage.get_op_counter() get_objects_count = 0 @@ -107,7 +107,7 @@ def recursive(ref, count): utils._alter_storage(debug_store) ref = ray.put("hello") - result = workflow.create(recursive.bind([ref], 10)).run() + result = workflow.run(recursive.bind([ref], 10)) ops = debug_store._logged_storage.get_op_counter() get_objects_count = 0 @@ -140,7 +140,7 @@ def test_recovery_simple_1(workflow_start_regular): workflow_id = "test_recovery_simple_1" with pytest.raises(workflow.WorkflowExecutionError): # internally we get WorkerCrashedError - workflow.create(the_failed_step.bind("x")).run(workflow_id=workflow_id) + workflow.run(the_failed_step.bind("x"), workflow_id=workflow_id) assert workflow.get_status(workflow_id) == workflow.WorkflowStatus.FAILED @@ -162,7 +162,7 @@ def simple(x): workflow_id = "test_recovery_simple_2" with pytest.raises(workflow.WorkflowExecutionError): # internally we get WorkerCrashedError - workflow.create(simple.bind("x")).run(workflow_id=workflow_id) + workflow.run(simple.bind("x"), workflow_id=workflow_id) assert workflow.get_status(workflow_id) == workflow.WorkflowStatus.FAILED @@ -196,7 +196,7 @@ def simple(x): workflow_id = "test_recovery_simple_3" with pytest.raises(workflow.WorkflowExecutionError): # internally we get WorkerCrashedError - workflow.create(simple.bind("x")).run(workflow_id=workflow_id) + workflow.run(simple.bind("x"), workflow_id=workflow_id) assert workflow.get_status(workflow_id) == workflow.WorkflowStatus.FAILED @@ -240,7 +240,7 @@ def complex(x1): workflow_id = "test_recovery_complex" with pytest.raises(workflow.WorkflowExecutionError): # internally we get WorkerCrashedError - workflow.create(complex.bind("x")).run(workflow_id=workflow_id) + workflow.run(complex.bind("x"), workflow_id=workflow_id) assert workflow.get_status(workflow_id) == workflow.WorkflowStatus.FAILED @@ -281,8 +281,7 @@ def foo(x): if __name__ == "__main__": ray.init(storage="{tmp_path}") - workflow.init() - assert workflow.create(foo.bind(0)).run(workflow_id="cluster_failure") == 20 + assert workflow.run(foo.bind(0), workflow_id="cluster_failure") == 20 """ ) time.sleep(10) @@ -320,8 +319,7 @@ def foo(x): if __name__ == "__main__": ray.init(storage="{str(workflow_dir)}") - workflow.init() - assert workflow.create(foo.bind(0)).run(workflow_id="cluster_failure") == 20 + assert workflow.run(foo.bind(0), workflow_id="cluster_failure") == 20 """ ) time.sleep(10) @@ -346,7 +344,7 @@ def recursive_chain(x): else: return 100 - assert workflow.create(recursive_chain.bind(0)).run(workflow_id="shortcut") == 100 + assert workflow.run(recursive_chain.bind(0), workflow_id="shortcut") == 100 # the shortcut points to the step with output checkpoint store = workflow_storage.get_workflow_storage("shortcut") step_id = store.get_entrypoint_step_id() @@ -361,7 +359,7 @@ def constant(): ray.init(storage=str(tmp_path)) workflow.init() - workflow.create(constant.bind()).run(workflow_id="const") + workflow.run(constant.bind(), workflow_id="const") assert ray.get(workflow.resume(workflow_id="const")) == 31416 diff --git a/python/ray/workflow/tests/test_serialization.py b/python/ray/workflow/tests/test_serialization.py index e5658c498f743..b936cfa54a819 100644 --- a/python/ray/workflow/tests/test_serialization.py +++ b/python/ray/workflow/tests/test_serialization.py @@ -60,7 +60,7 @@ def __getstate__(self): single = identity.bind((ref,)) double = identity.bind(list_of_refs) - workflow.create(gather.bind(single, double)).run() + workflow.run(gather.bind(single, double)) # One more for hashing the ref, and for uploading. assert ray.get(counter.get_count.remote()) == 3 @@ -75,7 +75,7 @@ def test_dedupe_serialization_2(workflow_start_regular_shared): single = identity.bind((ref,)) double = identity.bind(list_of_refs) - result_ref, result_list = workflow.create(gather.bind(single, double)).run() + result_ref, result_list = workflow.run(gather.bind(single, double)) for result in result_list: assert ray.get(*result_ref) == ray.get(result) @@ -96,8 +96,8 @@ def f(a): x = {0: ray.put(10)} - result1 = workflow.create(f.bind(x)).run() - result2 = workflow.create(f.bind(x)).run() + result1 = workflow.run(f.bind(x)) + result2 = workflow.run(f.bind(x)) print(result1) print(result2) @@ -140,7 +140,7 @@ def foo(objrefs): workflow.init() arg = ray.put("hello world") - workflow.create(foo.bind([arg, arg])).run() + workflow.run(foo.bind([arg, arg])) assert False """ diff --git a/python/ray/workflow/tests/test_signature_check.py b/python/ray/workflow/tests/test_signature_check.py index 045967829499b..e17af5a22886c 100644 --- a/python/ray/workflow/tests/test_signature_check.py +++ b/python/ray/workflow/tests/test_signature_check.py @@ -16,21 +16,21 @@ def test_signature_check(workflow_start_regular): # TODO(suquark): Ray DAG does not check the inputs. Fix it in Ray DAG. with pytest.raises(TypeError): - workflow.create(signature_check.bind(1)).run() + workflow.run(signature_check.bind(1)) with pytest.raises(TypeError): - workflow.create(signature_check.bind(1, c=2)).run() + workflow.run(signature_check.bind(1, c=2)) with pytest.raises(TypeError): - workflow.create(signature_check.bind(1, 2, d=3)).run() + workflow.run(signature_check.bind(1, 2, d=3)) with pytest.raises(TypeError): - workflow.create(signature_check.bind(1, 2, 3, 4)).run() + workflow.run(signature_check.bind(1, 2, 3, 4)) - workflow.create(signature_check.bind(1, 2, 3)).run() - workflow.create(signature_check.bind(1, 2, c=3)).run() - workflow.create(signature_check.bind(1, b=2, c=3)).run() - workflow.create(signature_check.bind(a=1, b=2, c=3)).run() + workflow.run(signature_check.bind(1, 2, 3)) + workflow.run(signature_check.bind(1, 2, c=3)) + workflow.run(signature_check.bind(1, b=2, c=3)) + workflow.run(signature_check.bind(a=1, b=2, c=3)) if __name__ == "__main__": diff --git a/python/ray/workflow/tests/test_storage.py b/python/ray/workflow/tests/test_storage.py index ff9b06626c507..3188e89744c9c 100644 --- a/python/ray/workflow/tests/test_storage.py +++ b/python/ray/workflow/tests/test_storage.py @@ -39,7 +39,7 @@ def never_ends(x): time.sleep(1000000) return x - workflow.create(never_ends.bind("hello world")).run_async("never_finishes") + workflow.run_async(never_ends.bind("hello world"), workflow_id="never_finishes") # Make sure the step is actualy executing before killing the cluster while not utils.check_global_mark(): @@ -75,7 +75,7 @@ def never_ends(x): def basic_step(arg): return arg - result = workflow.create(basic_step.bind("hello world")).run(workflow_id="finishes") + result = workflow.run(basic_step.bind("hello world"), workflow_id="finishes") assert result == "hello world" ouput = workflow.get_output("finishes") assert ray.get(ouput) == "hello world" @@ -98,7 +98,7 @@ def basic_step(arg): assert workflow.list_all() == [] # The workflow can be re-run as if it was never run before. - assert workflow.create(basic_step.bind("123")).run(workflow_id="finishes") == "123" + assert workflow.run(basic_step.bind("123"), workflow_id="finishes") == "123" # utils.unset_global_mark() # never_ends.step("123").run_async(workflow_id="never_finishes") @@ -252,7 +252,7 @@ def test_cluster_storage_init(workflow_start_cluster, tmp_path): def f(): return 10 - assert workflow.create(f.bind()).run() == 10 + assert workflow.run(f.bind()) == 10 if __name__ == "__main__": diff --git a/python/ray/workflow/tests/test_storage_failure.py b/python/ray/workflow/tests/test_storage_failure.py index 80083dd019b19..263a98a589147 100644 --- a/python/ray/workflow/tests/test_storage_failure.py +++ b/python/ray/workflow/tests/test_storage_failure.py @@ -49,7 +49,7 @@ def construct_workflow(length: int): for i in range(length): x0, x1, x2 = results[-2], results[-1], str(i) results.append(scan.bind(x0, x1, x2)) - return workflow.create(results[-1]) + return results[-1] def _locate_initial_commit(debug_store: DebugStorage) -> int: diff --git a/python/ray/workflow/tests/test_variable_mutable.py b/python/ray/workflow/tests/test_variable_mutable.py index d2b3b024ea1c2..7b1ae34f68a0a 100644 --- a/python/ray/workflow/tests/test_variable_mutable.py +++ b/python/ray/workflow/tests/test_variable_mutable.py @@ -19,7 +19,7 @@ def projection(x, _): a = identity.bind(x) x.append(1) b = identity.bind(x) - assert workflow.create(projection.bind(a, b)).run() == [] + assert workflow.run(projection.bind(a, b)) == [] if __name__ == "__main__": diff --git a/python/ray/workflow/tests/test_workflow_manager.py b/python/ray/workflow/tests/test_workflow_manager.py index 3d3ce32c4dd61..b1f1a81e22fa8 100644 --- a/python/ray/workflow/tests/test_workflow_manager.py +++ b/python/ray/workflow/tests/test_workflow_manager.py @@ -32,8 +32,7 @@ def long_running(i): return 100 outputs = [ - workflow.create(long_running.bind(i)).run_async(workflow_id=str(i)) - for i in range(100) + workflow.run_async(long_running.bind(i), workflow_id=str(i)) for i in range(100) ] # Test list all, it should list all jobs running all_tasks = workflow.list_all() From d0e02d351e8b8e5d509ca9ac1269e0446b83e08c Mon Sep 17 00:00:00 2001 From: Siyuan Zhuang Date: Thu, 30 Jun 2022 10:26:25 -0700 Subject: [PATCH 2/4] update doc --- doc/source/workflows/basics.rst | 94 +++++++++++++++++++--------- doc/source/workflows/comparison.rst | 5 +- doc/source/workflows/concepts.rst | 22 ++++--- doc/source/workflows/events.rst | 2 +- doc/source/workflows/metadata.rst | 12 ++-- doc/source/workflows/package-ref.rst | 7 +++ 6 files changed, 96 insertions(+), 46 deletions(-) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index 4644eee24f418..bf4617c558dc7 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -43,22 +43,21 @@ The Ray DAG will not be executed until further actions are taken on it. Your first workflow ------------------- -A single line is all you need to turn the previous DAG into a workflow: +A single line is all you need to run the workflow DAG: .. code-block:: python # from ray import workflow - # Create the workflow from the DAG. - wf = workflow.create(output) # Execute the workflow and print the result. - print(wf.run()) + print(workflow.run(output)) - # The workflow can also be executed asynchronously. - # print(ray.get(output.run_async())) + # You can also run the workflow asynchronously and fetching the output via 'ray.get' + output_ref = workflow.run_async(dag) + print(ray.get(output_ref)) -Here is the workflow we created: +Here this figure visualizes the workflow we created: .. image:: basic.png :width: 500px @@ -115,7 +114,7 @@ To retrieve a workflow result, you can assign ``workflow_id`` when running a wor ret = add.bind(get_val.bind(), 20) - assert workflow.create(ret).run(workflow_id="add_example") == 30 + assert workflow.run(ret, workflow_id="add_example") == 30 Then workflow results can be retrieved with ``workflow.get_output(workflow_id) -> ObjectRef[T]``. If a workflow is not given ``workflow_id``, a random string is set as the ``workflow_id``. To confirm ``workflow_id`` in the situation, call ``ray.workflow.list_all()``. @@ -153,14 +152,14 @@ Once a task is given a name, the result of the task will be retrievable via ``wo inner_task = double.options(**workflow.options(name="inner")).bind(1) outer_task = double.options(**workflow.options(name="outer")).bind(inner_task) - result = workflow.create(outer_task).run_async("double") + result_ref = workflow.run_async(outer_task, workflow_id="double") inner = workflow.get_output(workflow_id, name="inner") outer = workflow.get_output(workflow_id, name="outer") assert ray.get(inner) == 2 assert ray.get(outer) == 4 - assert ray.get(result) == 4 + assert ray.get(result_ref) == 4 If there are multiple tasks with the same name, a suffix with a counter ``_n`` will be added automatically. @@ -193,7 +192,7 @@ For example, for i in range(1, n): x = simple.options(**workflow.options(name="step")).bind(x) - ret = workflow.create(x).run_async(workflow_id=workflow_id) + ret = workflow.run_async(x, workflow_id=workflow_id) outputs = [workflow.get_output(workflow_id, name="step")] for i in range(1, n): outputs.append(workflow.get_output(workflow_id, name=f"step_{i}")) @@ -229,7 +228,7 @@ The following error handling flags can be either set in the task decorator or vi # Tries up to five times before giving up. r1 = faulty_function.options(**workflow.options(max_retries=5)).bind() - workflow.create(r1).run() + workflow.run(r1) @ray.remote def handle_errors(result: Tuple[str, Exception]): @@ -242,7 +241,7 @@ The following error handling flags can be either set in the task decorator or vi # `handle_errors` receives a tuple of (result, exception). r2 = faulty_function.options(**workflow.options(catch_exceptions=True)).bind() - workflow.create(handle_errors.bind(r2)).run() + workflow.run(handle_errors.bind(r2)) - If ``max_retries`` is given, the task will be retried for the given number of times if an exception is raised. It will only retry for the application level error. For system errors, it's controlled by ray. By default, ``max_retries`` is set to be 3. - If ``catch_exceptions`` is True, the return value of the function will be converted to ``Tuple[Optional[T], Optional[Exception]]``. This can be combined with ``max_retries`` to try a given number of times before returning the result tuple. @@ -278,7 +277,7 @@ Note that tasks that have side-effects still need to be idempotent. This is beca return ticket # UNSAFE: we could book multiple flight tickets - workflow.create(book_flight_unsafe.bind()).run() + workflow.run(book_flight_unsafe.bind()) .. code-block:: python :caption: Idempotent workflow: @@ -297,7 +296,7 @@ Note that tasks that have side-effects still need to be idempotent. This is beca # SAFE: book_flight is written to be idempotent request_id = generate_id.bind() - workflow.create(book_flight_idempotent.bind(request_id)).run() + workflow.run(book_flight_idempotent.bind(request_id)) Dynamic workflows ----------------- @@ -305,7 +304,42 @@ Dynamic workflows Additional tasks can be dynamically created and inserted into the workflow DAG during execution. This is achieved by returning a continuation of a DAG. -A continuation is something returned by a function and executed after it returns. + +In our context, a continuation is basically a tail function call returned by a function. For example: + +.. code-block:: python + + def bar(): ... + + def foo_1(): + # Here we say 'foo_1()' returns a continuation. + # The continuation is made of 'bar()' + return bar() + + def foo_2(): + # This is NOT a continuation because we do not return it. + bar() + + def foo_3(): + # This is NOT a continuation because it is not a function call. + return 42 + +Continuations can be used to implement something more complex, for example, recursions: + +.. code-block:: python + + def factorial(n: int) -> int: + if n == 1: + return 1 + else: + return multiply(n, factorial.bind(n - 1)) + + @ray.remote + def multiply(a: int, b: int) -> int: + return a * b + + assert factorial(10) == 3628800 + The continuation feature enables nesting, looping, and recursion within workflows. The following example shows how to implement the recursive ``factorial`` program using dynamically generated tasks: @@ -323,10 +357,11 @@ The following example shows how to implement the recursive ``factorial`` program def multiply(a: int, b: int) -> int: return a * b - ret = workflow.create(factorial.bind(10)) - assert ret.run() == 3628800 + assert workflow.run(factorial.bind(10)) == 3628800 -The key behavior to note is that when a task returns a ``Workflow`` output instead of a concrete value, that workflow's output will be substituted for the task's return. To better understand dynamic workflows, let's look at a more realistic example of booking a trip: +The key behavior to note is that when a task returns a continuation instead of a concrete value, +that continuation will be substituted for the task's return. +To better understand dynamic workflows, let's look at a more realistic example of booking a trip: .. code-block:: python @@ -342,18 +377,16 @@ The key behavior to note is that when a task returns a ``Workflow`` output inste hotels: List[Hotel]) -> Receipt: ... @ray.remote - def book_trip(origin: str, dest: str, dates) -> - "Workflow[Receipt]": + def book_trip(origin: str, dest: str, dates) -> Receipt: # Note that the workflow engine will not begin executing # child workflows until the parent task returns. # This avoids task overlap and ensures recoverability. - f1: Workflow = book_flight.bind(origin, dest, dates[0]) - f2: Workflow = book_flight.bind(dest, origin, dates[1]) - hotel: Workflow = book_hotel.bind(dest, dates) + f1 = book_flight.bind(origin, dest, dates[0]) + f2 = book_flight.bind(dest, origin, dates[1]) + hotel = book_hotel.bind(dest, dates) return workflow.continuation(finalize_or_cancel.bind([f1, f2], [hotel])) - fut = workflow.create(book_trip.bind("OAK", "SAN", ["6/12", "7/5"])) - fut.run() # returns Receipt(...) + receipt: Receipt = workflow.run(book_trip.bind("OAK", "SAN", ["6/12", "7/5"])) Here the workflow initially just consists of the ``book_trip`` task. Once executed, ``book_trip`` generates tasks to book flights and hotels in parallel, which feeds into a task to decide whether to cancel the trip or finalize it. The DAG can be visualized as follows (note the dynamically generated nested workflows within ``book_trip``): @@ -379,7 +412,8 @@ Workflows are compatible with Ray tasks and actors. There are two methods of usi Passing nested arguments ~~~~~~~~~~~~~~~~~~~~~~~~ -Like Ray tasks, when you pass a list of ``Workflow`` outputs to a task, the values are not resolved. But we ensure that all ancestors of a task are fully executed prior to the task starting: +Like Ray tasks, when you pass a list of task outputs to a task, the values are not resolved. +But we ensure that all ancestors of a task are fully executed prior to the task starting: .. code-block:: python @@ -395,7 +429,7 @@ Like Ray tasks, when you pass a list of ``Workflow`` outputs to a task, the valu return 10 ret = add.bind([get_val.bind() for _ in range(3)]) - assert workflow.create(ret).run() == 30 + assert workflow.run(ret) == 30 Passing object references between tasks ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -412,7 +446,7 @@ Ray object references and data structures composed of them (e.g., ``ray.Dataset` def add(a, b): return do_add.remote(a, b) - workflow.create(add.bind(ray.put(10), ray.put(20))).run() == 30 + workflow.run(add.bind(ray.put(10), ray.put(20))) == 30 Ray actor handles are not allowed to be passed between tasks. @@ -428,4 +462,4 @@ You can assign resources (e.g., CPUs, GPUs to tasks via the same ``num_cpus``, ` def train_model() -> Model: pass # This task is assigned a GPU by Ray. - workflow.create(train_model.bind()).run() + workflow.run(train_model.bind()) diff --git a/doc/source/workflows/comparison.rst b/doc/source/workflows/comparison.rst index 85923354f8752..6f9b9619308dc 100644 --- a/doc/source/workflows/comparison.rst +++ b/doc/source/workflows/comparison.rst @@ -7,7 +7,10 @@ Workflows is built on top of Ray, and offers a mostly consistent subset of its A ``func.remote`` vs ``func.bind`` ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -With Ray tasks, ``func.remote`` will submit a remote task to run eagerly. In Ray workflows, ``func.bind`` is used to create a DAG, and the DAG is converted into a workflow. Execution of the workflow is deferred until ``.run(workflow_id="id")`` or ``.run_async(workflow_id="id")`` is called on the ``Workflow``. Specifying the workflow id allows for resuming of the workflow by its id in case of cluster failure. +With Ray tasks, ``func.remote`` will submit a remote task to run eagerly. +In Ray workflows, ``func.bind`` is used to create a DAG, and the DAG is converted into a workflow. +Execution of the workflow is deferred until ``workflow.run(dag, workflow_id=...)`` or ``workflow.run_async(dag, workflow_id=...)`` is called on the DAG. +Specifying the workflow id allows for resuming of the workflow by its id in case of cluster failure. Other Workflow Engines ---------------------- diff --git a/doc/source/workflows/concepts.rst b/doc/source/workflows/concepts.rst index 60c373ee0f968..0135b76852497 100644 --- a/doc/source/workflows/concepts.rst +++ b/doc/source/workflows/concepts.rst @@ -61,18 +61,24 @@ Unlike Ray tasks, you are not allowed to call ``ray.get()`` or ``ray.wait()`` on Workflows ~~~~~~~~~ -It takes a single line of code to turn a DAG into a workflow DAG: +It takes a single line of code to run a workflow DAG: .. code-block:: python - :caption: Turning the DAG into a workflow DAG: + :caption: Run a workflow DAG: from ray import workflow - output: "Workflow[int]" = workflow.create(dag) + # Run the workflow until it completes and returns the output + assert workflow.run(dag) == 101 -Execute the workflow DAG by ``.run()`` or ``.run_async()``. Once started, a workflow's execution is durably logged to storage. On system failure, workflows can be resumed on any Ray cluster with access to the storage. + # Or you can run it asynchronously and fetching the output via 'ray.get' + output_ref = workflow.run_async(dag) + assert ray.get(output_ref) == 101 -When executing the workflow DAG, remote functions are retried on failure, but once they finish successfully and the results are persisted by the workflow engine, they will never be run again. + +Once started, a workflow's execution is durably logged to storage. On system failure, workflows can be resumed on any Ray cluster with access to the storage. + +When executing the workflow DAG, workflow tasks are retried on failure, but once they finish successfully and the results are persisted by the workflow engine, they will never be run again. .. code-block:: python :caption: Run the workflow: @@ -108,7 +114,7 @@ Large data objects can be stored in the Ray object store. References to these ob def concat(words: List[ray.ObjectRef]) -> str: return " ".join([ray.get(w) for w in words]) - assert workflow.create(concat.bind(words.bind())).run() == "hello world" + assert workflow.run(concat.bind(words.bind())) == "hello world" Dynamic Workflows ~~~~~~~~~~~~~~~~~ @@ -130,7 +136,7 @@ The continuation feature enables nesting, looping, and recursion within workflow # return a continuation of a DAG return workflow.continuation(add.bind(fib.bind(n - 1), fib.bind(n - 2))) - assert workflow.create(fib.bind(10)).run() == 55 + assert workflow.run(fib.bind(10)) == 55 Events @@ -151,4 +157,4 @@ Workflows can be efficiently triggered by timers or external events using the ev return args # If a task's arguments include events, the task won't be executed until all of the events have occured. - workflow.create(gather.bind(sleep_task, event_task, "hello world")).run() + workflow.run(gather.bind(sleep_task, event_task, "hello world")) diff --git a/doc/source/workflows/events.rst b/doc/source/workflows/events.rst index e65c86df61847..336acbf1a6eff 100644 --- a/doc/source/workflows/events.rst +++ b/doc/source/workflows/events.rst @@ -34,7 +34,7 @@ Workflow events are a special type of workflow task. They "finish" when the even return args # Gather will run after 60 seconds, when both event1 and event2 are done. - workflow.create(gather.bind(event1_task, event_2_task)).run() + workflow.run(gather.bind(event1_task, event_2_task)) Custom event listeners diff --git a/doc/source/workflows/metadata.rst b/doc/source/workflows/metadata.rst index 7e5a640094bc3..55bf815da50eb 100644 --- a/doc/source/workflows/metadata.rst +++ b/doc/source/workflows/metadata.rst @@ -19,7 +19,7 @@ For example: def add(left: int, right: int) -> int: return left + right - workflow.create(add.bind(10, 20)).run("add_example") + workflow.run(add.bind(10, 20), workflow_id="add_example") workflow_metadata = workflow.get_metadata("add_example") @@ -32,7 +32,7 @@ providing the task name: .. code-block:: python - workflow.create(add.options(**workflow.options(name="add_task")).bind(10, 20)).run("add_example_2") + workflow.run(add.options(**workflow.options(name="add_task")).bind(10, 20), workflow_id="add_example_2") task_metadata = workflow.get_metadata("add_example_2", name="add_task") @@ -51,8 +51,8 @@ workflow or workflow task. .. code-block:: python - workflow.create(add.options(**workflow.options(name="add_task", metadata={"task_k": "task_v"})).bind(10, 20))\ - .run("add_example_3", metadata={"workflow_k": "workflow_v"}) + workflow.run(add.options(**workflow.options(name="add_task", metadata={"task_k": "task_v"})).bind(10, 20) + workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"}) assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"} assert workflow.get_metadata("add_example_3", name="add_task")["user_metadata"] == {"task_k": "task_v"} @@ -92,7 +92,7 @@ is completed). time.sleep(1000) return 0 - workflow.create(simple.bind()).run_async(workflow_id) + workflow.run_async(simple.bind(), workflow_id=workflow_id) # make sure workflow task starts running while not flag.exists(): @@ -126,7 +126,7 @@ be updated whenever a workflow is resumed. return 0 with pytest.raises(ray.exceptions.RaySystemError): - workflow.create(simple.bind()).run(workflow_id) + workflow.run(simple.bind(), workflow_id=workflow_id) workflow_metadata_failed = workflow.get_metadata(workflow_id) assert workflow_metadata_failed["status"] == "FAILED" diff --git a/doc/source/workflows/package-ref.rst b/doc/source/workflows/package-ref.rst index db239d5df9e7b..029893b944a03 100644 --- a/doc/source/workflows/package-ref.rst +++ b/doc/source/workflows/package-ref.rst @@ -1,6 +1,13 @@ Ray Workflows API ================= +Workflow Execution API +---------------------- + +.. autofunction:: ray.workflow.run +.. autofunction:: ray.workflow.run_async + + Management API -------------- .. autofunction:: ray.workflow.resume_all From 0e9f7557fce09936fbf33f0152e8ab14457647e1 Mon Sep 17 00:00:00 2001 From: Siyuan Zhuang Date: Thu, 30 Jun 2022 14:24:50 -0700 Subject: [PATCH 3/4] fix --- doc/source/workflows/basics.rst | 3 +-- doc/source/workflows/comparison.rst | 7 ++++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index bf4617c558dc7..e7336a2b9aa29 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -332,9 +332,8 @@ Continuations can be used to implement something more complex, for example, recu if n == 1: return 1 else: - return multiply(n, factorial.bind(n - 1)) + return multiply(n, factorial(n - 1)) - @ray.remote def multiply(a: int, b: int) -> int: return a * b diff --git a/doc/source/workflows/comparison.rst b/doc/source/workflows/comparison.rst index 6f9b9619308dc..52ae404b2a63e 100644 --- a/doc/source/workflows/comparison.rst +++ b/doc/source/workflows/comparison.rst @@ -7,9 +7,10 @@ Workflows is built on top of Ray, and offers a mostly consistent subset of its A ``func.remote`` vs ``func.bind`` ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -With Ray tasks, ``func.remote`` will submit a remote task to run eagerly. -In Ray workflows, ``func.bind`` is used to create a DAG, and the DAG is converted into a workflow. -Execution of the workflow is deferred until ``workflow.run(dag, workflow_id=...)`` or ``workflow.run_async(dag, workflow_id=...)`` is called on the DAG. +With Ray tasks, ``func.remote`` will submit a remote task to run eagerly; ``func.bind`` will generate +a node in a DAG, it will not be executed until the DAG is been executed. + +Under the context of Ray Workflows, the execution of the DAG is deferred until ``workflow.run(dag, workflow_id=...)`` or ``workflow.run_async(dag, workflow_id=...)`` is called on the DAG. Specifying the workflow id allows for resuming of the workflow by its id in case of cluster failure. Other Workflow Engines From ad28649450711e8c2be0ced761644092ad0ed75d Mon Sep 17 00:00:00 2001 From: Siyuan Date: Sat, 2 Jul 2022 18:19:11 -0700 Subject: [PATCH 4/4] fix --- python/ray/workflow/tests/test_basic_workflows_2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/workflow/tests/test_basic_workflows_2.py b/python/ray/workflow/tests/test_basic_workflows_2.py index 010cbfec02d2b..9f1db9182dd7d 100644 --- a/python/ray/workflow/tests/test_basic_workflows_2.py +++ b/python/ray/workflow/tests/test_basic_workflows_2.py @@ -62,7 +62,7 @@ def simple(v): return v lock.acquire() - obj = workflow.run(simple.bind(0), workflow_id="simple") + obj = workflow.run_async(simple.bind(0), workflow_id="simple") obj2 = workflow.get_output("simple") lock.release() assert ray.get([obj, obj2]) == [0, 0]