Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Feature: provide failFast flag, allow a DAG to run all branches of the DAG (either success or failure) #1443

Merged
merged 5 commits into from
Jul 1, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
New Feature: provide failFast flag, allow a DAG to run all branches o…
…f the DAG (either success or failure)
  • Loading branch information
xianlubird authored and xianlu committed Jun 25, 2019
commit eab546948d06acb4ec9c7619a494fef62dc463dc
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,10 @@
"description": "Entrypoint is a template reference to the starting point of the workflow",
"type": "string"
},
"failFast": {
"description": "This flag is for DAG logic. The DAG logic has a built-in \"fail fast\" feature to stop scheduling new steps, as soon as it detects that one of the DAG nodes is failed. Then it waits until all DAG nodes are completed before failing the DAG itself. The FailFast flag default is true, if set to false, it will allow a DAG to run all branches of the DAG to completion (either success or failure), regardless of the failed outcomes of branches in the DAG. More info and example about this feature at https://github.com/argoproj/argo/issues/1442",
"type": "boolean"
},
"hostAliases": {
"description": "HostAliases is an optional list of hosts and IPs that will be injected into the pod spec",
"type": "array",
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/apis/workflow/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,14 @@ type WorkflowSpec struct {

// HostAliases is an optional list of hosts and IPs that will be injected into the pod spec
HostAliases []apiv1.HostAlias `json:"hostAliases,omitempty"`

// This flag is for DAG logic. The DAG logic has a built-in "fail fast" feature to stop scheduling new steps,
// as soon as it detects that one of the DAG nodes is failed. Then it waits until all DAG nodes are completed
// before failing the DAG itself.
// The FailFast flag default is true, if set to false, it will allow a DAG to run all branches of the DAG to
// completion (either success or failure), regardless of the failed outcomes of branches in the DAG.
// More info and example about this feature at https://github.com/argoproj/argo/issues/1442
FailFast *bool `json:"failFast,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a DAG specific field, I think this may be more suitable the DAG template? Downside is that FailFast would have to be set for every DAG template, if you want the entire workflow to behave this way. On the other hand, having it at DAG template level means you can decide to have some DAG template fail fast, and others not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree.

}

// Template is a reusable and composable unit of execution in a workflow
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 49 additions & 0 deletions test/e2e/functional/dag-disable-failFast.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
apiVersion: argoproj.io/v1alpha1
Copy link
Member

@jessesuen jessesuen Jun 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move this file under test/e2e/expectedfailures since it is expected to fail

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

kind: Workflow
metadata:
generateName: dag-primay-branch-
spec:
failFast: false
entrypoint: statis
templates:
- name: a
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
- name: b
retryStrategy:
limit: 2
container:
image: alpine:latest
command: [sh, -c]
args: ["sleep 30; echo haha"]
- name: c
retryStrategy:
limit: 3
container:
image: alpine:latest
command: [sh, -c]
args: ["echo intentional failure; exit 2"]
- name: d
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
- name: statis
dag:
tasks:
- name: A
template: a
- name: B
dependencies: [A]
template: b
- name: C
dependencies: [A]
template: c
- name: D
dependencies: [B]
template: d
- name: E
dependencies: [D]
template: d
41 changes: 22 additions & 19 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,29 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes map[string]wfv1.
retriesExhausted = false
}
}

if unsuccessfulPhase != "" {
// If failFast set to false, we should return Running to continue this workflow for other DAG branch
if d.wf.Spec.FailFast != nil && !*d.wf.Spec.FailFast {
tmpOverAllFinished := true
// If all the nodes have finished, we should mark the failed node to finish overall workflow
// So we should check all the targetTasks have finished
for _, tmpDepName := range targetTasks {
tmpDepNode := d.getTaskNode(tmpDepName)
if tmpDepNode == nil {
tmpOverAllFinished = false
jessesuen marked this conversation as resolved.
Show resolved Hide resolved
break
}
if tmpDepNode.Type == wfv1.NodeTypeRetry && hasMoreRetries(tmpDepNode, d.wf) {
tmpOverAllFinished = false
break
}
}
if !tmpOverAllFinished {
return wfv1.NodeRunning
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mind writing a unit test for this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


// if we were unsuccessful, we can return *only* if all retry nodes have ben exhausted.
if retriesExhausted {
return unsuccessfulPhase
Expand Down Expand Up @@ -250,25 +272,6 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
dependenciesSuccessful = false
}
continue
} else if depNode.Type == wfv1.NodeTypeRetry {
// For retry type node
// Maybe one of children node already success, but the retry node hasn't sync to latest status
// So skip these steps, the next process function will make the status to be right
tmpContinueFlag := false
for _, tmpChildName := range depNode.Children {
tmpChild, tmpOK := woc.wf.Status.Nodes[tmpChildName]
if !tmpOK {
continue
}
if tmpChild.Successful() {
tmpContinueFlag = true
break
}
}
if tmpContinueFlag {
woc.markNodePhase(depNode.Name, wfv1.NodeSucceeded)
continue
}
}
}
dependenciesCompleted = false
Expand Down