Skip to content

Commit

Permalink
Fix DAG enable failFast will hang in some case (argoproj#1595)
Browse files Browse the repository at this point in the history
* Fix failFast will hang in some case
  • Loading branch information
xianlubird authored and sarabala1979 committed Sep 20, 2019
1 parent e9f3d9c commit 4bf83fc
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 15 deletions.
62 changes: 62 additions & 0 deletions test/e2e/expectedfailures/dag-failFast-hang.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-hang-failFast-
spec:
entrypoint: retry-with-dags
templates:
- name: retry-with-dags
dag:
failFast: false
tasks:
- name: A
template: success
- name: B
template: success-2
dependencies:
- A
- name: C
template: sub-dag2
dependencies:
- A
- name: D
dependencies:
- A
- C
template: success

- name: sub-dag
dag:
tasks:
- name: fail
template: fail
- name: success1
template: success

- name: fail
container:
command: [sh, -c, exit 1]
image: alpine
retryStrategy:
limit: 1

- name: sub-dag2
steps:
- - name: sub-dag-a
template: success
- - name: sub-dag-b
template: fail

- name: success
container:
command: [sh, -c, exit 0]
image: alpine
retryStrategy:
limit: 1

- name: success-2
container:
command: [sh, -c, sleep 30; exit 0]
image: alpine
retryStrategy:
limit: 1
31 changes: 16 additions & 15 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,28 @@ func (d *dagContext) GetTaskNode(taskName string) *wfv1.NodeStatus {
}

// Assert all branch finished for failFast:disable function
func (d *dagContext) assertBranchFinished(targetTaskName string) bool {
func (d *dagContext) assertBranchFinished(targetTaskNames []string) bool {
// We should ensure that from the bottom to the top,
// all the nodes of this branch have at least one failure.
// If successful, we should continue to run down until the leaf node
taskNode := d.GetTaskNode(targetTaskName)
if taskNode == nil {
taskObject := d.getTask(targetTaskName)
if taskObject != nil {
// Make sure all the dependency node have one failed
for _, tmpTaskName := range taskObject.Dependencies {
flag := false
for _, targetTaskName := range targetTaskNames {
taskNode := d.GetTaskNode(targetTaskName)
if taskNode == nil {
taskObject := d.getTask(targetTaskName)
if taskObject != nil {
// Make sure all the dependency node have one failed
// Recursive check until top root node
return d.assertBranchFinished(tmpTaskName)
return d.assertBranchFinished(taskObject.Dependencies)
}
} else if !taskNode.Successful() {
flag = true
}
} else if !taskNode.Successful() {
return true
}

// In failFast situation, if node is successful, it will run to leaf node, above
// the function, we have already check the leaf node status
return false
// In failFast situation, if node is successful, it will run to leaf node, above
// the function, we have already check the leaf node status
}
return flag
}

// assessDAGPhase assesses the overall DAG status
Expand Down Expand Up @@ -128,7 +129,7 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes map[string]wfv1.
tmpDepNode := d.GetTaskNode(tmpDepName)
if tmpDepNode == nil {
// If leaf node is nil, we should check it's parent node and recursive check
if !d.assertBranchFinished(tmpDepName) {
if !d.assertBranchFinished([]string{tmpDepName}) {
tmpOverAllFinished = false
}
} else if tmpDepNode.Type == wfv1.NodeTypeRetry && d.hasMoreRetries(tmpDepNode) {
Expand Down

0 comments on commit 4bf83fc

Please sign in to comment.