From 1212df4d19dd18045fd0aded7fd1dc5726f7d5c5 Mon Sep 17 00:00:00 2001 From: markterm Date: Thu, 3 Dec 2020 12:53:36 +0000 Subject: [PATCH] feat(controller): Support .AnySucceeded / .AllFailed for TaskGroup in depends logic. Closes #3405 (#3964) Signed-off-by: Mark White --- docs/enhanced-depends-logic.md | 7 ++ workflow/common/ancestry.go | 46 ++++++----- workflow/common/ancestry_test.go | 22 +++++- workflow/controller/dag.go | 47 ++++++++--- workflow/controller/dag_test.go | 103 +++++++++++++++++++++++++ workflow/validate/validate.go | 29 +++++-- workflow/validate/validate_dag_test.go | 29 +++++++ 7 files changed, 242 insertions(+), 41 deletions(-) diff --git a/docs/enhanced-depends-logic.md b/docs/enhanced-depends-logic.md index 71da1797eccb..d5e7bc6b6402 100644 --- a/docs/enhanced-depends-logic.md +++ b/docs/enhanced-depends-logic.md @@ -52,6 +52,13 @@ Full boolean logic is also available. Operators include: depends: "(task-2.Succeeded || task-2.Skipped) && !task-3.Failed" ``` +In the case that you're depending on a task that uses withItems, you can depend on +whether any of the item tasks are successful or all have failed using .AnySucceeded and .AllFailed, for example: + +``` +depends: "task-1.AnySucceeded || task-2.AllFailed" +``` + ## Compatibility with `dependencies` and `dag.task.continueOn` This feature is fully compatible with `dependencies` and conversion is easy. diff --git a/workflow/common/ancestry.go b/workflow/common/ancestry.go index 2c1452e77115..13f454678119 100644 --- a/workflow/common/ancestry.go +++ b/workflow/common/ancestry.go @@ -19,17 +19,19 @@ type DagContext interface { type TaskResult string const ( - TaskResultSucceeded TaskResult = "Succeeded" - TaskResultFailed TaskResult = "Failed" - TaskResultErrored TaskResult = "Errored" - TaskResultSkipped TaskResult = "Skipped" - TaskResultDaemoned TaskResult = "Daemoned" + TaskResultSucceeded TaskResult = "Succeeded" + TaskResultFailed TaskResult = "Failed" + TaskResultErrored TaskResult = "Errored" + TaskResultSkipped TaskResult = "Skipped" + TaskResultDaemoned TaskResult = "Daemoned" + TaskResultAnySucceeded TaskResult = "AnySucceeded" + TaskResultAllFailed TaskResult = "AllFailed" ) var ( // TODO: This should use validate.workflowFieldNameFmt, but we can't import it here because an import cycle would be created - taskNameRegex = regexp.MustCompile(`([a-zA-Z0-9][-a-zA-Z0-9]*?\.[A-Z][a-z]+)|([a-zA-Z0-9][-a-zA-Z0-9]*)`) - taskResultRegex = regexp.MustCompile(`([a-zA-Z0-9][-a-zA-Z0-9]*?\.[A-Z][a-z]+)`) + taskNameRegex = regexp.MustCompile(`([a-zA-Z0-9][-a-zA-Z0-9]*?\.[A-Z][a-zA-Z]+)|([a-zA-Z0-9][-a-zA-Z0-9]*)`) + taskResultRegex = regexp.MustCompile(`([a-zA-Z0-9][-a-zA-Z0-9]*?\.[A-Z][a-zA-Z]+)`) ) type expansionMatch struct { @@ -38,31 +40,37 @@ type expansionMatch struct { end int } -func GetTaskDependencies(task *wfv1.DAGTask, ctx DagContext) ([]string, string) { +type DependencyType int + +const ( + DependencyTypeTask DependencyType = iota + DependencyTypeItems +) + +func GetTaskDependencies(task *wfv1.DAGTask, ctx DagContext) (map[string]DependencyType, string) { depends := getTaskDependsLogic(task, ctx) matches := taskNameRegex.FindAllStringSubmatchIndex(depends, -1) var expansionMatches []expansionMatch - dependencies := make(map[string]bool) + dependencies := make(map[string]DependencyType) for _, matchGroup := range matches { // We have matched a taskName.TaskResult if matchGroup[2] != -1 { match := depends[matchGroup[2]:matchGroup[3]] split := strings.Split(match, ".") - dependencies[split[0]] = true + if split[1] == string(TaskResultAnySucceeded) || split[1] == string(TaskResultAllFailed) { + dependencies[split[0]] = DependencyTypeItems + } else if _, ok := dependencies[split[0]]; !ok { //DependencyTypeItems takes precedence + dependencies[split[0]] = DependencyTypeTask + } } else if matchGroup[4] != -1 { match := depends[matchGroup[4]:matchGroup[5]] - dependencies[match] = true + dependencies[match] = DependencyTypeTask expansionMatches = append(expansionMatches, expansionMatch{taskName: match, start: matchGroup[4], end: matchGroup[5]}) } } - var out []string - for dependency := range dependencies { - out = append(out, dependency) - } - if len(expansionMatches) == 0 { - return out, depends + return dependencies, depends } sort.Slice(expansionMatches, func(i, j int) bool { @@ -74,7 +82,7 @@ func GetTaskDependencies(task *wfv1.DAGTask, ctx DagContext) ([]string, string) depends = depends[:match.start] + expandDependency(match.taskName, matchTask) + depends[match.end:] } - return out, depends + return dependencies, depends } func ValidateTaskResults(dagTask *wfv1.DAGTask) error { @@ -88,7 +96,7 @@ func ValidateTaskResults(dagTask *wfv1.DAGTask) error { split := strings.Split(matchGroup[1], ".") taskName, taskResult := split[0], TaskResult(split[1]) switch taskResult { - case TaskResultSucceeded, TaskResultFailed, TaskResultSkipped, TaskResultErrored, TaskResultDaemoned: + case TaskResultSucceeded, TaskResultFailed, TaskResultSkipped, TaskResultErrored, TaskResultDaemoned, TaskResultAnySucceeded, TaskResultAllFailed: // Do nothing default: return fmt.Errorf("task result '%s' for task '%s' is invalid", taskResult, taskName) diff --git a/workflow/common/ancestry_test.go b/workflow/common/ancestry_test.go index a774dc7b21b0..79507165ca6b 100644 --- a/workflow/common/ancestry_test.go +++ b/workflow/common/ancestry_test.go @@ -34,6 +34,14 @@ func TestGetTaskDependenciesFromDepends(t *testing.T) { } assert.Equal(t, "((task-1.Succeeded || task-1.Skipped || task-1.Daemoned) || task-2.Succeeded) && !task-3.Succeeded", logic) + task = &wfv1.DAGTask{Depends: "(task-1 || task-2.AnySucceeded) && !task-3.Succeeded"} + deps, logic = GetTaskDependencies(task, ctx) + assert.Len(t, deps, 3) + for _, dep := range []string{"task-1", "task-2", "task-3"} { + assert.Contains(t, deps, dep) + } + assert.Equal(t, "((task-1.Succeeded || task-1.Skipped || task-1.Daemoned) || task-2.AnySucceeded) && !task-3.Succeeded", logic) + task = &wfv1.DAGTask{Depends: "(task-1||(task-2.Succeeded || task-2.Failed))&&!task-3.Failed"} deps, logic = GetTaskDependencies(task, ctx) assert.Len(t, deps, 3) @@ -44,26 +52,32 @@ func TestGetTaskDependenciesFromDepends(t *testing.T) { task = &wfv1.DAGTask{Depends: "(task-1 || task-1.Succeeded) && !task-1.Failed"} deps, logic = GetTaskDependencies(task, ctx) - assert.Equal(t, []string{"task-1"}, deps) + assert.Equal(t, map[string]DependencyType{"task-1": DependencyTypeTask}, deps) assert.Equal(t, "((task-1.Succeeded || task-1.Skipped || task-1.Daemoned) || task-1.Succeeded) && !task-1.Failed", logic) + task = &wfv1.DAGTask{Depends: "task-1.Succeeded && task-1.AnySucceeded"} + deps, logic = GetTaskDependencies(task, ctx) + assert.Equal(t, map[string]DependencyType{"task-1": DependencyTypeItems}, deps) + assert.Equal(t, "task-1.Succeeded && task-1.AnySucceeded", logic) + ctx.testTasks[0].ContinueOn = &wfv1.ContinueOn{Failed: true} task = &wfv1.DAGTask{Depends: "task-1"} deps, logic = GetTaskDependencies(task, ctx) - assert.Equal(t, []string{"task-1"}, deps) + assert.Equal(t, map[string]DependencyType{"task-1": DependencyTypeTask}, deps) assert.Equal(t, "(task-1.Succeeded || task-1.Skipped || task-1.Daemoned || task-1.Failed)", logic) ctx.testTasks[0].ContinueOn = &wfv1.ContinueOn{Error: true} task = &wfv1.DAGTask{Depends: "task-1"} deps, logic = GetTaskDependencies(task, ctx) - assert.Equal(t, []string{"task-1"}, deps) + assert.Equal(t, map[string]DependencyType{"task-1": DependencyTypeTask}, deps) assert.Equal(t, "(task-1.Succeeded || task-1.Skipped || task-1.Daemoned || task-1.Errored)", logic) ctx.testTasks[0].ContinueOn = &wfv1.ContinueOn{Failed: true, Error: true} task = &wfv1.DAGTask{Depends: "task-1"} deps, logic = GetTaskDependencies(task, ctx) - assert.Equal(t, []string{"task-1"}, deps) + assert.Equal(t, map[string]DependencyType{"task-1": DependencyTypeTask}, deps) assert.Equal(t, "(task-1.Succeeded || task-1.Skipped || task-1.Daemoned || task-1.Errored || task-1.Failed)", logic) + } func TestValidateTaskResults(t *testing.T) { diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index f7090e46da71..de9996072d6b 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -92,7 +92,12 @@ func (d *dagContext) GetTaskDependsLogic(taskName string) string { func (d *dagContext) resolveDependencies(taskName string) { dependencies, resolvedDependsLogic := common.GetTaskDependencies(d.GetTask(taskName), d) - d.dependencies[taskName] = dependencies + var dependencyTasks []string + for dep := range dependencies { + dependencyTasks = append(dependencyTasks, dep) + } + + d.dependencies[taskName] = dependencyTasks d.dependsLogic[taskName] = resolvedDependsLogic } @@ -437,7 +442,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) { taskNodeName := dagCtx.taskNodeName(t.Name) node = dagCtx.getTaskNode(t.Name) if node == nil { - woc.log.Infof("All of node %s dependencies %s completed", taskNodeName, taskDependencies) + woc.log.Infof("All of node %s dependencies %v completed", taskNodeName, taskDependencies) // Add the child relationship from our dependency's outbound nodes to this node. connectDependencies(taskNodeName) @@ -674,11 +679,13 @@ func expandTask(task wfv1.DAGTask) ([]wfv1.DAGTask, error) { } type TaskResults struct { - Succeeded bool `json:"Succeeded"` - Failed bool `json:"Failed"` - Errored bool `json:"Errored"` - Skipped bool `json:"Skipped"` - Daemoned bool `json:"Daemoned"` + Succeeded bool `json:"Succeeded"` + Failed bool `json:"Failed"` + Errored bool `json:"Errored"` + Skipped bool `json:"Skipped"` + Daemoned bool `json:"Daemoned"` + AnySucceeded bool `json:"AnySucceeded"` + AllFailed bool `json:"AllFailed"` } // evaluateDependsLogic returns whether a node should execute and proceed. proceed means that all of its dependencies are @@ -706,12 +713,28 @@ func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) { continue } + anySucceeded := false + allFailed := false + + if depNode.Type == wfv1.NodeTypeTaskGroup { + + allFailed = len(depNode.Children) > 0 + + for _, childNodeID := range depNode.Children { + childNode := d.wf.Status.Nodes[childNodeID] + anySucceeded = anySucceeded || childNode.Phase == wfv1.NodeSucceeded + allFailed = allFailed && childNode.Phase == wfv1.NodeFailed + } + } + evalScope[evalTaskName] = TaskResults{ - Succeeded: depNode.Phase == wfv1.NodeSucceeded, - Failed: depNode.Phase == wfv1.NodeFailed, - Errored: depNode.Phase == wfv1.NodeError, - Skipped: depNode.Phase == wfv1.NodeSkipped, - Daemoned: depNode.IsDaemoned() && depNode.Phase != wfv1.NodePending, + Succeeded: depNode.Phase == wfv1.NodeSucceeded, + Failed: depNode.Phase == wfv1.NodeFailed, + Errored: depNode.Phase == wfv1.NodeError, + Skipped: depNode.Phase == wfv1.NodeSkipped, + Daemoned: depNode.IsDaemoned() && depNode.Phase != wfv1.NodePending, + AnySucceeded: anySucceeded, + AllFailed: allFailed, } } diff --git a/workflow/controller/dag_test.go b/workflow/controller/dag_test.go index 80beb0e7cb8a..bdb17789e5dd 100644 --- a/workflow/controller/dag_test.go +++ b/workflow/controller/dag_test.go @@ -297,6 +297,109 @@ func TestEvaluateDependsLogic(t *testing.T) { assert.True(t, execute) } +func TestEvaluateAnyAllDependsLogic(t *testing.T) { + testTasks := []wfv1.DAGTask{ + { + Name: "A", + }, + { + Name: "A-1", + }, + { + Name: "A-2", + }, + { + Name: "B", + Depends: "A.AnySucceeded", + }, + { + Name: "B-1", + }, + { + Name: "B-2", + }, + { + Name: "C", + Depends: "B.AllFailed", + }, + } + + d := &dagContext{ + boundaryName: "test", + tasks: testTasks, + wf: &wfv1.Workflow{ObjectMeta: metav1.ObjectMeta{Name: "test-wf"}}, + dependencies: make(map[string][]string), + dependsLogic: make(map[string]string), + } + + // Task A is still running, A-1 succeeded but A-2 failed + d.wf = &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{Name: "test-wf"}, + Status: wfv1.WorkflowStatus{ + Nodes: map[string]wfv1.NodeStatus{ + d.taskNodeID("A"): { + Phase: wfv1.NodeRunning, + Type: wfv1.NodeTypeTaskGroup, + Children: []string{d.taskNodeID("A-1"), d.taskNodeID("A-2")}, + }, + d.taskNodeID("A-1"): {Phase: wfv1.NodeRunning}, + d.taskNodeID("A-2"): {Phase: wfv1.NodeRunning}, + }, + }, + } + + // Task B should not proceed as task A is still running + execute, proceed, err := d.evaluateDependsLogic("B") + assert.NoError(t, err) + assert.False(t, proceed) + assert.False(t, execute) + + // Task A succeeded + d.wf.Status.Nodes[d.taskNodeID("A")] = wfv1.NodeStatus{ + Phase: wfv1.NodeSucceeded, + Type: wfv1.NodeTypeTaskGroup, + Children: []string{d.taskNodeID("A-1"), d.taskNodeID("A-2")}, + } + + // Task B should proceed, but not execute as none of the children have succeeded yet + execute, proceed, err = d.evaluateDependsLogic("B") + assert.NoError(t, err) + assert.True(t, proceed) + assert.False(t, execute) + + // Task A-2 succeeded + d.wf.Status.Nodes[d.taskNodeID("A-2")] = wfv1.NodeStatus{Phase: wfv1.NodeSucceeded} + + // Task B should now proceed and execute + execute, proceed, err = d.evaluateDependsLogic("B") + assert.NoError(t, err) + assert.True(t, proceed) + assert.True(t, execute) + + // Task B succeeds and B-1 fails + d.wf.Status.Nodes[d.taskNodeID("B")] = wfv1.NodeStatus{ + Phase: wfv1.NodeSucceeded, + Type: wfv1.NodeTypeTaskGroup, + Children: []string{d.taskNodeID("B-1"), d.taskNodeID("B-2")}, + } + d.wf.Status.Nodes[d.taskNodeID("B-1")] = wfv1.NodeStatus{Phase: wfv1.NodeFailed} + + // Task C should proceed, but not execute as not all of B's children have failed yet + execute, proceed, err = d.evaluateDependsLogic("C") + assert.NoError(t, err) + assert.True(t, proceed) + assert.False(t, execute) + + d.wf.Status.Nodes[d.taskNodeID("B-2")] = wfv1.NodeStatus{Phase: wfv1.NodeFailed} + + // Task C should now proceed and execute as all of B's children have failed + execute, proceed, err = d.evaluateDependsLogic("C") + assert.NoError(t, err) + assert.True(t, proceed) + assert.True(t, execute) + +} + func TestAllEvaluateDependsLogic(t *testing.T) { statusMap := map[common.TaskResult]wfv1.NodePhase{ common.TaskResultSucceeded: wfv1.NodeSucceeded, diff --git a/workflow/validate/validate.go b/workflow/validate/validate.go index 546549983570..92982ece4103 100644 --- a/workflow/validate/validate.go +++ b/workflow/validate/validate.go @@ -1151,7 +1151,7 @@ func validateWorkflowFieldNames(slice interface{}) error { type dagValidationContext struct { tasks map[string]wfv1.DAGTask - dependencies map[string][]string + dependencies map[string]map[string]common.DependencyType //map of DAG tasks, each one containing a map of [task it's dependent on] -> [dependency type] } func (d *dagValidationContext) GetTask(taskName string) *wfv1.DAGTask { @@ -1160,6 +1160,17 @@ func (d *dagValidationContext) GetTask(taskName string) *wfv1.DAGTask { } func (d *dagValidationContext) GetTaskDependencies(taskName string) []string { + dependencies := d.GetTaskDependenciesWithDependencyTypes(taskName) + + var dependencyTasks []string + for task := range dependencies { + dependencyTasks = append(dependencyTasks, task) + } + + return dependencyTasks +} + +func (d *dagValidationContext) GetTaskDependenciesWithDependencyTypes(taskName string) map[string]common.DependencyType { if dependencies, ok := d.dependencies[taskName]; ok { return dependencies } @@ -1203,7 +1214,7 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl dagValidationCtx := &dagValidationContext{ tasks: nameToTask, - dependencies: make(map[string][]string), + dependencies: make(map[string]map[string]common.DependencyType), } resolvedTemplates := make(map[string]*wfv1.Template) @@ -1238,11 +1249,17 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error()) } - for j, depName := range dagValidationCtx.GetTaskDependencies(task.Name) { - if _, ok := dagValidationCtx.tasks[depName]; !ok { + for depName, depType := range dagValidationCtx.GetTaskDependenciesWithDependencyTypes(task.Name) { + task, ok := dagValidationCtx.tasks[depName] + if !ok { + return errors.Errorf(errors.CodeBadRequest, + "templates.%s.tasks.%s dependency '%s' not defined", + tmpl.Name, task.Name, depName) + + } else if depType == common.DependencyTypeItems && len(task.WithItems) == 0 && task.WithParam == "" && task.WithSequence == nil { return errors.Errorf(errors.CodeBadRequest, - "templates.%s.tasks.%s.dependencies[%d] dependency '%s' not defined", - tmpl.Name, task.Name, j, depName) + "templates.%s.tasks.%s dependency '%s' uses an items-based condition such as .AnySucceeded or .AllFailed but does not contain any items", + tmpl.Name, task.Name, depName) } } } diff --git a/workflow/validate/validate_dag_test.go b/workflow/validate/validate_dag_test.go index 8313b680e835..059095029949 100644 --- a/workflow/validate/validate_dag_test.go +++ b/workflow/validate/validate_dag_test.go @@ -39,6 +39,35 @@ func TestDAGCycle(t *testing.T) { } } +var dagAnyWithoutExpandingTask = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: dag-cycle- +spec: + entrypoint: entry + templates: + - name: echo + container: + image: alpine:3.7 + command: [echo, hello] + - name: entry + dag: + tasks: + - name: A + template: echo + - name: B + depends: A.AnySucceeded + template: echo +` + +func TestAnyWithoutExpandingTask(t *testing.T) { + _, err := validate(dagAnyWithoutExpandingTask) + if assert.NotNil(t, err) { + assert.Contains(t, err.Error(), "does not contain any items") + } +} + var dagUndefinedTemplate = ` apiVersion: argoproj.io/v1alpha1 kind: Workflow