If ops fail or upstream data has changed within a job execution, the job may need to be re-run starting from a particular point. Dagster calls this process re-execution.
Imagine a machine learning job with three ops. The first op, which trains the model, is the most time- and resource-intensive. The second op tests the model, and the third builds analyses on the results. Suppose the job fails at the op that tests the model. After fixing the root cause, we want to re-run the job. Creating a brand-new run would take much longer, because the first op would have to be repeated. It is more economical to start again from the second op and reuse the first op's result from the previous run.
With Dagster, the re-execution of parts of the job is grouped with the original run to make it easy to trace. The original job execution metadata is not overwritten, making re-execution a non-destructive operation.
Consider the following job, which has three ops, one of which fails half of the time:
from dagster import in_process_executor, job, op


@op
def start():
    return 1


@op
def unreliable(num: int) -> int:
    failure_rate = 0.5
    if some_random_result() < failure_rate:
        raise Exception("blah")

    return num


@op
def end(_num: int):
    pass


@job(executor_def=in_process_executor)
def unreliable_job():
    end(unreliable(start()))
Although this job is very simple, inputs and outputs are still passed between its ops. With an IO manager, re-execution can reuse the inputs and outputs stored during the initial run.
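To make the role of stored outputs concrete, here is a minimal, library-free sketch. It is a toy model and not Dagster's IO manager API: the `store` dictionary, the `STEPS` table, the `run_steps` helper, and the run IDs are all hypothetical names introduced for illustration. Each step's output is saved under its run ID, and a re-execution loads any input it did not recompute from the parent run.

```python
# Toy model of output reuse during re-execution (not Dagster's actual API).
# Outputs are keyed by (run_id, step_name), roughly how a persistent
# IO manager might key stored values.
store = {}

# Each step maps to (upstream step or None, function of the upstream value).
STEPS = {
    "start": (None, lambda _: 1),
    "unreliable": ("start", lambda num: num),
    "end": ("unreliable", lambda num: None),
}


def run_steps(run_id, steps_to_run, parent_run_id=None):
    """Execute only steps_to_run; load missing inputs from the parent run."""
    executed = []
    for name in ("start", "unreliable", "end"):  # topological order
        if name not in steps_to_run:
            continue
        upstream, fn = STEPS[name]
        if upstream is None:
            value = None
        elif (run_id, upstream) in store:
            value = store[(run_id, upstream)]  # computed earlier in this run
        else:
            value = store[(parent_run_id, upstream)]  # reused from parent run
        store[(run_id, name)] = fn(value)
        executed.append(name)
    return executed


# Initial run: pretend it stopped after "start" because "unreliable" failed.
run_steps("run_1", {"start"})
# Re-execution: skip "start" and read its stored output from run_1.
executed = run_steps("run_2", {"unreliable", "end"}, parent_run_id="run_1")
```

In this sketch the second run never recomputes `start`, and the first run's entries in `store` are left untouched, which mirrors the non-destructive behavior described earlier.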
To initiate a re-execution from an existing run, navigate to the run in the UI. The re-execution option is at the top right of the interface.
Under the re-execution dropdown, you will see multiple options. No matter which one you choose, the re-executed job is linked to the original run.
All Ops: Re-execute the job from scratch. This option is most relevant if you would like to associate runs together when testing jobs end-to-end.
Selected Op(s): Re-execute the selected op(s). Ops can be selected regardless of their status. This option is most relevant if your job is large and you know exactly which ops to execute. Select ops by clicking their boxes in the Gantt chart view.
From Selected: Re-execute the job downstream from the selected ops. This option is most relevant if a particular op fails, and your intent is to run all downstream ops regardless of op status. You are likely developing a single op, and want to make sure downstream ops work as expected.
From Failure: Re-execute the job, skipping ops completed successfully. This option is only enabled when the run has failed. You have likely fixed the failed op, and want to re-run the op and all downstream dependencies. Dagster will figure out the dependencies for you!
In the example above, re-executing from failure makes sense because the failed op has a 50% chance of succeeding on the next run.
If the run succeeded but the underlying code changed, running specific ops to test the differences would be more relevant.
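The four options can be summarized as rules for choosing which ops run again. The sketch below is a toy model of those rules for the three-op job, not Dagster's implementation; the `DOWNSTREAM` table, the option names, and both helper functions are hypothetical and exist only for illustration.

```python
# Toy model of the four re-execution options (not Dagster's implementation).
# DOWNSTREAM maps each op to the ops that directly consume its output.
DOWNSTREAM = {"start": ["unreliable"], "unreliable": ["end"], "end": []}


def with_descendants(ops):
    """Return the given ops plus everything downstream of them."""
    selected, stack = set(), list(ops)
    while stack:
        op_name = stack.pop()
        if op_name not in selected:
            selected.add(op_name)
            stack.extend(DOWNSTREAM[op_name])
    return selected


def steps_to_run(option, selected=(), succeeded=()):
    """Map a re-execution option to the set of ops that would run again."""
    if option == "all_ops":
        return set(DOWNSTREAM)
    if option == "selected_ops":
        return set(selected)
    if option == "from_selected":
        return with_descendants(selected)
    if option == "from_failure":
        # Skip every op that already completed successfully.
        return set(DOWNSTREAM) - set(succeeded)
    raise ValueError(f"unknown option: {option}")
```

For the example job, if `start` succeeded and `unreliable` failed, `steps_to_run("from_failure", succeeded=["start"])` selects `unreliable` and `end`, which is the same set that `steps_to_run("from_selected", selected=["unreliable"])` produces.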
Within the UI, you can select one or more ops by clicking them. Alternatively, you can use the subset selector and specify the names of the ops you want to re-run.
Using Dagster's API, you can programmatically trigger both an execution and a re-execution. If an initial run fails, you may want to re-trigger a run from the point of failure, as shown above. Similarly, you can trigger a re-execution of selected ops or from a particular point.
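from dagster import ReexecutionOptions, execute_job, reconstructable

# `instance` and `initial_result` come from the initial run (not shown here)

# re-execute the job, but only the "unreliable" op and all its descendants
options = ReexecutionOptions(
    parent_run_id=initial_result.run_id, step_selection=["unreliable*"]
)
result = execute_job(
    reconstructable(unreliable_job),
    instance=instance,
    reexecution_options=options,
)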
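The trailing `*` in `"unreliable*"` selects the named op together with everything downstream of it, and a leading `*` selects everything upstream. The following is a small, library-free sketch of how such strings could be resolved against the example job's graph. It is illustrative only: `EDGES`, `reachable`, and `resolve` are hypothetical names, and the sketch handles only plain names and the `*` prefix and suffix.

```python
# Toy resolver for selection strings such as "unreliable*" (illustrative only).
# Each edge is (upstream op, downstream op) in the example job.
EDGES = [("start", "unreliable"), ("unreliable", "end")]


def reachable(name, forward):
    """Collect ops reachable from name, following edges forward or backward."""
    found, frontier = set(), [name]
    while frontier:
        current = frontier.pop()
        for src, dst in EDGES:
            if not forward:
                src, dst = dst, src  # walk the edge in reverse
            if src == current and dst not in found:
                found.add(dst)
                frontier.append(dst)
    return found


def resolve(selection):
    """Expand a list of selection clauses into a set of op names."""
    chosen = set()
    for clause in selection:
        name = clause.strip("*")
        chosen.add(name)
        if clause.endswith("*"):
            chosen |= reachable(name, forward=True)  # add descendants
        if clause.startswith("*"):
            chosen |= reachable(name, forward=False)  # add ancestors
    return chosen
```

Under these assumptions, `resolve(["unreliable*"])` yields `unreliable` and `end`, matching the comment in the snippet above.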