diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json
index 9a4345fd8708..3242b077561d 100644
--- a/api/openapi-spec/swagger.json
+++ b/api/openapi-spec/swagger.json
@@ -128,6 +128,53 @@
}
}
},
+ "io.argoproj.workflow.v1alpha1.DAG": {
+ "description": "DAG is a template subtype for directed acyclic graph templates",
+ "required": [
+ "tasks"
+ ],
+ "properties": {
+ "target": {
+ "description": "Target are one or more names of targets to execute in a DAG",
+ "type": "string"
+ },
+ "tasks": {
+ "description": "Tasks are a list of DAG tasks",
+ "type": "array",
+ "items": {
+ "$ref": "#/definitions/io.argoproj.workflow.v1alpha1.DAGTask"
+ }
+ }
+ }
+ },
+ "io.argoproj.workflow.v1alpha1.DAGTask": {
+ "description": "DAGTask represents a node in the graph during DAG execution",
+ "required": [
+ "name",
+ "template"
+ ],
+ "properties": {
+ "arguments": {
+ "description": "Arguments are the parameter and artifact arguments to the template",
+ "$ref": "#/definitions/io.argoproj.workflow.v1alpha1.Arguments"
+ },
+ "dependencies": {
+ "description": "Dependencies are name of other targets which this depends on",
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ },
+ "name": {
+ "description": "Name is the name of the target",
+ "type": "string"
+ },
+ "template": {
+ "description": "Name of template to execute",
+ "type": "string"
+ }
+ }
+ },
"io.argoproj.workflow.v1alpha1.GitArtifact": {
"description": "GitArtifact is the location of an git artifact",
"required": [
@@ -538,6 +585,10 @@
"description": "Deamon will allow a workflow to proceed to the next step so long as the container reaches readiness",
"type": "boolean"
},
+ "dag": {
+ "description": "DAG template subtype which runs a DAG",
+ "$ref": "#/definitions/io.argoproj.workflow.v1alpha1.DAG"
+ },
"inputs": {
"description": "Inputs describe what inputs parameters and artifacts are supplied to this template",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.Inputs"
diff --git a/cmd/argo/commands/get.go b/cmd/argo/commands/get.go
index db98bb86b442..4baf7e445c0f 100644
--- a/cmd/argo/commands/get.go
+++ b/cmd/argo/commands/get.go
@@ -17,6 +17,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
+const onExitSuffix = "onExit"
+
func init() {
RootCmd.AddCommand(getCmd)
getCmd.Flags().StringVarP(&getArgs.output, "output", "o", "", "Output format. One of: json|yaml|wide")
@@ -116,25 +118,284 @@ func printWorkflowHelper(wf *wfv1.Workflow) {
} else {
fmt.Fprintf(w, "%s\tPODNAME\tDURATION\tMESSAGE\n", ansiFormat("STEP", FgDefault))
}
- node, ok := wf.Status.Nodes[wf.ObjectMeta.Name]
- if ok {
- printNodeTree(w, wf, node, 0, " ", " ")
- }
- onExitNode, ok := wf.Status.Nodes[wf.NodeID(wf.ObjectMeta.Name+".onExit")]
- if ok {
- fmt.Fprintf(w, "\t\t\t\t\n")
- onExitNode.Name = "onExit"
- printNodeTree(w, wf, onExitNode, 0, " ", " ")
+
+ // Convert Nodes to Render Trees
+ roots := convertToRenderTrees(wf)
+
+ // Print main and onExit Trees
+ rootNodeIDs := [2]string{wf.NodeID(wf.ObjectMeta.Name), wf.NodeID(wf.ObjectMeta.Name + "." + onExitSuffix)}
+ for _, id := range rootNodeIDs {
+ if node, ok := wf.Status.Nodes[id]; ok {
+ if root, ok := roots[node.ID]; ok {
+ root.renderNodes(w, wf, 0, " ", " ")
+ }
+ }
}
+
_ = w.Flush()
}
}
-func printNodeTree(w *tabwriter.Writer, wf *wfv1.Workflow, node wfv1.NodeStatus, depth int, nodePrefix string, childPrefix string) {
- nodeName := fmt.Sprintf("%s %s", jobStatusIconMap[node.Phase], node.Name)
+type nodeInfoInterface interface {
+ getID() string
+ getNodeStatus(wf *wfv1.Workflow) wfv1.NodeStatus
+ getStartTime(wf *wfv1.Workflow) metav1.Time
+}
+
+type nodeInfo struct {
+ id string
+}
+
+func (n *nodeInfo) getID() string {
+ return n.id
+}
+
+func (n *nodeInfo) getNodeStatus(wf *wfv1.Workflow) wfv1.NodeStatus {
+ return wf.Status.Nodes[n.id]
+}
+
+func (n *nodeInfo) getStartTime(wf *wfv1.Workflow) metav1.Time {
+ return wf.Status.Nodes[n.id].StartedAt
+}
+
+// Interface to represent Nodes in render form types
+type renderNode interface {
+ // Render this renderNode and its children
+ renderNodes(w *tabwriter.Writer, wf *wfv1.Workflow, depth int,
+ nodePrefix string, childPrefix string)
+ nodeInfoInterface
+}
+
+// Currently these are Pod or Skipped nodes (see isExecutionNode)
+type executionNode struct {
+ nodeInfo
+}
+
+// Currently this is the step groups or retry nodes
+type nonBoundaryParentNode struct {
+ nodeInfo
+ children []renderNode // Can be boundaryNode or executionNode
+}
+
+// Currently these are the DAG or Steps template nodes (see isBoundaryNode)
+type boundaryNode struct {
+ nodeInfo
+ boundaryContained []renderNode // Can be nonBoundaryParent or executionNode or boundaryNode
+}
+
+func isBoundaryNode(node wfv1.NodeType) bool {
+ return (node == wfv1.NodeTypeDAG) || (node == wfv1.NodeTypeSteps)
+}
+
+func isNonBoundaryParentNode(node wfv1.NodeType) bool {
+ return (node == wfv1.NodeTypeStepGroup) || (node == wfv1.NodeTypeRetry)
+}
+
+func isExecutionNode(node wfv1.NodeType) bool {
+ return (node == wfv1.NodeTypePod) || (node == wfv1.NodeTypeSkipped)
+}
+
+func insertSorted(wf *wfv1.Workflow, sortedArray []renderNode, item renderNode) []renderNode {
+ insertTime := item.getStartTime(wf)
+ var index int
+ for index = 0; index < len(sortedArray); index++ {
+ existingItem := sortedArray[index]
+ t := existingItem.getStartTime(wf)
+ if insertTime.Before(&t) {
+ break
+ } else if insertTime.Equal(&t) {
+ // If they are equal apply alphabetical order so we
+ // get some consistent printing
+ insertName := humanizeNodeName(wf, item.getNodeStatus(wf))
+ equalName := humanizeNodeName(wf, existingItem.getNodeStatus(wf))
+ if insertName < equalName {
+ break
+ }
+ }
+ }
+ sortedArray = append(sortedArray, nil)
+ copy(sortedArray[index+1:], sortedArray[index:])
+ sortedArray[index] = item
+ return sortedArray
+}
+
+// Attach render node n to its parent based on what has been parsed previously
+// In some cases add it to list of things that still needs to be attached to parent
+// Return if I am a possible root
+func attachToParent(wf *wfv1.Workflow, n renderNode,
+ nonBoundaryParentChildrenMap map[string]*nonBoundaryParentNode, boundaryID string,
+ boundaryNodeMap map[string]*boundaryNode, parentBoundaryMap map[string][]renderNode) bool {
+
+ // Check first if I am a child of a nonBoundaryParent
+ // that implies I attach to that instead of my boundary. This was already
+ // figured out in Pass 1
+ if nonBoundaryParent, ok := nonBoundaryParentChildrenMap[n.getID()]; ok {
+ nonBoundaryParent.children = insertSorted(wf, nonBoundaryParent.children, n)
+ return false
+ }
+
+ // If I am not attached to a nonBoundaryParent and I have no Boundary ID then
+ // I am a possible root
+ if boundaryID == "" {
+ return true
+ }
+ if parentBoundary, ok := boundaryNodeMap[boundaryID]; ok {
+ parentBoundary.boundaryContained = insertSorted(wf, parentBoundary.boundaryContained, n)
+ } else {
+ // put ourselves to be added by the parent when we get to it later
+ if _, ok := parentBoundaryMap[boundaryID]; !ok {
+ parentBoundaryMap[boundaryID] = make([]renderNode, 0)
+ }
+ parentBoundaryMap[boundaryID] = append(parentBoundaryMap[boundaryID], n)
+ }
+ return false
+}
+
+// This takes the map of NodeStatus and converts it into a forest
+// of trees of renderNodes and returns the set of roots for each tree
+func convertToRenderTrees(wf *wfv1.Workflow) map[string]renderNode {
+
+ renderTreeRoots := make(map[string]renderNode)
+
+ // Used to store all boundary nodes so future render children can attach
+ // Maps node Name -> *boundaryNode
+ boundaryNodeMap := make(map[string]*boundaryNode)
+ // Used to store children of a boundary node that has not been parsed yet
+ // Maps boundary Node name -> array of render Children
+ parentBoundaryMap := make(map[string][]renderNode)
+
+ // Used to store Non Boundary Parent nodes so render children can attach
+ // Maps non Boundary Parent Node name -> *nonBoundaryParentNode
+ nonBoundaryParentMap := make(map[string]*nonBoundaryParentNode)
+ // Used to store children which have a Non Boundary Parent from rendering perspective
+ // Maps non Boundary render Children name -> *nonBoundaryParentNode
+ nonBoundaryParentChildrenMap := make(map[string]*nonBoundaryParentNode)
+
+ // We have to do a 2 pass approach because anything that is a child
+ // of a nonBoundaryParent and also has a boundaryID we may not know which
+ // parent to attach to if we didn't see the nonBoundaryParent earlier
+ // in a 1 pass strategy
+
+ // 1st Pass Process enough of nonBoundaryParent nodes to know all their children
+ for id, status := range wf.Status.Nodes {
+ if isNonBoundaryParentNode(status.Type) {
+ n := nonBoundaryParentNode{nodeInfo: nodeInfo{id: id}}
+ nonBoundaryParentMap[id] = &n
+
+ for _, child := range status.Children {
+ nonBoundaryParentChildrenMap[child] = &n
+ }
+ }
+ }
+
+ // 2nd Pass process everything
+ for id, status := range wf.Status.Nodes {
+ switch {
+ case isBoundaryNode(status.Type):
+ n := boundaryNode{nodeInfo: nodeInfo{id: id}}
+ boundaryNodeMap[id] = &n
+ // Attach to my parent if needed
+ if attachToParent(wf, &n, nonBoundaryParentChildrenMap,
+ status.BoundaryID, boundaryNodeMap, parentBoundaryMap) {
+ renderTreeRoots[n.getID()] = &n
+ }
+ // Attach nodes who are in my boundary already seen before me to me
+ for _, val := range parentBoundaryMap[id] {
+ n.boundaryContained = insertSorted(wf, n.boundaryContained, val)
+ }
+ case isNonBoundaryParentNode(status.Type):
+ nPtr, ok := nonBoundaryParentMap[id]
+ if !ok {
+ log.Fatal("Unable to lookup node " + id)
+ return nil
+ }
+ // Attach to my parent if needed
+ if attachToParent(wf, nPtr, nonBoundaryParentChildrenMap,
+ status.BoundaryID, boundaryNodeMap, parentBoundaryMap) {
+ renderTreeRoots[nPtr.getID()] = nPtr
+ }
+ // All children attach directly to the nonBoundaryParents since they are already created
+ // in pass 1 so no need to do that here
+ case isExecutionNode(status.Type):
+ n := executionNode{nodeInfo: nodeInfo{id: id}}
+ // Attach to my parent if needed
+ if attachToParent(wf, &n, nonBoundaryParentChildrenMap,
+ status.BoundaryID, boundaryNodeMap, parentBoundaryMap) {
+ renderTreeRoots[n.getID()] = &n
+ }
+ // Execution nodes don't have other render nodes as children
+ }
+ }
+
+ return renderTreeRoots
+}
+
+// This function decides if a Node will be filtered from rendering and returns
+// two things. First argument tells if the node is filtered and second argument
+// tells whether the children need special indentation due to filtering
+// Return Values: (is node filtered, do children need special indent)
+func filterNode(node wfv1.NodeStatus) (bool, bool) {
+ if node.Type == wfv1.NodeTypeRetry && len(node.Children) == 1 {
+ return true, false
+ } else if node.Type == wfv1.NodeTypeStepGroup {
+ return true, true
+ }
+ return false, false
+}
+
+// Render the child of a given node based on information about the parent such as:
+// whether it was filtered and does this child need special indent
+func renderChild(w *tabwriter.Writer, wf *wfv1.Workflow, nInfo renderNode, depth int,
+ nodePrefix string, childPrefix string, parentFiltered bool,
+ childIndex int, maxIndex int, childIndent bool) {
+ var part, subp string
+ if parentFiltered && childIndent {
+ if maxIndex == 0 {
+ part = "--"
+ subp = " "
+ } else if childIndex == 0 {
+ part = "·-"
+ subp = " "
+ } else if childIndex == maxIndex {
+ part = "└-"
+ subp = " "
+ } else {
+ part = "├-"
+ subp = " "
+ }
+ } else if !parentFiltered {
+ if childIndex == maxIndex {
+ part = "└-"
+ subp = " "
+ } else {
+ part = "├-"
+ subp = "| "
+ }
+ }
+ var childNodePrefix, childChldPrefix string
+ if !parentFiltered {
+ depth = depth + 1
+ childNodePrefix = childPrefix + part
+ childChldPrefix = childPrefix + subp
+ } else {
+ if childIndex == 0 {
+ childNodePrefix = nodePrefix + part
+ } else {
+ childNodePrefix = childPrefix + part
+ }
+ childChldPrefix = childPrefix + subp
+ }
+ nInfo.renderNodes(w, wf, depth, childNodePrefix, childChldPrefix)
+}
+
+// Main method to print information of node in get
+func printNode(w *tabwriter.Writer, wf *wfv1.Workflow, node wfv1.NodeStatus, depth int,
+ nodePrefix string, childPrefix string) {
+
+ nodeName := fmt.Sprintf("%s %s", jobStatusIconMap[node.Phase], humanizeNodeName(wf, node))
var args []interface{}
duration := humanizeDurationShort(node.StartedAt, node.FinishedAt)
- if len(node.Children) == 0 && node.Phase != wfv1.NodeSkipped {
+ if isExecutionNode(node.Type) && node.Phase != wfv1.NodeSkipped {
args = []interface{}{nodePrefix, nodeName, node.ID, duration, node.Message}
} else {
args = []interface{}{nodePrefix, nodeName, "", "", ""}
@@ -147,78 +408,44 @@ func printNodeTree(w *tabwriter.Writer, wf *wfv1.Workflow, node wfv1.NodeStatus,
} else {
fmt.Fprintf(w, "%s%s\t%s\t%s\t%s\n", args...)
}
+}
- if node.RetryStrategy != nil {
- for i, childNodeID := range node.Children {
- var part1, subp1 string
- subp1 = " "
+// renderNodes for each renderNode Type
+// boundaryNode
+func (nodeInfo *boundaryNode) renderNodes(w *tabwriter.Writer, wf *wfv1.Workflow, depth int,
+ nodePrefix string, childPrefix string) {
- childNode := wf.Status.Nodes[childNodeID]
- if i > 0 && i < len(node.Children)-1 {
- part1 = "├-"
- } else {
- part1 = "└-"
- }
- var part2, subp2 string
- part2 = "--"
- childNodePrefix := childPrefix + part1 + part2
- childChldPrefix := childPrefix + subp1 + subp2
- printNodeTree(w, wf, childNode, depth+1, childNodePrefix, childChldPrefix)
- }
- } else {
- // If the node has children, the node is a workflow template and
- // node.Children prepresent a list of parallel steps. We skip
- // a generation when recursing since the children nodes of workflow
- // templates represent a virtual step group, which are not worh printing.
- for i, stepGroupNodeID := range node.Children {
- lastStepGroup := bool(i == len(node.Children)-1)
- var part1, subp1 string
- if lastStepGroup {
- part1 = "└-"
- subp1 = " "
- } else {
- part1 = "├-"
- subp1 = "| "
- }
- stepGroupNode := wf.Status.Nodes[stepGroupNodeID]
- for j, childNodeID := range stepGroupNode.Children {
- childNode := wf.Status.Nodes[childNodeID]
- if j > 0 {
- if lastStepGroup {
- part1 = " "
- } else {
- part1 = "| "
- }
- }
- firstParallel := bool(j == 0)
- lastParallel := bool(j == len(stepGroupNode.Children)-1)
- var part2, subp2 string
- if firstParallel {
- if len(stepGroupNode.Children) == 1 {
- part2 = "--"
- } else {
- part2 = "·-"
- }
- if !lastParallel {
- subp2 = "| "
- } else {
- subp2 = " "
- }
-
- } else if lastParallel {
- part2 = "└-"
- subp2 = " "
- } else {
- part2 = "├-"
- subp2 = "| "
- }
- childNodePrefix := childPrefix + part1 + part2
- childChldPrefix := childPrefix + subp1 + subp2
- // Remove stepgroup name from being displayed
- childNode.Name = strings.TrimPrefix(childNode.Name, stepGroupNode.Name+".")
- printNodeTree(w, wf, childNode, depth+1, childNodePrefix, childChldPrefix)
- }
- }
+ filtered, childIndent := filterNode(nodeInfo.getNodeStatus(wf))
+ if !filtered {
+ printNode(w, wf, nodeInfo.getNodeStatus(wf), depth, nodePrefix, childPrefix)
+ }
+
+ for i, nInfo := range nodeInfo.boundaryContained {
+ renderChild(w, wf, nInfo, depth, nodePrefix, childPrefix, filtered, i,
+ len(nodeInfo.boundaryContained)-1, childIndent)
+ }
+}
+
+// nonBoundaryParentNode
+func (nodeInfo *nonBoundaryParentNode) renderNodes(w *tabwriter.Writer, wf *wfv1.Workflow, depth int,
+ nodePrefix string, childPrefix string) {
+ filtered, childIndent := filterNode(nodeInfo.getNodeStatus(wf))
+ if !filtered {
+ printNode(w, wf, nodeInfo.getNodeStatus(wf), depth, nodePrefix, childPrefix)
+ }
+
+ for i, nInfo := range nodeInfo.children {
+ renderChild(w, wf, nInfo, depth, nodePrefix, childPrefix, filtered, i,
+ len(nodeInfo.children)-1, childIndent)
+ }
+}
+
+// executionNode
+func (nodeInfo *executionNode) renderNodes(w *tabwriter.Writer, wf *wfv1.Workflow, depth int,
+ nodePrefix string, childPrefix string) {
+ filtered, _ := filterNode(nodeInfo.getNodeStatus(wf))
+ if !filtered {
+ printNode(w, wf, nodeInfo.getNodeStatus(wf), depth, nodePrefix, childPrefix)
}
}
@@ -233,6 +460,16 @@ func getArtifactsString(node wfv1.NodeStatus) string {
return strings.Join(artNames, ",")
}
+// Will take the printed name for a node to be the last part after a '.'
+// Will also special case wfName.onExit nodes to onExit
+func humanizeNodeName(wf *wfv1.Workflow, node wfv1.NodeStatus) string {
+	if node.Name == (wf.ObjectMeta.Name + "." + onExitSuffix) {
+ return onExitSuffix
+ }
+ parts := strings.Split(node.Name, ".")
+ return parts[len(parts)-1]
+}
+
func humanizeTimestamp(epoch int64) string {
ts := time.Unix(epoch, 0)
return fmt.Sprintf("%s (%s)", ts.Format("Mon Jan 02 15:04:05 -0700"), humanize.Time(ts))
diff --git a/examples/dag-coinflip.yaml b/examples/dag-coinflip.yaml
new file mode 100644
index 000000000000..ab0ff76a4fce
--- /dev/null
+++ b/examples/dag-coinflip.yaml
@@ -0,0 +1,44 @@
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-diamond-coinflip-
+spec:
+ entrypoint: diamond
+ templates:
+ - name: coinflip
+ steps:
+ - - name: flip-coin
+ template: flip-coin
+ - - name: heads
+ template: heads
+ when: "{{steps.flip-coin.outputs.result}} == heads"
+ - name: tails
+ template: coinflip
+ when: "{{steps.flip-coin.outputs.result}} == tails"
+ - name: flip-coin
+ script:
+ image: python:3.6
+ command: [python]
+ source: |
+ import random
+ result = "heads" if random.randint(0,1) == 0 else "tails"
+ print(result)
+ - name: heads
+ container:
+ image: alpine:3.6
+ command: [sh, -c]
+ args: ["echo \"it was heads\""]
+ - name: diamond
+ dag:
+ tasks:
+ - name: A
+ template: coinflip
+ - name: B
+ dependencies: [A]
+ template: coinflip
+ - name: C
+ dependencies: [A]
+ template: coinflip
+ - name: D
+ dependencies: [B, C]
+ template: coinflip
diff --git a/examples/dag-diamond-steps.yaml b/examples/dag-diamond-steps.yaml
new file mode 100644
index 000000000000..3a91bec17970
--- /dev/null
+++ b/examples/dag-diamond-steps.yaml
@@ -0,0 +1,69 @@
+# The following workflow executes a diamond workflow, with each
+# node comprising of three parallel fan-in fan-out steps.
+#
+# *
+# / | \
+# A1 A2 A3
+# \ | /
+# *
+# / \
+# / \
+# * *
+# / | \ / | \
+# B1 B2 B3 C1 C2 C3
+# \ | / \ | /
+# * *
+# \ /
+# \ /
+# *
+# / | \
+# D1 D2 D3
+# \ | /
+# *
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-diamond-steps-
+spec:
+ entrypoint: diamond
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ - name: echo-thrice
+ inputs:
+ parameters:
+ - name: message
+ steps:
+ - - name: echo
+ template: echo
+ arguments:
+ parameters:
+ - {name: message, value: "{{inputs.parameters.message}}{{item}}"}
+ withItems: [1,2,3]
+ - name: diamond
+ dag:
+ tasks:
+ - name: A
+ template: echo-thrice
+ arguments:
+ parameters: [{name: message, value: A}]
+ - name: B
+ dependencies: [A]
+ template: echo-thrice
+ arguments:
+ parameters: [{name: message, value: B}]
+ - name: C
+ dependencies: [A]
+ template: echo-thrice
+ arguments:
+ parameters: [{name: message, value: C}]
+ - name: D
+ dependencies: [B, C]
+ template: echo-thrice
+ arguments:
+ parameters: [{name: message, value: D}]
diff --git a/examples/dag-diamond.yaml b/examples/dag-diamond.yaml
new file mode 100644
index 000000000000..bdc644c6dd23
--- /dev/null
+++ b/examples/dag-diamond.yaml
@@ -0,0 +1,43 @@
+# The following workflow executes a diamond workflow
+#
+# A
+# / \
+# B C
+# \ /
+# D
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-diamond-
+spec:
+ entrypoint: diamond
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ - name: diamond
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters: [{name: message, value: A}]
+ - name: B
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: B}]
+ - name: C
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: C}]
+ - name: D
+ dependencies: [B, C]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: D}]
diff --git a/examples/dag-multiroot.yaml b/examples/dag-multiroot.yaml
new file mode 100644
index 000000000000..f5e553af5507
--- /dev/null
+++ b/examples/dag-multiroot.yaml
@@ -0,0 +1,41 @@
+# The following workflow executes a multi-root workflow
+#
+# A B
+# / \ /
+# C D
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-multiroot-
+spec:
+ entrypoint: multiroot
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ - name: multiroot
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters: [{name: message, value: A}]
+ - name: B
+ dependencies:
+ template: echo
+ arguments:
+ parameters: [{name: message, value: B}]
+ - name: C
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: C}]
+ - name: D
+ dependencies: [A, B]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: D}]
diff --git a/examples/dag-nested.yaml b/examples/dag-nested.yaml
new file mode 100644
index 000000000000..ab755bd15daa
--- /dev/null
+++ b/examples/dag-nested.yaml
@@ -0,0 +1,61 @@
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-nested-
+spec:
+ entrypoint: diamond
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ - name: diamond
+ dag:
+ tasks:
+ - name: A
+ template: nested-diamond
+ arguments:
+ parameters: [{name: message, value: A}]
+ - name: B
+ dependencies: [A]
+ template: nested-diamond
+ arguments:
+ parameters: [{name: message, value: B}]
+ - name: C
+ dependencies: [A]
+ template: nested-diamond
+ arguments:
+ parameters: [{name: message, value: C}]
+ - name: D
+ dependencies: [B, C]
+ template: nested-diamond
+ arguments:
+ parameters: [{name: message, value: D}]
+ - name: nested-diamond
+ inputs:
+ parameters:
+ - name: message
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters: [{name: message, value: "{{inputs.parameters.message}}A"}]
+ - name: B
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: "{{inputs.parameters.message}}B"}]
+ - name: C
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: "{{inputs.parameters.message}}C"}]
+ - name: D
+ dependencies: [B, C]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: "{{inputs.parameters.message}}D"}]
diff --git a/hack/wfgraph.py b/hack/wfgraph.py
new file mode 100755
index 000000000000..96074ca9c54d
--- /dev/null
+++ b/hack/wfgraph.py
@@ -0,0 +1,134 @@
+#!/usr/bin/env python3
+
+import argparse
+import json
+import subprocess
+import tempfile
+
+from subprocess import run
+
+template = '''
+
+
+
+
%s
+
+
+
+
+
+
+
+%s
+
+
+
+
+
+
+'''
+
+def main():
+ parser = argparse.ArgumentParser(description='Visualize graph of a workflow')
+ parser.add_argument('workflow', type=str, help='workflow name')
+ args = parser.parse_args()
+ res = run(["kubectl", "get", "workflow", "-o", "json", args.workflow ], stdout=subprocess.PIPE)
+ wf = json.loads(res.stdout.decode("utf-8"))
+ nodes = []
+ edges = []
+ colors = {
+ 'Pending': 'fill: #D0D0D0',
+ 'Running': 'fill: #A0FFFF',
+ 'Failed': 'fill: #f77',
+ 'Succeeded': 'fill: #afa',
+ 'Skipped': 'fill: #D0D0D0',
+ 'Error': 'fill: #f77',
+ }
+ wf_name = wf['metadata']['name']
+ for node_id, node_status in wf['status']['nodes'].items():
+ if node_status['name'] == wf_name:
+ label = node_status['name']
+ else:
+ label = node_status['name'].replace(wf_name, "")
+ node = {'id': node_id, 'label': label, 'color': colors[node_status['phase']]}
+ nodes.append(node)
+ if 'children' in node_status:
+ for child_id in node_status['children']:
+ edge = {'from': node_id, 'to': child_id, 'arrows': 'to'}
+ edges.append(edge)
+ html = template % (wf_name, wf_name, json.dumps(nodes), json.dumps(edges))
+ tmpfile = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
+ tmpfile.write(html.encode())
+ tmpfile.flush()
+ run(["open", tmpfile.name])
+
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
diff --git a/pkg/apis/workflow/v1alpha1/openapi_generated.go b/pkg/apis/workflow/v1alpha1/openapi_generated.go
index cf769ccc0a3a..38c81c005098 100644
--- a/pkg/apis/workflow/v1alpha1/openapi_generated.go
+++ b/pkg/apis/workflow/v1alpha1/openapi_generated.go
@@ -213,6 +213,84 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA
Dependencies: []string{
"k8s.io/api/core/v1.SecretKeySelector"},
},
+ "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.DAG": {
+ Schema: spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Description: "DAG is a template subtype for directed acyclic graph templates",
+ Properties: map[string]spec.Schema{
+ "target": {
+ SchemaProps: spec.SchemaProps{
+ Description: "Target are one or more names of targets to execute in a DAG",
+ Type: []string{"string"},
+ Format: "",
+ },
+ },
+ "tasks": {
+ SchemaProps: spec.SchemaProps{
+ Description: "Tasks are a list of DAG tasks",
+ Type: []string{"array"},
+ Items: &spec.SchemaOrArray{
+ Schema: &spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.DAGTask"),
+ },
+ },
+ },
+ },
+ },
+ },
+ Required: []string{"tasks"},
+ },
+ },
+ Dependencies: []string{
+ "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.DAGTask"},
+ },
+ "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.DAGTask": {
+ Schema: spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Description: "DAGTask represents a node in the graph during DAG execution",
+ Properties: map[string]spec.Schema{
+ "name": {
+ SchemaProps: spec.SchemaProps{
+ Description: "Name is the name of the target",
+ Type: []string{"string"},
+ Format: "",
+ },
+ },
+ "template": {
+ SchemaProps: spec.SchemaProps{
+ Description: "Name of template to execute",
+ Type: []string{"string"},
+ Format: "",
+ },
+ },
+ "arguments": {
+ SchemaProps: spec.SchemaProps{
+ Description: "Arguments are the parameter and artifact arguments to the template",
+ Ref: ref("github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Arguments"),
+ },
+ },
+ "dependencies": {
+ SchemaProps: spec.SchemaProps{
+ Description: "Dependencies are name of other targets which this depends on",
+ Type: []string{"array"},
+ Items: &spec.SchemaOrArray{
+ Schema: &spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"string"},
+ Format: "",
+ },
+ },
+ },
+ },
+ },
+ },
+ Required: []string{"name", "template"},
+ },
+ },
+ Dependencies: []string{
+ "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Arguments"},
+ },
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.GitArtifact": {
Schema: spec.Schema{
SchemaProps: spec.SchemaProps{
@@ -948,6 +1026,12 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA
Ref: ref("github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ResourceTemplate"),
},
},
+ "dag": {
+ SchemaProps: spec.SchemaProps{
+ Description: "DAG template subtype which runs a DAG",
+ Ref: ref("github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.DAG"),
+ },
+ },
"sidecars": {
SchemaProps: spec.SchemaProps{
Description: "Sidecars is a list of containers which run alongside the main container Sidecars are automatically killed when the main container completes",
@@ -985,7 +1069,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA
},
},
Dependencies: []string{
- "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ArtifactLocation", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Inputs", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Outputs", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ResourceTemplate", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.RetryStrategy", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Script", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Sidecar", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.WorkflowStep", "k8s.io/api/core/v1.Affinity", "k8s.io/api/core/v1.Container"},
+ "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ArtifactLocation", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.DAG", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Inputs", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Outputs", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ResourceTemplate", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.RetryStrategy", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Script", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Sidecar", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.WorkflowStep", "k8s.io/api/core/v1.Affinity", "k8s.io/api/core/v1.Container"},
},
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ValueFrom": {
Schema: spec.Schema{
diff --git a/pkg/apis/workflow/v1alpha1/types.go b/pkg/apis/workflow/v1alpha1/types.go
index ac17f1a6e8de..271ec51284c5 100644
--- a/pkg/apis/workflow/v1alpha1/types.go
+++ b/pkg/apis/workflow/v1alpha1/types.go
@@ -18,6 +18,7 @@ const (
TemplateTypeSteps TemplateType = "Steps"
TemplateTypeScript TemplateType = "Script"
TemplateTypeResource TemplateType = "Resource"
+ TemplateTypeDAG TemplateType = "DAG"
)
// NodePhase is a label for the condition of a node at the current time.
@@ -32,6 +33,19 @@ const (
NodeError NodePhase = "Error"
)
+// NodeType is the type of a node
+type NodeType string
+
+// Node types
+const (
+ NodeTypePod NodeType = "Pod"
+ NodeTypeSteps NodeType = "Steps"
+ NodeTypeStepGroup NodeType = "StepGroup"
+ NodeTypeDAG NodeType = "DAG"
+ NodeTypeRetry NodeType = "Retry"
+ NodeTypeSkipped NodeType = "Skipped"
+)
+
// Workflow is the definition of a workflow resource
// +genclient
// +genclient:noStatus
@@ -130,6 +144,9 @@ type Template struct {
// Resource template subtype which can run k8s resources
Resource *ResourceTemplate `json:"resource,omitempty"`
+ // DAG template subtype which runs a DAG
+ DAG *DAG `json:"dag,omitempty"`
+
// Sidecars is a list of containers which run alongside the main container
// Sidecars are automatically killed when the main container completes
Sidecars []Sidecar `json:"sidecars,omitempty"`
@@ -352,10 +369,16 @@ type NodeStatus struct {
// It can represent a container, step group, or the entire workflow
Name string `json:"name"`
+ // Type indicates type of node
+ Type NodeType `json:"type"`
+
// Phase a simple, high-level summary of where the node is in its lifecycle.
// Can be used as a state machine.
Phase NodePhase `json:"phase"`
+ // BoundaryID indicates the node ID of the associated template root node in which this node belongs to
+ BoundaryID string `json:"boundaryID,omitempty"`
+
// A human readable message indicating details about why the node is in this condition.
Message string `json:"message,omitempty"`
@@ -379,6 +402,20 @@ type NodeStatus struct {
// Children is a list of child node IDs
Children []string `json:"children,omitempty"`
+
+ // OutboundNodes tracks the node IDs which are considered "outbound" nodes to a template invocation.
+ // For every invocation of a template, there are nodes which we considered as "outbound". Essentially,
+ // these are last nodes in the execution sequence to run, before the template is considered completed.
+ // These nodes are then connected as parents to a following step.
+ //
+ // In the case of single pod steps (i.e. container, script, resource templates), this list will be nil
+ // since the pod itself is already considered the "outbound" node.
+ // In the case of DAGs, outbound nodes are the "target" tasks (tasks with no children).
+ // In the case of steps, outbound nodes are all the containers involved in the last step group.
+ // NOTE: since templates are composable, the list of outbound nodes are carried upwards when
+ // a DAG/steps template invokes another DAG/steps template. In other words, the outbound nodes of
+ // a template, will be a superset of the outbound nodes of its last children.
+ OutboundNodes []string `json:"outboundNodes,omitempty"`
}
func (n NodeStatus) String() string {
@@ -522,6 +559,9 @@ func (tmpl *Template) GetType() TemplateType {
if tmpl.Steps != nil {
return TemplateTypeSteps
}
+ if tmpl.DAG != nil {
+ return TemplateTypeDAG
+ }
if tmpl.Script != nil {
return TemplateTypeScript
}
@@ -531,6 +571,30 @@ func (tmpl *Template) GetType() TemplateType {
return "Unknown"
}
+// DAG is a template subtype for directed acyclic graph templates
+type DAG struct {
+ // Targets are one or more names of targets to execute in a DAG
+ Targets string `json:"target,omitempty"`
+
+ // Tasks are a list of DAG tasks
+ Tasks []DAGTask `json:"tasks"`
+}
+
+// DAGTask represents a node in the graph during DAG execution
+type DAGTask struct {
+ // Name is the name of the target
+ Name string `json:"name"`
+
+ // Name of template to execute
+ Template string `json:"template"`
+
+ // Arguments are the parameter and artifact arguments to the template
+ Arguments Arguments `json:"arguments,omitempty"`
+
+ // Dependencies are names of other tasks which this task depends on
+ Dependencies []string `json:"dependencies,omitempty"`
+}
+
// GetArtifactByName returns an input artifact by its name
func (in *Inputs) GetArtifactByName(name string) *Artifact {
for _, art := range in.Artifacts {
diff --git a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go
index 65b79d252cce..355ce84f4834 100644
--- a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go
@@ -177,6 +177,51 @@ func (in *ArtifactoryAuth) DeepCopy() *ArtifactoryAuth {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *DAG) DeepCopyInto(out *DAG) {
+ *out = *in
+ if in.Tasks != nil {
+ in, out := &in.Tasks, &out.Tasks
+ *out = make([]DAGTask, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DAG.
+func (in *DAG) DeepCopy() *DAG {
+ if in == nil {
+ return nil
+ }
+ out := new(DAG)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *DAGTask) DeepCopyInto(out *DAGTask) {
+ *out = *in
+ in.Arguments.DeepCopyInto(&out.Arguments)
+ if in.Dependencies != nil {
+ in, out := &in.Dependencies, &out.Dependencies
+ *out = make([]string, len(*in))
+ copy(*out, *in)
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DAGTask.
+func (in *DAGTask) DeepCopy() *DAGTask {
+ if in == nil {
+ return nil
+ }
+ out := new(DAGTask)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *GitArtifact) DeepCopyInto(out *GitArtifact) {
*out = *in
@@ -294,6 +339,11 @@ func (in *NodeStatus) DeepCopyInto(out *NodeStatus) {
*out = make([]string, len(*in))
copy(*out, *in)
}
+ if in.OutboundNodes != nil {
+ in, out := &in.OutboundNodes, &out.OutboundNodes
+ *out = make([]string, len(*in))
+ copy(*out, *in)
+ }
return
}
@@ -624,6 +674,15 @@ func (in *Template) DeepCopyInto(out *Template) {
**out = **in
}
}
+ if in.DAG != nil {
+ in, out := &in.DAG, &out.DAG
+ if *in == nil {
+ *out = nil
+ } else {
+ *out = new(DAG)
+ (*in).DeepCopyInto(*out)
+ }
+ }
if in.Sidecars != nil {
in, out := &in.Sidecars, &out.Sidecars
*out = make([]Sidecar, len(*in))
diff --git a/test/e2e/expectedfailures/dag-fail.yaml b/test/e2e/expectedfailures/dag-fail.yaml
new file mode 100644
index 000000000000..21d085ba804b
--- /dev/null
+++ b/test/e2e/expectedfailures/dag-fail.yaml
@@ -0,0 +1,44 @@
+# The following workflow executes a diamond workflow where C fails
+#
+# A
+# / \
+# B C
+# \ /
+# D
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-fail-
+spec:
+ entrypoint: diamond
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: cmd
+ container:
+ image: alpine:3.7
+ command: [sh, -c]
+ args: ["{{inputs.parameters.cmd}}"]
+ - name: diamond
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters: [{name: cmd, value: echo A}]
+ - name: B
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: cmd, value: echo B}]
+ - name: C
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: cmd, value: echo C; exit 1}]
+ - name: D
+ dependencies: [B, C]
+ template: echo
+ arguments:
+ parameters: [{name: cmd, value: echo D}]
diff --git a/test/e2e/functional/dag-argument-passing.yaml b/test/e2e/functional/dag-argument-passing.yaml
new file mode 100644
index 000000000000..21335a695944
--- /dev/null
+++ b/test/e2e/functional/dag-argument-passing.yaml
@@ -0,0 +1,34 @@
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-arg-passing-
+spec:
+ entrypoint: dag-arg-passing
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ outputs:
+ parameters:
+ - name: hosts
+ path: /etc/hosts
+ - name: dag-arg-passing
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters:
+ - name: message
+ value: val
+ - name: B
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters:
+ - name: message
+ value: "{{dependencies.A.outputs.parameters.hosts}}"
\ No newline at end of file
diff --git a/workflow/common/validate.go b/workflow/common/validate.go
index 9c04925d63ae..e2a48de29400 100644
--- a/workflow/common/validate.go
+++ b/workflow/common/validate.go
@@ -53,7 +53,6 @@ func ValidateWorkflow(wf *wfv1.Workflow) error {
for _, param := range ctx.wf.Spec.Arguments.Parameters {
ctx.globalParams["workflow.parameters."+param.Name] = placeholderValue
}
-
if ctx.wf.Spec.Entrypoint == "" {
return errors.New(errors.CodeBadRequest, "spec.entrypoint is required")
}
@@ -61,7 +60,6 @@ func ValidateWorkflow(wf *wfv1.Workflow) error {
if entryTmpl == nil {
return errors.Errorf(errors.CodeBadRequest, "spec.entrypoint template '%s' undefined", ctx.wf.Spec.Entrypoint)
}
-
err = ctx.validateTemplate(entryTmpl, ctx.wf.Spec.Arguments)
if err != nil {
return err
@@ -113,17 +111,23 @@ func (ctx *wfValidationCtx) validateTemplate(tmpl *wfv1.Template, args wfv1.Argu
if tmpl.Resource != nil {
tmplTypes++
}
+ if tmpl.DAG != nil {
+ tmplTypes++
+ }
switch tmplTypes {
case 0:
- return errors.New(errors.CodeBadRequest, "template type unspecified. choose one of: container, steps, script, resource")
+ return errors.New(errors.CodeBadRequest, "template type unspecified. choose one of: container, steps, script, resource, dag")
case 1:
default:
- return errors.New(errors.CodeBadRequest, "multiple template types specified. choose one of: container, steps, script, resource")
+ return errors.New(errors.CodeBadRequest, "multiple template types specified. choose one of: container, steps, script, resource, dag")
}
- if tmpl.Steps == nil {
- err = validateLeaf(scope, tmpl)
- } else {
+ switch tmpl.GetType() {
+ case wfv1.TemplateTypeSteps:
err = ctx.validateSteps(scope, tmpl)
+ case wfv1.TemplateTypeDAG:
+ err = ctx.validateDAG(scope, tmpl)
+ default:
+ err = validateLeaf(scope, tmpl)
}
if err != nil {
return err
@@ -138,11 +142,11 @@ func (ctx *wfValidationCtx) validateTemplate(tmpl *wfv1.Template, args wfv1.Argu
func validateInputs(tmpl *wfv1.Template) (map[string]interface{}, error) {
err := validateWorkflowFieldNames(tmpl.Inputs.Parameters)
if err != nil {
- return nil, errors.Errorf(errors.CodeBadRequest, "template '%s' inputs.parameters%s", tmpl.Name, err.Error())
+ return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.inputs.parameters%s", tmpl.Name, err.Error())
}
err = validateWorkflowFieldNames(tmpl.Inputs.Artifacts)
if err != nil {
- return nil, errors.Errorf(errors.CodeBadRequest, "template '%s' inputs.artifacts%s", tmpl.Name, err.Error())
+ return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.inputs.artifacts%s", tmpl.Name, err.Error())
}
scope := make(map[string]interface{})
for _, param := range tmpl.Inputs.Parameters {
@@ -154,17 +158,17 @@ func validateInputs(tmpl *wfv1.Template) (map[string]interface{}, error) {
scope[artRef] = true
if isLeaf {
if art.Path == "" {
- return nil, errors.Errorf(errors.CodeBadRequest, "template '%s' %s.path not specified", tmpl.Name, artRef)
+ return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.path not specified", tmpl.Name, artRef)
}
} else {
if art.Path != "" {
- return nil, errors.Errorf(errors.CodeBadRequest, "template '%s' %s.path only valid in container/script templates", tmpl.Name, artRef)
+ return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.path only valid in container/script templates", tmpl.Name, artRef)
}
}
if art.From != "" {
- return nil, errors.Errorf(errors.CodeBadRequest, "template '%s' %s.from not valid in inputs", tmpl.Name, artRef)
+ return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.from not valid in inputs", tmpl.Name, artRef)
}
- errPrefix := fmt.Sprintf("template '%s' %s", tmpl.Name, artRef)
+ errPrefix := fmt.Sprintf("templates.%s.%s", tmpl.Name, artRef)
err = validateArtifactLocation(errPrefix, art)
if err != nil {
return nil, err
@@ -211,7 +215,7 @@ func validateLeaf(scope map[string]interface{}, tmpl *wfv1.Template) error {
}
err = resolveAllVariables(scope, string(tmplBytes))
if err != nil {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' %s", tmpl.Name, err.Error())
+ return errors.Errorf(errors.CodeBadRequest, "template.%s: %s", tmpl.Name, err.Error())
}
if tmpl.Container != nil {
// Ensure there are no collisions with volume mountPaths and artifact load paths
@@ -256,19 +260,19 @@ func (ctx *wfValidationCtx) validateSteps(scope map[string]interface{}, tmpl *wf
for i, stepGroup := range tmpl.Steps {
for _, step := range stepGroup {
if step.Name == "" {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' steps[%d].name is required", tmpl.Name, i)
+ return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].name is required", tmpl.Name, i)
}
_, ok := stepNames[step.Name]
if ok {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' steps[%d].name '%s' is not unique", tmpl.Name, i, step.Name)
+ return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].name '%s' is not unique", tmpl.Name, i, step.Name)
}
if errs := IsValidWorkflowFieldName(step.Name); len(errs) != 0 {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' steps[%d].name '%s' is invalid: %s", tmpl.Name, i, step.Name, strings.Join(errs, ";"))
+ return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].name '%s' is invalid: %s", tmpl.Name, i, step.Name, strings.Join(errs, ";"))
}
stepNames[step.Name] = true
err := addItemsToScope(&step, scope)
if err != nil {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
+ return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
}
stepBytes, err := json.Marshal(stepGroup)
if err != nil {
@@ -276,13 +280,13 @@ func (ctx *wfValidationCtx) validateSteps(scope map[string]interface{}, tmpl *wf
}
err = resolveAllVariables(scope, string(stepBytes))
if err != nil {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
+ return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
}
childTmpl := ctx.wf.GetTemplate(step.Template)
if childTmpl == nil {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' steps[%d].%s.template '%s' undefined", tmpl.Name, i, step.Name, step.Template)
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].%s.template '%s' undefined", tmpl.Name, i, step.Name, step.Template)
}
- err = validateArguments(fmt.Sprintf("template '%s' steps[%d].%s.arguments.", tmpl.Name, i, step.Name), step.Arguments)
+ err = validateArguments(fmt.Sprintf("templates.%s.steps[%d].%s.arguments.", tmpl.Name, i, step.Name), step.Arguments)
if err != nil {
return err
}
@@ -292,7 +296,7 @@ func (ctx *wfValidationCtx) validateSteps(scope map[string]interface{}, tmpl *wf
}
}
for _, step := range stepGroup {
- ctx.addOutputsToScope(step.Template, step.Name, scope)
+ ctx.addOutputsToScope(step.Template, fmt.Sprintf("steps.%s", step.Name), scope)
}
}
return nil
@@ -320,30 +324,30 @@ func addItemsToScope(step *wfv1.WorkflowStep, scope map[string]interface{}) erro
return nil
}
-func (ctx *wfValidationCtx) addOutputsToScope(templateName string, stepName string, scope map[string]interface{}) {
+func (ctx *wfValidationCtx) addOutputsToScope(templateName string, prefix string, scope map[string]interface{}) {
tmpl := ctx.wf.GetTemplate(templateName)
if tmpl.Daemon != nil && *tmpl.Daemon {
- scope[fmt.Sprintf("steps.%s.ip", stepName)] = true
+ scope[fmt.Sprintf("%s.ip", prefix)] = true
}
if tmpl.Script != nil {
- scope[fmt.Sprintf("steps.%s.outputs.result", stepName)] = true
+ scope[fmt.Sprintf("%s.outputs.result", prefix)] = true
}
for _, param := range tmpl.Outputs.Parameters {
- scope[fmt.Sprintf("steps.%s.outputs.parameters.%s", stepName, param.Name)] = true
+ scope[fmt.Sprintf("%s.outputs.parameters.%s", prefix, param.Name)] = true
}
for _, art := range tmpl.Outputs.Artifacts {
- scope[fmt.Sprintf("steps.%s.outputs.artifacts.%s", stepName, art.Name)] = true
+ scope[fmt.Sprintf("%s.outputs.artifacts.%s", prefix, art.Name)] = true
}
}
func validateOutputs(scope map[string]interface{}, tmpl *wfv1.Template) error {
err := validateWorkflowFieldNames(tmpl.Outputs.Parameters)
if err != nil {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' outputs.parameters%s", tmpl.Name, err.Error())
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.outputs.parameters%s", tmpl.Name, err.Error())
}
err = validateWorkflowFieldNames(tmpl.Outputs.Artifacts)
if err != nil {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' outputs.artifacts%s", tmpl.Name, err.Error())
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.outputs.artifacts%s", tmpl.Name, err.Error())
}
outputBytes, err := json.Marshal(tmpl.Outputs)
if err != nil {
@@ -359,11 +363,11 @@ func validateOutputs(scope map[string]interface{}, tmpl *wfv1.Template) error {
artRef := fmt.Sprintf("outputs.artifacts.%s", art.Name)
if isLeaf {
if art.Path == "" {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' %s.path not specified", tmpl.Name, artRef)
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.path not specified", tmpl.Name, artRef)
}
} else {
if art.Path != "" {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' %s.path only valid in container/script templates", tmpl.Name, artRef)
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.path only valid in container/script templates", tmpl.Name, artRef)
}
}
}
@@ -415,3 +419,107 @@ func validateWorkflowFieldNames(slice interface{}) error {
}
return nil
}
+
+func (ctx *wfValidationCtx) validateDAG(scope map[string]interface{}, tmpl *wfv1.Template) error {
+ err := validateWorkflowFieldNames(tmpl.DAG.Tasks)
+ if err != nil {
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks%s", tmpl.Name, err.Error())
+ }
+ nameToTask := make(map[string]wfv1.DAGTask)
+ for _, task := range tmpl.DAG.Tasks {
+ nameToTask[task.Name] = task
+ }
+
+ // Verify dependencies for all tasks can be resolved as well as template names
+ for _, task := range tmpl.DAG.Tasks {
+ if task.Template == "" {
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s.template is required", tmpl.Name, task.Name)
+ }
+ taskTmpl := ctx.wf.GetTemplate(task.Template)
+ if taskTmpl == nil {
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s.template '%s' undefined", tmpl.Name, task.Name, task.Template)
+ }
+ dupDependencies := make(map[string]bool)
+ for j, depName := range task.Dependencies {
+ if _, ok := dupDependencies[depName]; ok {
+ return errors.Errorf(errors.CodeBadRequest,
+ "templates.%s.tasks.%s.dependencies[%d] dependency '%s' duplicated",
+ tmpl.Name, task.Name, j, depName)
+ }
+ dupDependencies[depName] = true
+ if _, ok := nameToTask[depName]; !ok {
+ return errors.Errorf(errors.CodeBadRequest,
+ "templates.%s.tasks.%s.dependencies[%d] dependency '%s' not defined",
+ tmpl.Name, task.Name, j, depName)
+ }
+ }
+ }
+
+ err = verifyNoCycles(tmpl, nameToTask)
+ if err != nil {
+ return err
+ }
+
+ for _, task := range tmpl.DAG.Tasks {
+ taskBytes, err := json.Marshal(task)
+ if err != nil {
+ return errors.InternalWrapError(err)
+ }
+ // add outputs of all our dependencies to scope
+ taskScope := make(map[string]interface{})
+ for k, v := range scope {
+ taskScope[k] = v
+ }
+ for _, depName := range task.Dependencies {
+ ctx.addOutputsToScope(nameToTask[depName].Template, fmt.Sprintf("dependencies.%s", depName), taskScope)
+ }
+ err = resolveAllVariables(taskScope, string(taskBytes))
+ if err != nil {
+ return errors.Errorf(errors.CodeBadRequest, "template.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error())
+ }
+ taskTmpl := ctx.wf.GetTemplate(task.Template)
+ err = ctx.validateTemplate(taskTmpl, task.Arguments)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// verifyNoCycles verifies there are no cycles in the DAG graph
+func verifyNoCycles(tmpl *wfv1.Template, nameToTask map[string]wfv1.DAGTask) error {
+ visited := make(map[string]bool)
+ var noCyclesHelper func(taskName string, cycle []string) error
+ noCyclesHelper = func(taskName string, cycle []string) error {
+ if _, ok := visited[taskName]; ok {
+ return nil
+ }
+ task := nameToTask[taskName]
+ for _, depName := range task.Dependencies {
+ for _, name := range cycle {
+ if name == depName {
+ return errors.Errorf(errors.CodeBadRequest,
+ "templates.%s.tasks dependency cycle detected: %s->%s",
+ tmpl.Name, strings.Join(cycle, "->"), name)
+ }
+ }
+ cycle = append(cycle, depName)
+ err := noCyclesHelper(depName, cycle)
+ if err != nil {
+ return err
+ }
+ cycle = cycle[0 : len(cycle)-1]
+ }
+ visited[taskName] = true
+ return nil
+ }
+
+ for _, task := range tmpl.DAG.Tasks {
+ err := noCyclesHelper(task.Name, []string{})
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/workflow/common/validate_dag_test.go b/workflow/common/validate_dag_test.go
new file mode 100644
index 000000000000..55a058c9f1a4
--- /dev/null
+++ b/workflow/common/validate_dag_test.go
@@ -0,0 +1,174 @@
+package common
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+var dagCycle = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-cycle-
+spec:
+ entrypoint: cycle
+ templates:
+ - name: echo
+ container:
+ image: alpine:3.7
+ command: [echo, hello]
+ - name: cycle
+ dag:
+ tasks:
+ - name: A
+ dependencies: [C]
+ template: echo
+ - name: B
+ dependencies: [A]
+ template: echo
+ - name: C
+ dependencies: [A]
+ template: echo
+`
+
+func TestDAGCycle(t *testing.T) {
+ err := validate(dagCycle)
+ if assert.NotNil(t, err) {
+ assert.Contains(t, err.Error(), "cycle")
+ }
+}
+
+var duplicateDependencies = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-dup-depends-
+spec:
+ entrypoint: cycle
+ templates:
+ - name: echo
+ container:
+ image: alpine:3.7
+ command: [echo, hello]
+ - name: cycle
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ - name: B
+ dependencies: [A, A]
+ template: echo
+`
+
+func TestDuplicateDependencies(t *testing.T) {
+ err := validate(duplicateDependencies)
+ if assert.NotNil(t, err) {
+ assert.Contains(t, err.Error(), "duplicate")
+ }
+}
+
+var dagUndefinedTemplate = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-undefined-
+spec:
+ entrypoint: undef
+ templates:
+ - name: undef
+ dag:
+ tasks:
+ - name: A
+ template: echo
+`
+
+func TestDAGUndefinedTemplate(t *testing.T) {
+ err := validate(dagUndefinedTemplate)
+ if assert.NotNil(t, err) {
+ assert.Contains(t, err.Error(), "undefined")
+ }
+}
+
+var dagUnresolvedVar = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-cycle-
+spec:
+ entrypoint: unresolved
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ outputs:
+ parameters:
+ - name: hosts
+ path: /etc/hosts
+ - name: unresolved
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters:
+ - name: message
+ value: val
+ - name: B
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters:
+ - name: message
+ value: "{{dependencies.A.outputs.parameters.unresolvable}}"
+`
+
+var dagResolvedVar = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-cycle-
+spec:
+ entrypoint: unresolved
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ outputs:
+ parameters:
+ - name: hosts
+ path: /etc/hosts
+ - name: unresolved
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters:
+ - name: message
+ value: val
+ - name: B
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters:
+ - name: message
+ value: "{{dependencies.A.outputs.parameters.hosts}}"
+`
+
+func TestDAGVariableResolution(t *testing.T) {
+ err := validate(dagUnresolvedVar)
+ if assert.NotNil(t, err) {
+ assert.Contains(t, err.Error(), "failed to resolve {{dependencies.A.outputs.parameters.unresolvable}}")
+ }
+ err = validate(dagResolvedVar)
+ assert.Nil(t, err)
+}
diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go
new file mode 100644
index 000000000000..e7f4da3bd5d7
--- /dev/null
+++ b/workflow/controller/dag.go
@@ -0,0 +1,291 @@
+package controller
+
+import (
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ "github.com/argoproj/argo/errors"
+ wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
+ "github.com/argoproj/argo/workflow/common"
+ "github.com/valyala/fasttemplate"
+)
+
+// dagContext holds context information about this context's DAG
+type dagContext struct {
+ // boundaryName is the node name of the boundary node to this DAG.
+ // This is used to incorporate into each of the task's node names.
+ boundaryName string
+ boundaryID string
+
+ // tasks are all the tasks in the template
+ tasks []wfv1.DAGTask
+
+ // visited keeps track of tasks we have already visited during an invocation of executeDAG
+ // in order to avoid duplicating work
+ visited map[string]bool
+
+ // tmpl is the template spec. it is needed to resolve hard-wired artifacts
+ tmpl *wfv1.Template
+
+ // wf is stored to formulate nodeIDs
+ wf *wfv1.Workflow
+}
+
+func (d *dagContext) getTask(taskName string) *wfv1.DAGTask {
+ for _, task := range d.tasks {
+ if task.Name == taskName {
+ return &task
+ }
+ }
+ panic("target " + taskName + " does not exist")
+}
+
+// taskNodeName formulates the nodeName for a dag task
+func (d *dagContext) taskNodeName(taskName string) string {
+ return fmt.Sprintf("%s.%s", d.boundaryName, taskName)
+}
+
+// taskNodeID formulates the node ID for a dag task
+func (d *dagContext) taskNodeID(taskName string) string {
+ nodeName := d.taskNodeName(taskName)
+ return d.wf.NodeID(nodeName)
+}
+
+func (d *dagContext) getTaskNode(taskName string) *wfv1.NodeStatus {
+ nodeID := d.taskNodeID(taskName)
+ node, ok := d.wf.Status.Nodes[nodeID]
+ if !ok {
+ return nil
+ }
+ return &node
+}
+
+func (woc *wfOperationCtx) executeDAG(nodeName string, tmpl *wfv1.Template, boundaryID string) *wfv1.NodeStatus {
+ node := woc.getNodeByName(nodeName)
+ if node != nil && node.Completed() {
+ return node
+ }
+ dagCtx := &dagContext{
+ boundaryName: nodeName,
+ boundaryID: woc.wf.NodeID(nodeName),
+ tasks: tmpl.DAG.Tasks,
+ visited: make(map[string]bool),
+ tmpl: tmpl,
+ wf: woc.wf,
+ }
+ var targetTasks []string
+ if tmpl.DAG.Targets == "" {
+ targetTasks = findLeafTaskNames(tmpl.DAG.Tasks)
+ } else {
+ targetTasks = strings.Split(tmpl.DAG.Targets, " ")
+ }
+
+ if node == nil {
+ node = woc.initializeNode(nodeName, wfv1.NodeTypeDAG, boundaryID, wfv1.NodeRunning)
+ rootTasks := findRootTaskNames(dagCtx, targetTasks)
+ woc.log.Infof("Root tasks of %s identified as %s", nodeName, rootTasks)
+ for _, rootTaskName := range rootTasks {
+ woc.addChildNode(node.Name, dagCtx.taskNodeName(rootTaskName))
+ }
+ }
+ // kick off execution of each target task asynchronously
+ for _, taskName := range targetTasks {
+ woc.executeDAGTask(dagCtx, taskName)
+ }
+ // return early if we have yet to complete execution of any one of our dependencies
+ for _, depName := range targetTasks {
+ depNode := dagCtx.getTaskNode(depName)
+ if depNode == nil || !depNode.Completed() {
+ return node
+ }
+ }
+ // all desired tasks completed. now it is time to assess state
+ for _, depName := range targetTasks {
+ depNode := dagCtx.getTaskNode(depName)
+ if !depNode.Successful() {
+ // One of our dependencies failed/errored. Mark this failed
+ // TODO: consider creating a virtual fan-in node
+ return woc.markNodePhase(nodeName, depNode.Phase)
+ }
+ }
+
+ // set the outbound nodes from the target tasks
+ node = woc.getNodeByName(nodeName)
+ outbound := make([]string, 0)
+ for _, depName := range targetTasks {
+ depNode := dagCtx.getTaskNode(depName)
+ outboundNodeIDs := woc.getOutboundNodes(depNode.ID)
+ for _, outNodeID := range outboundNodeIDs {
+ outbound = append(outbound, outNodeID)
+ }
+ }
+ woc.log.Infof("Outbound nodes of %s set to %s", node.ID, outbound)
+ node.OutboundNodes = outbound
+ woc.wf.Status.Nodes[node.ID] = *node
+
+ return woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
+}
+
+// findRootTaskNames finds the names of all tasks which have no dependencies.
+// Once identified, these root tasks are marked as children to the encompassing node.
+func findRootTaskNames(dagCtx *dagContext, targetTasks []string) []string {
+ //rootTaskNames := make(map[string]bool)
+ rootTaskNames := make([]string, 0)
+ visited := make(map[string]bool)
+ var findRootHelper func(taskName string)
+ findRootHelper = func(taskName string) {
+ if _, ok := visited[taskName]; ok {
+ return
+ }
+ visited[taskName] = true
+ task := dagCtx.getTask(taskName)
+ if len(task.Dependencies) == 0 {
+ rootTaskNames = append(rootTaskNames, taskName)
+ return
+ }
+ for _, depName := range task.Dependencies {
+ findRootHelper(depName)
+ }
+ }
+ for _, targetTaskName := range targetTasks {
+ findRootHelper(targetTaskName)
+ }
+ return rootTaskNames
+}
+
+// executeDAGTask traverses and executes the upward chain of dependencies of a task
+func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
+ if _, ok := dagCtx.visited[taskName]; ok {
+ return
+ }
+ dagCtx.visited[taskName] = true
+
+ node := dagCtx.getTaskNode(taskName)
+ if node != nil && node.Completed() {
+ return
+ }
+ // Check if our dependencies completed. If not, recurse our parents executing them if necessary
+ task := dagCtx.getTask(taskName)
+ dependenciesCompleted := true
+ dependenciesSuccessful := true
+ nodeName := dagCtx.taskNodeName(taskName)
+ for _, depName := range task.Dependencies {
+ depNode := dagCtx.getTaskNode(depName)
+ if depNode != nil {
+ if depNode.Completed() {
+ if !depNode.Successful() {
+ dependenciesSuccessful = false
+ }
+ continue
+ }
+ }
+ dependenciesCompleted = false
+ dependenciesSuccessful = false
+ // recurse our dependency
+ woc.executeDAGTask(dagCtx, depName)
+ }
+ if !dependenciesCompleted {
+ return
+ }
+
+ // All our dependencies completed. Now add the child relationship from our dependency's
+ // outbound nodes to this node.
+ node = dagCtx.getTaskNode(taskName)
+ if node == nil {
+ woc.log.Infof("All of node %s dependencies %s completed", nodeName, task.Dependencies)
+ // Add all outbound nodes of our dependencies as parents to this node
+ for _, depName := range task.Dependencies {
+ depNode := dagCtx.getTaskNode(depName)
+ woc.log.Infof("node %s outbound nodes: %s", depNode, depNode.OutboundNodes)
+ if depNode.Type == wfv1.NodeTypePod {
+ woc.addChildNode(depNode.Name, nodeName)
+ } else {
+ for _, outNodeID := range depNode.OutboundNodes {
+ woc.addChildNode(woc.wf.Status.Nodes[outNodeID].Name, nodeName)
+ }
+ }
+ }
+ }
+
+ if !dependenciesSuccessful {
+ woc.log.Infof("Task %s being marked %s due to dependency failure", taskName, wfv1.NodeFailed)
+ woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagCtx.boundaryID, wfv1.NodeFailed)
+ return
+ }
+
+ // All our dependencies were satisfied and successful. It's our turn to run
+ // Substitute params/artifacts from our dependencies and execute the template
+ newTask, err := woc.resolveDependencyReferences(dagCtx, task)
+ if err != nil {
+ woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagCtx.boundaryID, wfv1.NodeError, err.Error())
+ return
+ }
+ _ = woc.executeTemplate(newTask.Template, newTask.Arguments, nodeName, dagCtx.boundaryID)
+}
+
+// resolveDependencyReferences replaces any references to outputs of task dependencies, or artifacts in the inputs
+// NOTE: by now, input parameters should have been substituted throughout the template
+func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task *wfv1.DAGTask) (*wfv1.DAGTask, error) {
+ // build up the scope
+ scope := wfScope{
+ tmpl: dagCtx.tmpl,
+ scope: make(map[string]interface{}),
+ }
+ for _, depName := range task.Dependencies {
+ depNode := dagCtx.getTaskNode(depName)
+ prefix := fmt.Sprintf("dependencies.%s", depName)
+ scope.addNodeOutputsToScope(prefix, depNode)
+ }
+
+ // Perform replacement
+ taskBytes, err := json.Marshal(task)
+ if err != nil {
+ return nil, errors.InternalWrapError(err)
+ }
+ fstTmpl := fasttemplate.New(string(taskBytes), "{{", "}}")
+ newTaskStr, err := common.Replace(fstTmpl, scope.replaceMap(), false, "")
+ if err != nil {
+ return nil, err
+ }
+ var newTask wfv1.DAGTask
+ err = json.Unmarshal([]byte(newTaskStr), &newTask)
+ if err != nil {
+ return nil, errors.InternalWrapError(err)
+ }
+
+ // replace all artifact references
+ for j, art := range newTask.Arguments.Artifacts {
+ if art.From == "" {
+ continue
+ }
+ resolvedArt, err := scope.resolveArtifact(art.From)
+ if err != nil {
+ return nil, err
+ }
+ resolvedArt.Name = art.Name
+ newTask.Arguments.Artifacts[j] = *resolvedArt
+ }
+ return &newTask, nil
+}
+
+// findLeafTaskNames finds the names of all tasks that no other tasks depend on.
+// This list of tasks is used as the default list of targets when dag.targets is omitted.
+func findLeafTaskNames(tasks []wfv1.DAGTask) []string {
+ taskIsLeaf := make(map[string]bool)
+ for _, task := range tasks {
+ if _, ok := taskIsLeaf[task.Name]; !ok {
+ taskIsLeaf[task.Name] = true
+ }
+ for _, dependency := range task.Dependencies {
+ taskIsLeaf[dependency] = false
+ }
+ }
+ leafTaskNames := make([]string, 0)
+ for taskName, isLeaf := range taskIsLeaf {
+ if isLeaf {
+ leafTaskNames = append(leafTaskNames, taskName)
+ }
+ }
+ return leafTaskNames
+}
diff --git a/workflow/controller/dag_test.go b/workflow/controller/dag_test.go
new file mode 100644
index 000000000000..b0b429f89997
--- /dev/null
+++ b/workflow/controller/dag_test.go
@@ -0,0 +1 @@
+package controller
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index 3d9bf50286c6..96e893767a9e 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -122,11 +122,13 @@ func (woc *wfOperationCtx) operate() {
err := woc.createPVCs()
if err != nil {
- woc.log.Errorf("%s error: %+v", woc.wf.ObjectMeta.Name, err)
+ woc.log.Errorf("%s pvc create error: %+v", woc.wf.ObjectMeta.Name, err)
woc.markWorkflowError(err, true)
return
}
- err = woc.executeTemplate(woc.wf.Spec.Entrypoint, woc.wf.Spec.Arguments, woc.wf.ObjectMeta.Name)
+ var workflowStatus wfv1.NodePhase
+ var workflowMessage string
+ err = woc.executeTemplate(woc.wf.Spec.Entrypoint, woc.wf.Spec.Arguments, woc.wf.ObjectMeta.Name, "")
if err != nil {
if errors.IsCode(errors.CodeTimeout, err) {
// A timeout indicates we took too long operating on the workflow.
@@ -137,22 +139,24 @@ func (woc *wfOperationCtx) operate() {
}
woc.log.Errorf("%s error: %+v", woc.wf.ObjectMeta.Name, err)
}
- node := woc.wf.Status.Nodes[woc.wf.NodeID(woc.wf.ObjectMeta.Name)]
+ node := woc.getNodeByName(woc.wf.ObjectMeta.Name)
if !node.Completed() {
return
}
+ workflowStatus = node.Phase
+ workflowMessage = node.Message
var onExitNode *wfv1.NodeStatus
if woc.wf.Spec.OnExit != "" {
- if node.Phase == wfv1.NodeSkipped {
+ if workflowStatus == wfv1.NodeSkipped {
// treat skipped the same as Succeeded for workflow.status
woc.globalParams[common.GlobalVarWorkflowStatus] = string(wfv1.NodeSucceeded)
} else {
- woc.globalParams[common.GlobalVarWorkflowStatus] = string(node.Phase)
+ woc.globalParams[common.GlobalVarWorkflowStatus] = string(workflowStatus)
}
woc.log.Infof("Running OnExit handler: %s", woc.wf.Spec.OnExit)
onExitNodeName := woc.wf.ObjectMeta.Name + ".onExit"
- err = woc.executeTemplate(woc.wf.Spec.OnExit, woc.wf.Spec.Arguments, onExitNodeName)
+ err = woc.executeTemplate(woc.wf.Spec.OnExit, woc.wf.Spec.Arguments, onExitNodeName, "")
if err != nil {
if errors.IsCode(errors.CodeTimeout, err) {
woc.requeue()
@@ -180,7 +184,7 @@ func (woc *wfOperationCtx) operate() {
// If we get here, the workflow completed, all PVCs were deleted successfully, and
// exit handlers were executed. We now need to infer the workflow phase from the
// node phase.
- switch node.Phase {
+ switch workflowStatus {
case wfv1.NodeSucceeded, wfv1.NodeSkipped:
if onExitNode != nil && !onExitNode.Successful() {
// if main workflow succeeded, but the exit node was unsuccessful
@@ -190,9 +194,9 @@ func (woc *wfOperationCtx) operate() {
woc.markWorkflowSuccess()
}
case wfv1.NodeFailed:
- woc.markWorkflowFailed(node.Message)
+ woc.markWorkflowFailed(workflowMessage)
case wfv1.NodeError:
- woc.markWorkflowPhase(wfv1.NodeError, true, node.Message)
+ woc.markWorkflowPhase(wfv1.NodeError, true, workflowMessage)
default:
// NOTE: we should never make it here because if the the node was 'Running'
// we should have returned earlier.
@@ -201,7 +205,16 @@ func (woc *wfOperationCtx) operate() {
}
}
-// persistUpdates will update a workflow with any updates made during workflow operation.
+func (woc *wfOperationCtx) getNodeByName(nodeName string) *wfv1.NodeStatus {
+ nodeID := woc.wf.NodeID(nodeName)
+ node, ok := woc.wf.Status.Nodes[nodeID]
+ if !ok {
+ return nil
+ }
+ return &node
+}
+
+// persistUpdates will PATCH a workflow with any updates made during workflow operation.
// It also labels any pods as completed if we have extracted everything we need from it.
func (woc *wfOperationCtx) persistUpdates() {
if !woc.updated {
@@ -366,7 +379,7 @@ func (woc *wfOperationCtx) podReconciliation() error {
// is now impossible to infer status. The only thing we can do at this point is
// to mark the node with Error.
for nodeID, node := range woc.wf.Status.Nodes {
- if len(node.Children) > 0 || node.Completed() {
+ if node.Type != wfv1.NodeTypePod || node.Completed() {
// node is not a pod, or it is already complete
continue
}
@@ -663,7 +676,6 @@ func (woc *wfOperationCtx) createPVCs() error {
}
pvc, err := pvcClient.Create(&pvcTmpl)
if err != nil {
- woc.markNodeError(woc.wf.ObjectMeta.Name, err)
return err
}
vol := apiv1.Volume{
@@ -726,109 +738,44 @@ func (woc *wfOperationCtx) getLastChildNode(node *wfv1.NodeStatus) (*wfv1.NodeSt
return &lastChildNode, nil
}
-func (woc *wfOperationCtx) getNode(nodeName string) wfv1.NodeStatus {
- nodeID := woc.wf.NodeID(nodeName)
- node, ok := woc.wf.Status.Nodes[nodeID]
- if !ok {
- panic("Failed to find node " + nodeName)
- }
-
- return node
-}
-
-func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string) error {
+func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string, boundaryID string) error {
woc.log.Debugf("Evaluating node %s: template: %s", nodeName, templateName)
- nodeID := woc.wf.NodeID(nodeName)
- node, ok := woc.wf.Status.Nodes[nodeID]
- if ok && node.Completed() {
+ node := woc.getNodeByName(nodeName)
+ if node != nil && node.Completed() {
woc.log.Debugf("Node %s already completed", nodeName)
return nil
}
tmpl := woc.wf.GetTemplate(templateName)
if tmpl == nil {
err := errors.Errorf(errors.CodeBadRequest, "Node %v error: template '%s' undefined", node, templateName)
- woc.markNodeError(nodeName, err)
+ woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, boundaryID, wfv1.NodeError, err.Error())
return err
}
tmpl, err := common.ProcessArgs(tmpl, args, woc.globalParams, false)
if err != nil {
- woc.markNodeError(nodeName, err)
+ woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, boundaryID, wfv1.NodeError, err.Error())
return err
}
switch tmpl.GetType() {
case wfv1.TemplateTypeContainer:
- if ok {
- if node.RetryStrategy != nil {
- if err = woc.processNodeRetries(&node); err != nil {
- return err
- }
-
- // The updated node status could've changed. Get the latest copy of the node.
- node = woc.getNode(node.Name)
- log.Infof("Node %s: Status: %s", node.Name, node.Phase)
- if node.Completed() {
- return nil
- }
- lastChildNode, err := woc.getLastChildNode(&node)
- if err != nil {
- return err
- }
- if !lastChildNode.Completed() {
- // last child node is still running.
- return nil
- }
- } else {
- // There are no retries configured and there's already a node entry for the container.
- // This means the container was already scheduled (or had a create pod error). Nothing
- // to more to do with this node.
- return nil
- }
- }
-
- // If the user has specified retries, a special "retries" non-leaf node
- // is created. This node acts as the parent of all retries that will be
- // done for the container. The status of this node should be "Success"
- // if any of the retries succeed. Otherwise, it is "Failed".
-
- // TODO(shri): Mark the current node as a "retry" node
- // Create a new child node as the first attempt node and
- // run the template in that node.
- nodeToExecute := nodeName
if tmpl.RetryStrategy != nil {
- node := woc.markNodePhase(nodeName, wfv1.NodeRunning)
- retries := wfv1.RetryStrategy{}
- node.RetryStrategy = &retries
- node.RetryStrategy.Limit = tmpl.RetryStrategy.Limit
- woc.wf.Status.Nodes[nodeID] = *node
-
- // Create new node as child of 'node'
- newContainerName := fmt.Sprintf("%s(%d)", nodeName, len(node.Children))
- woc.markNodePhase(newContainerName, wfv1.NodeRunning)
- woc.addChildNode(nodeName, newContainerName)
- nodeToExecute = newContainerName
+ err = woc.executeRetryContainer(nodeName, tmpl, boundaryID)
+ } else {
+ err = woc.executeContainer(nodeName, tmpl, boundaryID)
}
-
- // We have not yet created the pod
- err = woc.executeContainer(nodeToExecute, tmpl)
case wfv1.TemplateTypeSteps:
- if !ok {
- node = *woc.markNodePhase(nodeName, wfv1.NodeRunning)
- woc.log.Infof("Initialized workflow node %v", node)
- }
- err = woc.executeSteps(nodeName, tmpl)
+ err = woc.executeSteps(nodeName, tmpl, boundaryID)
case wfv1.TemplateTypeScript:
- if !ok {
- err = woc.executeScript(nodeName, tmpl)
- }
+ err = woc.executeScript(nodeName, tmpl, boundaryID)
case wfv1.TemplateTypeResource:
- if !ok {
- err = woc.executeResource(nodeName, tmpl)
- }
+ err = woc.executeResource(nodeName, tmpl, boundaryID)
+	case wfv1.TemplateTypeDAG:
+		err = woc.executeDAG(nodeName, tmpl, boundaryID)
default:
- err = errors.Errorf("Template '%s' missing specification", tmpl.Name)
- woc.markNodeError(nodeName, err)
+ err = errors.Errorf(errors.CodeBadRequest, "Template '%s' missing specification", tmpl.Name)
+ woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, boundaryID, wfv1.NodeError, err.Error())
}
if err != nil {
return err
@@ -892,30 +839,54 @@ func (woc *wfOperationCtx) markWorkflowError(err error, markCompleted bool) {
woc.markWorkflowPhase(wfv1.NodeError, markCompleted, err.Error())
}
+func (woc *wfOperationCtx) initializeNode(nodeName string, nodeType wfv1.NodeType, boundaryID string, phase wfv1.NodePhase, message ...string) *wfv1.NodeStatus {
+ nodeID := woc.wf.NodeID(nodeName)
+ _, ok := woc.wf.Status.Nodes[nodeID]
+ if ok {
+ panic(fmt.Sprintf("node %s already initialized", nodeName))
+ }
+ node := wfv1.NodeStatus{
+ ID: nodeID,
+ Name: nodeName,
+ Type: nodeType,
+ BoundaryID: boundaryID,
+ Phase: phase,
+ StartedAt: metav1.Time{Time: time.Now().UTC()},
+ }
+ if node.Completed() && node.FinishedAt.IsZero() {
+ node.FinishedAt = node.StartedAt
+ }
+ woc.wf.Status.Nodes[nodeID] = node
+ woc.log.Infof("node %s initialized %s", node, node.Phase)
+ woc.updated = true
+ return &node
+}
+
// markNodePhase marks a node with the given phase, creating the node if necessary and handles timestamps
func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase, message ...string) *wfv1.NodeStatus {
- nodeID := woc.wf.NodeID(nodeName)
- node, ok := woc.wf.Status.Nodes[nodeID]
- if !ok {
- node = wfv1.NodeStatus{
- ID: nodeID,
- Name: nodeName,
- Phase: phase,
- StartedAt: metav1.Time{Time: time.Now().UTC()},
- }
- } else {
+ node := woc.getNodeByName(nodeName)
+ if node == nil {
+ panic(fmt.Sprintf("node %s uninitialized", nodeName))
+ }
+ if node.Phase != phase {
+ woc.log.Infof("node %s phase %s -> %s", node, node.Phase, phase)
node.Phase = phase
+ woc.updated = true
}
if len(message) > 0 {
- node.Message = message[0]
+ if message[0] != node.Message {
+ woc.log.Infof("node %s message: %s", node, message[0])
+ node.Message = message[0]
+ woc.updated = true
+ }
}
if node.Completed() && node.FinishedAt.IsZero() {
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
+ woc.log.Infof("node %s finished: %s", node, node.FinishedAt)
+ woc.updated = true
}
- woc.wf.Status.Nodes[nodeID] = node
- woc.updated = true
- woc.log.Debugf("Marked node %s %s", nodeName, phase)
- return &node
+ woc.wf.Status.Nodes[node.ID] = *node
+ return node
}
// markNodeError is a convenience method to mark a node with an error and set the message from the error
@@ -923,18 +894,77 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS
return woc.markNodePhase(nodeName, wfv1.NodeError, err.Error())
}
-func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template) error {
+// If the user has specified retries, a special "retries" non-leaf node
+// is created. This node acts as the parent of all retries that will be
+// done for the container. The status of this node should be "Success"
+// if any of the retries succeed. Otherwise, it is "Failed".
+func (woc *wfOperationCtx) executeRetryContainer(nodeName string, tmpl *wfv1.Template, boundaryID string) error {
+ node := woc.getNodeByName(nodeName)
+ if node == nil {
+ node = woc.initializeNode(nodeName, wfv1.NodeTypeRetry, boundaryID, wfv1.NodeRunning)
+ node.RetryStrategy = tmpl.RetryStrategy
+ woc.wf.Status.Nodes[node.ID] = *node
+ }
+ if err := woc.processNodeRetries(node); err != nil {
+ return err
+ }
+ // The updated node status could've changed. Get the latest copy of the node.
+ node = woc.getNodeByName(node.Name)
+ woc.log.Infof("Node %s: Status: %s", node.Name, node.Phase)
+ if node.Completed() {
+ return nil
+ }
+ lastChildNode, err := woc.getLastChildNode(node)
+ if err != nil {
+ return err
+ }
+ if lastChildNode != nil && !lastChildNode.Completed() {
+ // last child node is still running.
+ return nil
+ }
+ // Create new node as child of 'node'
+ newContainerName := fmt.Sprintf("%s(%d)", nodeName, len(node.Children))
+ err = woc.executeContainer(newContainerName, tmpl, boundaryID)
+ woc.addChildNode(nodeName, newContainerName)
+ return err
+}
+
+func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template, boundaryID string) error {
+ node := woc.getNodeByName(nodeName)
+ if node != nil {
+ return nil
+ }
woc.log.Debugf("Executing node %s with container template: %v\n", nodeName, tmpl)
_, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl)
if err != nil {
- woc.markNodeError(nodeName, err)
+ woc.initializeNode(nodeName, wfv1.NodeTypePod, boundaryID, wfv1.NodeError, err.Error())
return err
}
- node := woc.markNodePhase(nodeName, wfv1.NodeRunning)
+ node = woc.initializeNode(nodeName, wfv1.NodeTypePod, boundaryID, wfv1.NodeRunning)
woc.log.Infof("Initialized container node %v", node)
return nil
}
+func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string {
+ node := woc.wf.Status.Nodes[nodeID]
+ if node.Type == wfv1.NodeTypePod {
+ return []string{node.ID}
+ }
+ outbound := make([]string, 0)
+ for _, outboundNodeID := range node.OutboundNodes {
+ outNode := woc.wf.Status.Nodes[outboundNodeID]
+ if outNode.Type == wfv1.NodeTypePod {
+ outbound = append(outbound, outboundNodeID)
+ } else {
+ subOutIDs := woc.getOutboundNodes(outboundNodeID)
+ for _, subOutID := range subOutIDs {
+ outbound = append(outbound, subOutID)
+ }
+ }
+ }
+ return outbound
+}
+
// getTemplateOutputsFromScope resolves a template's outputs from the scope of the template
func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Outputs, error) {
if !tmpl.Outputs.HasOutputs() {
@@ -967,7 +997,11 @@ func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Out
return &outputs, nil
}
-func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template) error {
+func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, boundaryID string) error {
+ node := woc.getNodeByName(nodeName)
+ if node != nil {
+ return nil
+ }
mainCtr := apiv1.Container{
Image: tmpl.Script.Image,
Command: tmpl.Script.Command,
@@ -975,14 +1009,48 @@ func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template) e
}
_, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl)
if err != nil {
- woc.markNodeError(nodeName, err)
+ woc.initializeNode(nodeName, wfv1.NodeTypePod, boundaryID, wfv1.NodeError, err.Error())
return err
}
- node := woc.markNodePhase(nodeName, wfv1.NodeRunning)
+ node = woc.initializeNode(nodeName, wfv1.NodeTypePod, boundaryID, wfv1.NodeRunning)
woc.log.Infof("Initialized script node %v", node)
return nil
}
+// addNodeOutputsToScope adds all of a nodes outputs to the scope with the given prefix
+func (wfs *wfScope) addNodeOutputsToScope(prefix string, node *wfv1.NodeStatus) {
+ if node.PodIP != "" {
+ key := fmt.Sprintf("%s.ip", prefix)
+ wfs.addParamToScope(key, node.PodIP)
+ }
+ if node.Outputs != nil {
+ if node.Outputs.Result != nil {
+ key := fmt.Sprintf("%s.outputs.result", prefix)
+ wfs.addParamToScope(key, *node.Outputs.Result)
+ }
+ for _, outParam := range node.Outputs.Parameters {
+ key := fmt.Sprintf("%s.outputs.parameters.%s", prefix, outParam.Name)
+ wfs.addParamToScope(key, *outParam.Value)
+ }
+ for _, outArt := range node.Outputs.Artifacts {
+ key := fmt.Sprintf("%s.outputs.artifacts.%s", prefix, outArt.Name)
+ wfs.addArtifactToScope(key, outArt)
+ }
+ }
+}
+
+// replaceMap returns a replacement map of strings intended to be used simple string substitution
+func (wfs *wfScope) replaceMap() map[string]string {
+ replaceMap := make(map[string]string)
+ for key, val := range wfs.scope {
+ valStr, ok := val.(string)
+ if ok {
+ replaceMap[key] = valStr
+ }
+ }
+ return replaceMap
+}
+
func (wfs *wfScope) addParamToScope(key, val string) {
wfs.scope[key] = val
}
@@ -1035,6 +1103,7 @@ func (wfs *wfScope) resolveArtifact(v string) (*wfv1.Artifact, error) {
}
// addChildNode adds a nodeID as a child to a parent
+// parent and child are both node names
func (woc *wfOperationCtx) addChildNode(parent string, child string) {
parentID := woc.wf.NodeID(parent)
childID := woc.wf.NodeID(child)
@@ -1057,7 +1126,11 @@ func (woc *wfOperationCtx) addChildNode(parent string, child string) {
}
// executeResource is runs a kubectl command against a manifest
-func (woc *wfOperationCtx) executeResource(nodeName string, tmpl *wfv1.Template) error {
+func (woc *wfOperationCtx) executeResource(nodeName string, tmpl *wfv1.Template, boundaryID string) error {
+ node := woc.getNodeByName(nodeName)
+ if node != nil {
+ return nil
+ }
mainCtr := apiv1.Container{
Image: woc.controller.Config.ExecutorImage,
Command: []string{"argoexec"},
@@ -1069,10 +1142,10 @@ func (woc *wfOperationCtx) executeResource(nodeName string, tmpl *wfv1.Template)
}
_, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl)
if err != nil {
- woc.markNodeError(nodeName, err)
+ woc.initializeNode(nodeName, wfv1.NodeTypePod, boundaryID, wfv1.NodeError, err.Error())
return err
}
- node := woc.markNodePhase(nodeName, wfv1.NodeRunning)
+ node = woc.initializeNode(nodeName, wfv1.NodeTypePod, boundaryID, wfv1.NodeRunning)
woc.log.Infof("Initialized resource node %v", node)
return nil
}
diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go
index b0fa44dfa7b7..51020bd7dced 100644
--- a/workflow/controller/operator_test.go
+++ b/workflow/controller/operator_test.go
@@ -115,7 +115,7 @@ func TestProcessNodesWithRetries(t *testing.T) {
// Add the parent node for retries.
nodeName := "test-node"
nodeID := woc.wf.NodeID(nodeName)
- node := woc.markNodePhase(nodeName, wfv1.NodeRunning)
+ node := woc.initializeNode(nodeName, wfv1.NodeTypeRetry, "", wfv1.NodeRunning)
retries := wfv1.RetryStrategy{}
var retryLimit int32
retryLimit = 2
@@ -135,44 +135,44 @@ func TestProcessNodesWithRetries(t *testing.T) {
// Add child nodes.
for i := 0; i < 2; i++ {
childNode := fmt.Sprintf("child-node-%d", i)
- woc.markNodePhase(childNode, wfv1.NodeRunning)
+ woc.initializeNode(childNode, wfv1.NodeTypePod, "", wfv1.NodeRunning)
woc.addChildNode(nodeName, childNode)
}
- n := woc.getNode(nodeName)
- lastChild, err = woc.getLastChildNode(&n)
+ n := woc.getNodeByName(nodeName)
+ lastChild, err = woc.getLastChildNode(n)
assert.Nil(t, err)
assert.NotNil(t, lastChild)
// Last child is still running. processNodesWithRetries() should return false since
// there should be no retries at this point.
- err = woc.processNodeRetries(&n)
+ err = woc.processNodeRetries(n)
assert.Nil(t, err)
- n = woc.getNode(nodeName)
+ n = woc.getNodeByName(nodeName)
assert.Equal(t, n.Phase, wfv1.NodeRunning)
// Mark lastChild as successful.
woc.markNodePhase(lastChild.Name, wfv1.NodeSucceeded)
- err = woc.processNodeRetries(&n)
+ err = woc.processNodeRetries(n)
assert.Nil(t, err)
// The parent node also gets marked as Succeeded.
- n = woc.getNode(nodeName)
+ n = woc.getNodeByName(nodeName)
assert.Equal(t, n.Phase, wfv1.NodeSucceeded)
// Mark the parent node as running again and the lastChild as failed.
woc.markNodePhase(n.Name, wfv1.NodeRunning)
woc.markNodePhase(lastChild.Name, wfv1.NodeFailed)
- woc.processNodeRetries(&n)
- n = woc.getNode(nodeName)
+ woc.processNodeRetries(n)
+ n = woc.getNodeByName(nodeName)
assert.Equal(t, n.Phase, wfv1.NodeRunning)
// Add a third node that has failed.
childNode := "child-node-3"
- woc.markNodePhase(childNode, wfv1.NodeFailed)
+ woc.initializeNode(childNode, wfv1.NodeTypePod, "", wfv1.NodeFailed)
woc.addChildNode(nodeName, childNode)
- n = woc.getNode(nodeName)
- err = woc.processNodeRetries(&n)
+ n = woc.getNodeByName(nodeName)
+ err = woc.processNodeRetries(n)
assert.Nil(t, err)
- n = woc.getNode(nodeName)
+ n = woc.getNodeByName(nodeName)
assert.Equal(t, n.Phase, wfv1.NodeFailed)
}
diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go
index 60abff4021be..1b5c5f8c1a04 100644
--- a/workflow/controller/steps.go
+++ b/workflow/controller/steps.go
@@ -15,42 +15,73 @@ import (
"github.com/valyala/fasttemplate"
)
-func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template) error {
- nodeID := woc.wf.NodeID(nodeName)
+// stepsContext holds context information about this context's steps
+type stepsContext struct {
+ // boundaryID is the node ID of the boundary which all immediate child steps are bound to
+ boundaryID string
+
+ // scope holds parameter and artifacts which are referenceable in scope during execution
+ scope *wfScope
+}
+
+func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template, boundaryID string) error {
+ node := woc.getNodeByName(nodeName)
+ if node == nil {
+ node = woc.initializeNode(nodeName, wfv1.NodeTypeSteps, boundaryID, wfv1.NodeRunning)
+ }
defer func() {
- if woc.wf.Status.Nodes[nodeID].Completed() {
- _ = woc.killDeamonedChildren(nodeID)
+ if woc.wf.Status.Nodes[node.ID].Completed() {
+ _ = woc.killDeamonedChildren(node.ID)
}
}()
- scope := wfScope{
- tmpl: tmpl,
- scope: make(map[string]interface{}),
+ stepsCtx := stepsContext{
+ boundaryID: node.ID,
+ scope: &wfScope{
+ tmpl: tmpl,
+ scope: make(map[string]interface{}),
+ },
}
for i, stepGroup := range tmpl.Steps {
sgNodeName := fmt.Sprintf("%s[%d]", nodeName, i)
- woc.addChildNode(nodeName, sgNodeName)
- err := woc.executeStepGroup(stepGroup, sgNodeName, &scope)
+ sgNode := woc.getNodeByName(sgNodeName)
+ if sgNode == nil {
+ // initialize the step group
+ sgNode = woc.initializeNode(sgNodeName, wfv1.NodeTypeStepGroup, stepsCtx.boundaryID, wfv1.NodeRunning)
+ if i == 0 {
+ woc.addChildNode(nodeName, sgNodeName)
+ } else {
+ // This logic will connect all the outbound nodes of the previous
+ // step group as parents to the current step group node
+ prevStepGroupName := fmt.Sprintf("%s[%d]", nodeName, i-1)
+ prevStepGroupNode := woc.getNodeByName(prevStepGroupName)
+ for _, childID := range prevStepGroupNode.Children {
+ outboundNodeIDs := woc.getOutboundNodes(childID)
+ woc.log.Infof("SG Outbound nodes of %s are %s", childID, outboundNodeIDs)
+ for _, outNodeID := range outboundNodeIDs {
+ woc.addChildNode(woc.wf.Status.Nodes[outNodeID].Name, sgNodeName)
+ }
+ }
+ }
+ }
+ err := woc.executeStepGroup(stepGroup, sgNodeName, &stepsCtx)
if err != nil {
- if errors.IsCode(errors.CodeTimeout, err) {
- return err
+ if !errors.IsCode(errors.CodeTimeout, err) {
+ woc.markNodeError(nodeName, err)
}
- woc.markNodeError(nodeName, err)
return err
}
- sgNodeID := woc.wf.NodeID(sgNodeName)
- if !woc.wf.Status.Nodes[sgNodeID].Completed() {
- woc.log.Infof("Workflow step group node %v not yet completed", woc.wf.Status.Nodes[sgNodeID])
+		if sgNode = woc.getNodeByName(sgNodeName); !sgNode.Completed() {
+			woc.log.Infof("Workflow step group node %v not yet completed", sgNode)
return nil
}
- if !woc.wf.Status.Nodes[sgNodeID].Successful() {
- failMessage := fmt.Sprintf("step group %s was unsuccessful", sgNodeName)
+		if !sgNode.Successful() {
+			failMessage := fmt.Sprintf("step group %s was unsuccessful", sgNodeName)
woc.log.Info(failMessage)
woc.markNodePhase(nodeName, wfv1.NodeFailed, failMessage)
return nil
}
- // HACK: need better way to add children to scope
for _, step := range stepGroup {
childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name)
childNodeID := woc.wf.NodeID(childNodeName)
@@ -61,56 +92,50 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template) er
// are not easily referenceable by user.
continue
}
- if childNode.PodIP != "" {
- key := fmt.Sprintf("steps.%s.ip", step.Name)
- scope.addParamToScope(key, childNode.PodIP)
- }
- if childNode.Outputs != nil {
- if childNode.Outputs.Result != nil {
- key := fmt.Sprintf("steps.%s.outputs.result", step.Name)
- scope.addParamToScope(key, *childNode.Outputs.Result)
- }
- for _, outParam := range childNode.Outputs.Parameters {
- key := fmt.Sprintf("steps.%s.outputs.parameters.%s", step.Name, outParam.Name)
- scope.addParamToScope(key, *outParam.Value)
- }
- for _, outArt := range childNode.Outputs.Artifacts {
- key := fmt.Sprintf("steps.%s.outputs.artifacts.%s", step.Name, outArt.Name)
- scope.addArtifactToScope(key, outArt)
- }
- }
+ prefix := fmt.Sprintf("steps.%s", step.Name)
+ stepsCtx.scope.addNodeOutputsToScope(prefix, &childNode)
}
}
- outputs, err := getTemplateOutputsFromScope(tmpl, &scope)
+ // If this template has outputs from any of its steps, copy them to this node here
+ outputs, err := getTemplateOutputsFromScope(tmpl, stepsCtx.scope)
if err != nil {
woc.markNodeError(nodeName, err)
return err
}
if outputs != nil {
- node := woc.wf.Status.Nodes[nodeID]
+ node = woc.getNodeByName(nodeName)
node.Outputs = outputs
- woc.wf.Status.Nodes[nodeID] = node
+ woc.wf.Status.Nodes[node.ID] = *node
+ }
+ // Now that we have completed, set the outbound nodes from the last step group
+ outbound := make([]string, 0)
+ lastSGNode := woc.getNodeByName(fmt.Sprintf("%s[%d]", nodeName, len(tmpl.Steps)-1))
+ for _, childID := range lastSGNode.Children {
+ outboundNodeIDs := woc.getOutboundNodes(childID)
+ woc.log.Infof("Outbound nodes of %s is %s", childID, outboundNodeIDs)
+ for _, outNodeID := range outboundNodeIDs {
+ outbound = append(outbound, outNodeID)
+ }
}
- woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
+ node = woc.getNodeByName(nodeName)
+ woc.log.Infof("Outbound nodes of %s is %s", node.ID, outbound)
+ node.OutboundNodes = outbound
+ woc.wf.Status.Nodes[node.ID] = *node
+
+ node = woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
return nil
}
// executeStepGroup examines a map of parallel steps and executes them in parallel.
// Handles referencing of variables in scope, expands `withItem` clauses, and evaluates `when` expressions
-func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNodeName string, scope *wfScope) error {
- nodeID := woc.wf.NodeID(sgNodeName)
- node, ok := woc.wf.Status.Nodes[nodeID]
- if ok && node.Completed() {
+func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNodeName string, stepsCtx *stepsContext) error {
+ node := woc.getNodeByName(sgNodeName)
+ if node.Completed() {
woc.log.Debugf("Step group node %v already marked completed", node)
return nil
}
- if !ok {
- node = *woc.markNodePhase(sgNodeName, wfv1.NodeRunning)
- woc.log.Infof("Initializing step group node %v", node)
- }
-
// First, resolve any references to outputs from previous steps, and perform substitution
- stepGroup, err := woc.resolveReferences(stepGroup, scope)
+ stepGroup, err := woc.resolveReferences(stepGroup, stepsCtx.scope)
if err != nil {
woc.markNodeError(sgNodeName, err)
return err
@@ -136,12 +161,14 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
return err
}
if !proceed {
- skipReason := fmt.Sprintf("when '%s' evaluated false", step.When)
- woc.log.Infof("Skipping %s: %s", childNodeName, skipReason)
- woc.markNodePhase(childNodeName, wfv1.NodeSkipped, skipReason)
+ if woc.getNodeByName(childNodeName) == nil {
+ skipReason := fmt.Sprintf("when '%s' evaluated false", step.When)
+ woc.log.Infof("Skipping %s: %s", childNodeName, skipReason)
+ woc.initializeNode(childNodeName, wfv1.NodeTypeSkipped, stepsCtx.boundaryID, wfv1.NodeSkipped, skipReason)
+ }
continue
}
- err = woc.executeTemplate(step.Template, step.Arguments, childNodeName)
+ err = woc.executeTemplate(step.Template, step.Arguments, childNodeName, stepsCtx.boundaryID)
if err != nil {
if !errors.IsCode(errors.CodeTimeout, err) {
woc.markNodeError(childNodeName, err)
@@ -151,7 +178,7 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
}
}
- node = woc.wf.Status.Nodes[nodeID]
+ node = woc.getNodeByName(sgNodeName)
// Return if not all children completed
for _, childNodeID := range node.Children {
if !woc.wf.Status.Nodes[childNodeID].Completed() {
@@ -168,8 +195,8 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
return nil
}
}
- woc.markNodePhase(node.Name, wfv1.NodeSucceeded)
- woc.log.Infof("Step group node %v successful", woc.wf.Status.Nodes[nodeID])
+ node = woc.markNodePhase(node.Name, wfv1.NodeSucceeded)
+ woc.log.Infof("Step group node %v successful", node)
return nil
}
diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go
index 9a03fec1d2f9..af098627fcae 100644
--- a/workflow/controller/workflowpod_test.go
+++ b/workflow/controller/workflowpod_test.go
@@ -75,7 +75,7 @@ script:
// TestScriptTemplateWithVolume ensure we can a script pod with input artifacts
func TestScriptTemplateWithVolume(t *testing.T) {
tmpl := unmarshalTemplate(scriptTemplateWithInputArtifact)
- err := newWoc().executeScript(tmpl.Name, tmpl)
+ err := newWoc().executeScript(tmpl.Name, tmpl, "")
assert.Nil(t, err)
}
@@ -84,7 +84,7 @@ func TestScriptTemplateWithVolume(t *testing.T) {
func TestServiceAccount(t *testing.T) {
woc := newWoc()
woc.wf.Spec.ServiceAccountName = "foo"
- err := woc.executeContainer(woc.wf.Spec.Entrypoint, &woc.wf.Spec.Templates[0])
+ err := woc.executeContainer(woc.wf.Spec.Entrypoint, &woc.wf.Spec.Templates[0], "")
assert.Nil(t, err)
podName := getPodName(woc.wf)
pod, err := woc.controller.kubeclientset.CoreV1().Pods("").Get(podName, metav1.GetOptions{})
@@ -100,7 +100,7 @@ func TestImagePullSecrets(t *testing.T) {
Name: "secret-name",
},
}
- err := woc.executeContainer(woc.wf.Spec.Entrypoint, &woc.wf.Spec.Templates[0])
+ err := woc.executeContainer(woc.wf.Spec.Entrypoint, &woc.wf.Spec.Templates[0], "")
assert.Nil(t, err)
podName := getPodName(woc.wf)
pod, err := woc.controller.kubeclientset.CoreV1().Pods("").Get(podName, metav1.GetOptions{})
@@ -131,7 +131,7 @@ func TestAffinity(t *testing.T) {
},
},
}
- err := woc.executeContainer(woc.wf.Spec.Entrypoint, &woc.wf.Spec.Templates[0])
+ err := woc.executeContainer(woc.wf.Spec.Entrypoint, &woc.wf.Spec.Templates[0], "")
assert.Nil(t, err)
podName := getPodName(woc.wf)
pod, err := woc.controller.kubeclientset.CoreV1().Pods("").Get(podName, metav1.GetOptions{})