Skip to content

Commit

Permalink
Fix failFast bug: When a node in the middle fails, the entire workflo…
Browse files Browse the repository at this point in the history
…w will hang (argoproj#1468)
  • Loading branch information
xianlubird authored and jessesuen committed Jul 15, 2019
1 parent 42adbf3 commit 7d9bb51
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 5 deletions.
54 changes: 54 additions & 0 deletions test/e2e/expectedfailures/dag-disbale-failFast-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-primay-branch-
spec:
entrypoint: statis
templates:
- name: a
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
- name: b
retryStrategy:
limit: 2
container:
image: alpine:latest
command: [sh, -c]
args: ["sleep 30; echo haha"]
- name: c
retryStrategy:
limit: 3
container:
image: alpine:latest
command: [sh, -c]
args: ["echo intentional failure; exit 2"]
- name: d
container:
image: alpine:latest
command: [sh, -c]
args: ["echo intentional failure; exit 2"]
- name: e
container:
image: alpine:latest
command: [sh, -c]
args: ["sleep 30; echo haha"]
- name: statis
dag:
failFast: false
tasks:
- name: A
template: a
- name: B
dependencies: [A]
template: b
- name: C
dependencies: [A]
template: c
- name: D
dependencies: [B]
template: d
- name: E
dependencies: [D]
template: e
37 changes: 32 additions & 5 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,30 @@ func (d *dagContext) getTaskNode(taskName string) *wfv1.NodeStatus {
return &node
}

// Assert all branch finished for failFast:disable function
func (d *dagContext) assertBranchFinished(targetTaskName string) bool {
// We should ensure that from the bottom to the top,
// all the nodes of this branch have at least one failure.
// If successful, we should continue to run down until the leaf node
taskNode := d.getTaskNode(targetTaskName)
if taskNode == nil {
taskObject := d.getTask(targetTaskName)
if taskObject != nil {
// Make sure all the dependency node have one failed
for _, tmpTaskName := range taskObject.Dependencies {
// Recursive check until top root node
return d.assertBranchFinished(tmpTaskName)
}
}
} else if !taskNode.Successful() {
return true
}

// In failFast situation, if node is successful, it will run to leaf node, above
// the function, we have already check the leaf node status
return false
}

// assessDAGPhase assesses the overall DAG status
func (d *dagContext) assessDAGPhase(targetTasks []string, nodes map[string]wfv1.NodeStatus) wfv1.NodePhase {
// First check all our nodes to see if anything is still running. If so, then the DAG is
Expand Down Expand Up @@ -93,17 +117,20 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes map[string]wfv1.
if d.tmpl.DAG.FailFast != nil && !*d.tmpl.DAG.FailFast {
tmpOverAllFinished := true
// If all the nodes have finished, we should mark the failed node to finish overall workflow
// So we should check all the targetTasks have finished
// So we should check all the targetTasks branch have finished
for _, tmpDepName := range targetTasks {
tmpDepNode := d.getTaskNode(tmpDepName)
if tmpDepNode == nil {
// If leaf node is nil, we should check it's parent node and recursive check
if !d.assertBranchFinished(tmpDepName) {
tmpOverAllFinished = false
}
} else if tmpDepNode.Type == wfv1.NodeTypeRetry && hasMoreRetries(tmpDepNode, d.wf) {
tmpOverAllFinished = false
break
}
if tmpDepNode.Type == wfv1.NodeTypeRetry && hasMoreRetries(tmpDepNode, d.wf) {
tmpOverAllFinished = false
break
}

//If leaf node has finished, we should mark the error workflow
}
if !tmpOverAllFinished {
return wfv1.NodeRunning
Expand Down

0 comments on commit 7d9bb51

Please sign in to comment.