diff --git a/cmd/argo/commands/get.go b/cmd/argo/commands/get.go index 89f83394e567..440ddb490fe8 100644 --- a/cmd/argo/commands/get.go +++ b/cmd/argo/commands/get.go @@ -148,57 +148,76 @@ func printNodeTree(w *tabwriter.Writer, wf *wfv1.Workflow, node wfv1.NodeStatus, fmt.Fprintf(w, "%s%s\t%s\t%s\n", args...) } - // If the node has children, the node is a workflow template and - // node.Children prepresent a list of parallel steps. We skip - // a generation when recursing since the children nodes of workflow - // templates represent a virtual step group, which are not worh printing. - for i, stepGroupNodeID := range node.Children { - lastStepGroup := bool(i == len(node.Children)-1) - var part1, subp1 string - if lastStepGroup { - part1 = "└-" + if node.RetryStrategy != nil { + for i, childNodeID := range node.Children { + var part1, subp1 string subp1 = " " - } else { - part1 = "├-" - subp1 = "| " - } - stepGroupNode := wf.Status.Nodes[stepGroupNodeID] - for j, childNodeID := range stepGroupNode.Children { + childNode := wf.Status.Nodes[childNodeID] - if j > 0 { - if lastStepGroup { - part1 = " " - } else { - part1 = "| " - } + if i > 0 && i < len(node.Children)-1 { + part1 = "├-" + } else { + part1 = "└-" } - firstParallel := bool(j == 0) - lastParallel := bool(j == len(stepGroupNode.Children)-1) var part2, subp2 string - if firstParallel { - if len(stepGroupNode.Children) == 1 { - part2 = "--" - } else { - part2 = "·-" + part2 = "--" + childNodePrefix := childPrefix + part1 + part2 + childChldPrefix := childPrefix + subp1 + subp2 + printNodeTree(w, wf, childNode, depth+1, childNodePrefix, childChldPrefix) + } + } else { + // If the node has children, the node is a workflow template and + // node.Children represent a list of parallel steps. We skip + // a generation when recursing since the children nodes of workflow + // templates represent a virtual step group, which are not worth printing. 
+ for i, stepGroupNodeID := range node.Children { + lastStepGroup := bool(i == len(node.Children)-1) + var part1, subp1 string + if lastStepGroup { + part1 = "└-" + subp1 = " " + } else { + part1 = "├-" + subp1 = "| " + } + stepGroupNode := wf.Status.Nodes[stepGroupNodeID] + for j, childNodeID := range stepGroupNode.Children { + childNode := wf.Status.Nodes[childNodeID] + if j > 0 { + if lastStepGroup { + part1 = " " + } else { + part1 = "| " + } } - if !lastParallel { - subp2 = "| " - } else { + firstParallel := bool(j == 0) + lastParallel := bool(j == len(stepGroupNode.Children)-1) + var part2, subp2 string + if firstParallel { + if len(stepGroupNode.Children) == 1 { + part2 = "--" + } else { + part2 = "·-" + } + if !lastParallel { + subp2 = "| " + } else { + subp2 = " " + } + + } else if lastParallel { + part2 = "└-" subp2 = " " + } else { + part2 = "├-" + subp2 = "| " } - - } else if lastParallel { - part2 = "└-" - subp2 = " " - } else { - part2 = "├-" - subp2 = "| " + childNodePrefix := childPrefix + part1 + part2 + childChldPrefix := childPrefix + subp1 + subp2 + // Remove stepgroup name from being displayed + childNode.Name = strings.TrimPrefix(childNode.Name, stepGroupNode.Name+".") + printNodeTree(w, wf, childNode, depth+1, childNodePrefix, childChldPrefix) } - childNodePrefix := childPrefix + part1 + part2 - childChldPrefix := childPrefix + subp1 + subp2 - // Remove stepgroup name from being displayed - childNode.Name = strings.TrimPrefix(childNode.Name, stepGroupNode.Name+".") - printNodeTree(w, wf, childNode, depth+1, childNodePrefix, childChldPrefix) } } } diff --git a/examples/retry-container-to-completion.yaml b/examples/retry-container-to-completion.yaml new file mode 100644 index 000000000000..ca8dac3bb954 --- /dev/null +++ b/examples/retry-container-to-completion.yaml @@ -0,0 +1,19 @@ +# This example demonstrates the use of infinite retries for running +# the container to completion. It uses the `random-fail` container. 
+# For more details, see +# https://github.com/shrinandj/random-fail + +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: container-retries- +spec: + entrypoint: container-retries + templates: + - name: container-retries + retryStrategy: {} + container: + image: shrinand/random-fail + command: ["python"] + args: ["/run.py", "40"] + diff --git a/examples/retry-container.yaml b/examples/retry-container.yaml new file mode 100644 index 000000000000..08ab3ea5461e --- /dev/null +++ b/examples/retry-container.yaml @@ -0,0 +1,19 @@ +# This example demonstrates the use of retries for a single container. +# It uses the `random-fail` container. For more details, see +# https://github.com/shrinandj/random-fail + +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: container-retries- +spec: + entrypoint: container-retries + templates: + - name: container-retries + retryStrategy: + limit: 4 + container: + image: shrinand/random-fail + command: ["python"] + args: ["/run.py", "0"] + diff --git a/examples/retry-with-steps.yaml b/examples/retry-with-steps.yaml new file mode 100644 index 000000000000..d658e0ff6ed0 --- /dev/null +++ b/examples/retry-with-steps.yaml @@ -0,0 +1,41 @@ +# This example demonstrates the use of retries with steps. +# It uses the `random-fail` container. 
For more details, see +# https://github.com/shrinandj/random-fail + +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: retry-with-steps- +spec: + entrypoint: hello-hello-hello + templates: + - name: hello-hello-hello + steps: + - - name: hello1 + template: random-fail + arguments: + parameters: + - name: failPct + value: "0" + - - name: hello2a + template: random-fail + arguments: + parameters: + - name: failPct + value: "0" + - name: hello2b + template: random-fail + arguments: + parameters: + - name: failPct + value: "0" + - name: random-fail + inputs: + parameters: + - name: failPct + retryStrategy: + limit: 4 + container: + image: shrinand/random-fail + command: ["python"] + args: ["/run.py", "{{inputs.parameters.failPct}}"] diff --git a/pkg/apis/workflow/v1alpha1/types.go b/pkg/apis/workflow/v1alpha1/types.go index e15b0a33013a..4214197b2679 100644 --- a/pkg/apis/workflow/v1alpha1/types.go +++ b/pkg/apis/workflow/v1alpha1/types.go @@ -128,6 +128,8 @@ type Template struct { // before the system actively tries to terminate the pod; value must be positive integer // This field is only applicable to container and script templates. ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` + + RetryStrategy *RetryStrategy `json:"retryStrategy,omitempty"` } // Inputs are the mechanism for passing parameters, artifacts, volumes from one template to another @@ -272,6 +274,21 @@ type WorkflowStatus struct { PersistentVolumeClaims []apiv1.Volume `json:"persistentVolumeClaims,omitempty"` } +// GetNodesWithRetries returns a list of nodes that have retries. 
+func (wfs *WorkflowStatus) GetNodesWithRetries() []NodeStatus { + var nodesWithRetries []NodeStatus + for _, node := range wfs.Nodes { + if node.RetryStrategy != nil { + nodesWithRetries = append(nodesWithRetries, node) + } + } + return nodesWithRetries +} + +type RetryStrategy struct { + Limit *int32 `json:"limit,omitempty"` +} + type NodeStatus struct { // ID is a unique identifier of a node within the worklow // It is implemented as a hash of the node name, which makes the ID deterministic @@ -300,6 +317,8 @@ type NodeStatus struct { // Daemoned tracks whether or not this node was daemoned and need to be terminated Daemoned *bool `json:"daemoned,omitempty"` + RetryStrategy *RetryStrategy `json:"retryStrategy,omitempty"` + // Outputs captures output parameter values and artifact locations Outputs *Outputs `json:"outputs,omitempty"` @@ -332,6 +351,12 @@ func (n NodeStatus) Successful() bool { return n.Phase == NodeSucceeded || n.Phase == NodeSkipped } +// CanRetry returns whether the node should be retried or not. +func (n NodeStatus) CanRetry() bool { + // TODO(shri): Check if there are some 'unretryable' errors. 
+ return n.Completed() && !n.Successful() +} + type S3Bucket struct { Endpoint string `json:"endpoint"` Bucket string `json:"bucket"` diff --git a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go index cfff6274ad79..4c2414e8e315 100644 --- a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go @@ -262,6 +262,15 @@ func (in *NodeStatus) DeepCopyInto(out *NodeStatus) { **out = **in } } + if in.RetryStrategy != nil { + in, out := &in.RetryStrategy, &out.RetryStrategy + if *in == nil { + *out = nil + } else { + *out = new(RetryStrategy) + (*in).DeepCopyInto(*out) + } + } if in.Outputs != nil { in, out := &in.Outputs, &out.Outputs if *in == nil { @@ -387,6 +396,31 @@ func (in *ResourceTemplate) DeepCopy() *ResourceTemplate { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RetryStrategy) DeepCopyInto(out *RetryStrategy) { + *out = *in + if in.Limit != nil { + in, out := &in.Limit, &out.Limit + if *in == nil { + *out = nil + } else { + *out = new(int32) + **out = **in + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RetryStrategy. +func (in *RetryStrategy) DeepCopy() *RetryStrategy { + if in == nil { + return nil + } + out := new(RetryStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *S3Artifact) DeepCopyInto(out *S3Artifact) { *out = *in @@ -581,6 +615,15 @@ func (in *Template) DeepCopyInto(out *Template) { **out = **in } } + if in.RetryStrategy != nil { + in, out := &in.RetryStrategy, &out.RetryStrategy + if *in == nil { + *out = nil + } else { + *out = new(RetryStrategy) + (*in).DeepCopyInto(*out) + } + } return } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 1355dfc18b30..f7deae015577 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -57,6 +57,29 @@ type wfScope struct { scope map[string]interface{} } +// newWorkflowOperationCtx creates and initializes a new wfOperationCtx object. +func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOperationCtx { + woc := wfOperationCtx{ + wf: wf.DeepCopyObject().(*wfv1.Workflow), + orig: wf, + updated: false, + log: log.WithFields(log.Fields{ + "workflow": wf.ObjectMeta.Name, + "namespace": wf.ObjectMeta.Namespace, + }), + controller: wfc, + globalParams: make(map[string]string), + completedPods: make(map[string]bool), + deadline: time.Now().UTC().Add(maxOperationTime), + } + + if woc.wf.Status.Nodes == nil { + woc.wf.Status.Nodes = make(map[string]wfv1.NodeStatus) + } + + return &woc +} + // operateWorkflow is the main operator logic of a workflow. // It evaluates the current state of the workflow, and its pods // and decides how to proceed down the execution path. @@ -72,19 +95,7 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) { // NEVER modify objects from the store. It's a read-only, local cache. 
// You can use DeepCopy() to make a deep copy of original object and modify this copy // Or create a copy manually for better performance - woc := wfOperationCtx{ - wf: wf.DeepCopyObject().(*wfv1.Workflow), - orig: wf, - updated: false, - log: log.WithFields(log.Fields{ - "workflow": wf.ObjectMeta.Name, - "namespace": wf.ObjectMeta.Namespace, - }), - controller: wfc, - globalParams: make(map[string]string), - completedPods: make(map[string]bool), - deadline: time.Now().UTC().Add(maxOperationTime), - } + woc := newWorkflowOperationCtx(wf, wfc) defer woc.persistUpdates() defer func() { if r := recover(); r != nil { @@ -247,6 +258,46 @@ func (woc *wfOperationCtx) persistUpdates() { } } +func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus) error { + if node.Completed() { + return nil + } + lastChildNode, err := woc.getLastChildNode(node) + if err != nil { + return fmt.Errorf("Failed to find last child of node " + node.Name) + } + + if lastChildNode == nil { + return nil + } + + if !lastChildNode.Completed() { + // last child node is still running. + return nil + } + + if lastChildNode.Successful() { + node.Outputs = lastChildNode.Outputs.DeepCopy() + woc.markNodePhase(node.Name, wfv1.NodeSucceeded) + return nil + } + + if !lastChildNode.CanRetry() { + woc.log.Infof("Node cannot be retried. Marking it failed") + woc.markNodePhase(node.Name, wfv1.NodeFailed, lastChildNode.Message) + return nil + } + + if node.RetryStrategy.Limit != nil && int32(len(node.Children)) > *node.RetryStrategy.Limit { + woc.log.Infoln("No more retries left. Failing...") + woc.markNodePhase(node.Name, wfv1.NodeFailed, "No more retries left") + return nil + } + + woc.log.Infof("%d child nodes of %s failed. Trying again...", len(node.Children), node.Name) + return nil +} + // podReconciliation is the process by which a workflow will examine all its related // pods and update the node state before continuing the evaluation of the workflow. 
// Records all pods which were observed completed, which will be labeled completed=true @@ -258,10 +309,12 @@ func (woc *wfOperationCtx) podReconciliation() error { } seenPods := make(map[string]bool) for _, pod := range podList.Items { - seenPods[pod.ObjectMeta.Name] = true - if node, ok := woc.wf.Status.Nodes[pod.ObjectMeta.Name]; ok { + nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName] + nodeID := woc.wf.NodeID(nodeNameForPod) + seenPods[nodeID] = true + if node, ok := woc.wf.Status.Nodes[nodeID]; ok { if newState := assessNodeStatus(&pod, &node); newState != nil { - woc.wf.Status.Nodes[pod.ObjectMeta.Name] = *newState + woc.wf.Status.Nodes[nodeID] = *newState woc.updated = true } if woc.wf.Status.Nodes[pod.ObjectMeta.Name].Completed() { @@ -269,6 +322,7 @@ func (woc *wfOperationCtx) podReconciliation() error { } } } + if len(podList.Items) > 0 { return nil } @@ -288,10 +342,12 @@ func (woc *wfOperationCtx) podReconciliation() error { return err } for _, pod := range podList.Items { - seenPods[pod.ObjectMeta.Name] = true - if node, ok := woc.wf.Status.Nodes[pod.ObjectMeta.Name]; ok { + nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName] + nodeID := woc.wf.NodeID(nodeNameForPod) + seenPods[nodeID] = true + if node, ok := woc.wf.Status.Nodes[nodeID]; ok { if newState := assessNodeStatus(&pod, &node); newState != nil { - woc.wf.Status.Nodes[pod.ObjectMeta.Name] = *newState + woc.wf.Status.Nodes[nodeID] = *newState woc.updated = true } if woc.wf.Status.Nodes[pod.ObjectMeta.Name].Completed() { @@ -299,6 +355,7 @@ func (woc *wfOperationCtx) podReconciliation() error { } } } + // Now iterate the workflow pod nodes which we still believe to be incomplete. // If the pod was not seen in the pod list, it means the pod was deleted and it // is now impossible to infer status. 
The only thing we can do at this point is @@ -308,6 +365,7 @@ func (woc *wfOperationCtx) podReconciliation() error { // node is not a pod, or it is already complete continue } + if _, ok := seenPods[nodeID]; !ok { node.Message = "pod deleted" node.Phase = wfv1.NodeError @@ -633,8 +691,32 @@ func (woc *wfOperationCtx) deletePVCs() error { return firstErr } +func (woc *wfOperationCtx) getLastChildNode(node *wfv1.NodeStatus) (*wfv1.NodeStatus, error) { + if len(node.Children) <= 0 { + return nil, nil + } + + lastChildNodeName := node.Children[len(node.Children)-1] + lastChildNode, ok := woc.wf.Status.Nodes[lastChildNodeName] + if !ok { + return nil, fmt.Errorf("Failed to find node " + lastChildNodeName) + } + + return &lastChildNode, nil +} + +func (woc *wfOperationCtx) getNode(nodeName string) wfv1.NodeStatus { + nodeID := woc.wf.NodeID(nodeName) + node, ok := woc.wf.Status.Nodes[nodeID] + if !ok { + panic("Failed to find node " + nodeName) + } + + return node +} + func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string) error { - woc.log.Debugf("Evaluating node %s: template: %s", nodeName, templateName) + woc.log.Infof("Evaluating node %s: template: %s", nodeName, templateName) nodeID := woc.wf.NodeID(nodeName) node, ok := woc.wf.Status.Nodes[nodeID] if ok && node.Completed() { @@ -657,12 +739,58 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Argume switch tmpl.GetType() { case wfv1.TemplateTypeContainer: if ok { - // There's already a node entry for the container. This means the container was already - // scheduled (or had a create pod error). Nothing to more to do with this node. - return nil + if node.RetryStrategy != nil { + if err = woc.processNodeRetries(&node); err != nil { + return err + } + + // The updated node status could've changed. Get the latest copy of the node. 
+ node = woc.getNode(node.Name) + fmt.Printf("Node %s: Status: %s\n", node.Name, node.Phase) + if node.Completed() { + return nil + } + lastChildNode, err := woc.getLastChildNode(&node) + if err != nil { + return err + } + if !lastChildNode.Completed() { + // last child node is still running. + return nil + } + } else { + // There are no retries configured and there's already a node entry for the container. + // This means the container was already scheduled (or had a create pod error). Nothing + // more to do with this node. + return nil + } } + + // If the user has specified retries, a special "retries" non-leaf node + // is created. This node acts as the parent of all retries that will be + // done for the container. The status of this node should be "Success" + // if any of the retries succeed. Otherwise, it is "Failed". + + // TODO(shri): Mark the current node as a "retry" node + // Create a new child node as the first attempt node and + // run the template in that node. + nodeToExecute := nodeName + if tmpl.RetryStrategy != nil { + node := woc.markNodePhase(nodeName, wfv1.NodeRunning) + retries := wfv1.RetryStrategy{} + node.RetryStrategy = &retries + node.RetryStrategy.Limit = tmpl.RetryStrategy.Limit + woc.wf.Status.Nodes[nodeID] = *node + + // Create new node as child of 'node' + newContainerName := fmt.Sprintf("%s(%d)", nodeName, len(node.Children)) + woc.markNodePhase(newContainerName, wfv1.NodeRunning) + woc.addChildNode(nodeName, newContainerName) + nodeToExecute = newContainerName + } + // We have not yet created the pod - err = woc.executeContainer(nodeName, tmpl) + err = woc.executeContainer(nodeToExecute, tmpl) case wfv1.TemplateTypeSteps: if !ok { node = *woc.markNodePhase(nodeName, wfv1.NodeRunning) @@ -741,9 +869,6 @@ func (woc *wfOperationCtx) markWorkflowError(err error, markCompleted bool) { // markNodePhase marks a node with the given phase, creating the node if necessary and handles timestamps func (woc *wfOperationCtx) 
markNodePhase(nodeName string, phase wfv1.NodePhase, message ...string) *wfv1.NodeStatus { - if woc.wf.Status.Nodes == nil { - woc.wf.Status.Nodes = make(map[string]wfv1.NodeStatus) - } nodeID := woc.wf.NodeID(nodeName) node, ok := woc.wf.Status.Nodes[nodeID] if !ok { @@ -764,6 +889,7 @@ func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase, } woc.wf.Status.Nodes[nodeID] = node woc.updated = true + woc.log.Debugf("Marked node %s %s", nodeName, phase) return &node } @@ -773,6 +899,7 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS } func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template) error { + woc.log.Infof("Executing node %s with container template: %v\n", nodeName, tmpl) pod, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl) if err != nil { woc.markNodeError(nodeName, err) diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go new file mode 100644 index 000000000000..35321033c7cc --- /dev/null +++ b/workflow/controller/operator_test.go @@ -0,0 +1,86 @@ +package controller + +import ( + "fmt" + "testing" + + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + "github.com/stretchr/testify/assert" +) + +// TestProcessNodesWithRetries tests the processNodesWithRetries() method. +func TestProcessNodesWithRetries(t *testing.T) { + controller := newController() + assert.NotNil(t, controller) + wf := unmarshalWF(helloWorldWf) + assert.NotNil(t, wf) + woc := newWorkflowOperationCtx(wf, controller) + assert.NotNil(t, woc) + + // Verify that there are no nodes in the wf status. + assert.Zero(t, len(woc.wf.Status.Nodes)) + + // Add the parent node for retries. 
+ nodeName := "test-node" + nodeID := woc.wf.NodeID(nodeName) + node := woc.markNodePhase(nodeName, wfv1.NodeRunning) + retries := wfv1.RetryStrategy{} + var retryLimit int32 + retryLimit = 2 + retries.Limit = &retryLimit + node.RetryStrategy = &retries + woc.wf.Status.Nodes[nodeID] = *node + + retryNodes := woc.wf.Status.GetNodesWithRetries() + assert.Equal(t, len(retryNodes), 1) + assert.Equal(t, node.Phase, wfv1.NodeRunning) + + // Ensure there are no child nodes yet. + lastChild, err := woc.getLastChildNode(node) + assert.Nil(t, err) + assert.Nil(t, lastChild) + + // Add child nodes. + for i := 0; i < 2; i++ { + childNode := fmt.Sprintf("child-node-%d", i) + woc.markNodePhase(childNode, wfv1.NodeRunning) + woc.addChildNode(nodeName, childNode) + } + + n := woc.getNode(nodeName) + lastChild, err = woc.getLastChildNode(&n) + assert.Nil(t, err) + assert.NotNil(t, lastChild) + + // Last child is still running. processNodesWithRetries() should return false since + // there should be no retries at this point. + err = woc.processNodeRetries(&n) + assert.Nil(t, err) + n = woc.getNode(nodeName) + assert.Equal(t, n.Phase, wfv1.NodeRunning) + + // Mark lastChild as successful. + woc.markNodePhase(lastChild.Name, wfv1.NodeSucceeded) + err = woc.processNodeRetries(&n) + assert.Nil(t, err) + // The parent node also gets marked as Succeeded. + n = woc.getNode(nodeName) + assert.Equal(t, n.Phase, wfv1.NodeSucceeded) + + // Mark the parent node as running again and the lastChild as failed. + woc.markNodePhase(n.Name, wfv1.NodeRunning) + woc.markNodePhase(lastChild.Name, wfv1.NodeFailed) + woc.processNodeRetries(&n) + n = woc.getNode(nodeName) + assert.Equal(t, n.Phase, wfv1.NodeRunning) + + // Add a third node that has failed. 
+ childNode := "child-node-3" + woc.markNodePhase(childNode, wfv1.NodeFailed) + woc.addChildNode(nodeName, childNode) + n = woc.getNode(nodeName) + err = woc.processNodeRetries(&n) + assert.Nil(t, err) + n = woc.getNode(nodeName) + assert.Equal(t, n.Phase, wfv1.NodeFailed) +} diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go index 29245d1f1110..2b68f546eacc 100644 --- a/workflow/controller/workflowpod_test.go +++ b/workflow/controller/workflowpod_test.go @@ -38,6 +38,9 @@ func newWoc(wfs ...wfv1.Workflow) *wfOperationCtx { controller: newController(), completedPods: make(map[string]bool), } + if woc.wf.Status.Nodes == nil { + woc.wf.Status.Nodes = make(map[string]wfv1.NodeStatus) + } return &woc }