From 290f6799752ef602b27c193212495e27f40dd687 Mon Sep 17 00:00:00 2001 From: Shri Javadekar Date: Thu, 11 Jan 2018 10:30:57 -0800 Subject: [PATCH] Allow containers to be retried. (#661) * Allow containers to be retried. This commit allows `container` templates to be retried. The user simply adds a 'retries' section to the templated. Currently, this section only has a 'limit' field that has the number of times the container should be retried in case of failure. Subsequent commits will add more features such as retries policies, retries on specific errors, erc. When a container template has a retries section, the workflow controller adds an intermediate node to the workflow. This node acts as a parent of all the retries. The argo command line tool's 'get' command correctly shows the intermediate node and the child nodes along with their pod names and status'. Added unit tests that check the various state transitions of the intermediate node based on the status of the child nodes (which actually execute the pod). Testing Done. * Unit tests succeeded. * Ran simple workflow that had retries. It completed and status of the nodes was correct. * Ran workflows with parallel steps, each of which had containers with retries. The workflow completed correctly and the retries were applied to each individual container correctly. * Ran other workflows which did not have retries. The completed correctly. * `argo get ` shows the retries and the pod names. * Incorporate review comments. * Rename example files for retries. * Update autogenerated code to include RetryStrategy. * Call processNodeRetries from executeTemplate. Earlier, this was getting called during podReconciliation. Calling this from executeTemplate is better since other non-leaf nodes also getting processed in executeTemplate. podReconciliation should only process leaf-nodes (which are based on execution of pods). * Reduce failure probabilities of retry examples. This will prevent spurious failures during e2e tests. Users wanting to experiment with retries will have to explicitly change the failure probability in the yamls. --- cmd/argo/commands/get.go | 105 +++++----- examples/retry-container-to-completion.yaml | 19 ++ examples/retry-container.yaml | 19 ++ examples/retry-with-steps.yaml | 41 ++++ pkg/apis/workflow/v1alpha1/types.go | 25 +++ .../v1alpha1/zz_generated.deepcopy.go | 43 +++++ workflow/controller/operator.go | 181 +++++++++++++++--- workflow/controller/operator_test.go | 86 +++++++++ workflow/controller/workflowpod_test.go | 3 + 9 files changed, 452 insertions(+), 70 deletions(-) create mode 100644 examples/retry-container-to-completion.yaml create mode 100644 examples/retry-container.yaml create mode 100644 examples/retry-with-steps.yaml create mode 100644 workflow/controller/operator_test.go diff --git a/cmd/argo/commands/get.go b/cmd/argo/commands/get.go index 89f83394e567..440ddb490fe8 100644 --- a/cmd/argo/commands/get.go +++ b/cmd/argo/commands/get.go @@ -148,57 +148,76 @@ func printNodeTree(w *tabwriter.Writer, wf *wfv1.Workflow, node wfv1.NodeStatus, fmt.Fprintf(w, "%s%s\t%s\t%s\n", args...) } - // If the node has children, the node is a workflow template and - // node.Children prepresent a list of parallel steps. We skip - // a generation when recursing since the children nodes of workflow - // templates represent a virtual step group, which are not worh printing. - for i, stepGroupNodeID := range node.Children { - lastStepGroup := bool(i == len(node.Children)-1) - var part1, subp1 string - if lastStepGroup { - part1 = "└-" + if node.RetryStrategy != nil { + for i, childNodeID := range node.Children { + var part1, subp1 string subp1 = " " - } else { - part1 = "├-" - subp1 = "| " - } - stepGroupNode := wf.Status.Nodes[stepGroupNodeID] - for j, childNodeID := range stepGroupNode.Children { + childNode := wf.Status.Nodes[childNodeID] - if j > 0 { - if lastStepGroup { - part1 = " " - } else { - part1 = "| " - } + if i > 0 && i < len(node.Children)-1 { + part1 = "├-" + } else { + part1 = "└-" } - firstParallel := bool(j == 0) - lastParallel := bool(j == len(stepGroupNode.Children)-1) var part2, subp2 string - if firstParallel { - if len(stepGroupNode.Children) == 1 { - part2 = "--" - } else { - part2 = "·-" + part2 = "--" + childNodePrefix := childPrefix + part1 + part2 + childChldPrefix := childPrefix + subp1 + subp2 + printNodeTree(w, wf, childNode, depth+1, childNodePrefix, childChldPrefix) + } + } else { + // If the node has children, the node is a workflow template and + // node.Children prepresent a list of parallel steps. We skip + // a generation when recursing since the children nodes of workflow + // templates represent a virtual step group, which are not worh printing. + for i, stepGroupNodeID := range node.Children { + lastStepGroup := bool(i == len(node.Children)-1) + var part1, subp1 string + if lastStepGroup { + part1 = "└-" + subp1 = " " + } else { + part1 = "├-" + subp1 = "| " + } + stepGroupNode := wf.Status.Nodes[stepGroupNodeID] + for j, childNodeID := range stepGroupNode.Children { + childNode := wf.Status.Nodes[childNodeID] + if j > 0 { + if lastStepGroup { + part1 = " " + } else { + part1 = "| " + } } - if !lastParallel { - subp2 = "| " - } else { + firstParallel := bool(j == 0) + lastParallel := bool(j == len(stepGroupNode.Children)-1) + var part2, subp2 string + if firstParallel { + if len(stepGroupNode.Children) == 1 { + part2 = "--" + } else { + part2 = "·-" + } + if !lastParallel { + subp2 = "| " + } else { + subp2 = " " + } + + } else if lastParallel { + part2 = "└-" subp2 = " " + } else { + part2 = "├-" + subp2 = "| " } - - } else if lastParallel { - part2 = "└-" - subp2 = " " - } else { - part2 = "├-" - subp2 = "| " + childNodePrefix := childPrefix + part1 + part2 + childChldPrefix := childPrefix + subp1 + subp2 + // Remove stepgroup name from being displayed + childNode.Name = strings.TrimPrefix(childNode.Name, stepGroupNode.Name+".") + printNodeTree(w, wf, childNode, depth+1, childNodePrefix, childChldPrefix) } - childNodePrefix := childPrefix + part1 + part2 - childChldPrefix := childPrefix + subp1 + subp2 - // Remove stepgroup name from being displayed - childNode.Name = strings.TrimPrefix(childNode.Name, stepGroupNode.Name+".") - printNodeTree(w, wf, childNode, depth+1, childNodePrefix, childChldPrefix) } } } diff --git a/examples/retry-container-to-completion.yaml b/examples/retry-container-to-completion.yaml new file mode 100644 index 000000000000..ca8dac3bb954 --- /dev/null +++ b/examples/retry-container-to-completion.yaml @@ -0,0 +1,19 @@ +# This example demonstrates the use of infinite retries for running +# the container to completion. It uses the `random-fail` container. +# For more details, see +# https://github.com/shrinandj/random-fail + +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: container-retries- +spec: + entrypoint: container-retries + templates: + - name: container-retries + retryStrategy: {} + container: + image: shrinand/random-fail + command: ["python"] + args: ["/run.py", "40"] + diff --git a/examples/retry-container.yaml b/examples/retry-container.yaml new file mode 100644 index 000000000000..08ab3ea5461e --- /dev/null +++ b/examples/retry-container.yaml @@ -0,0 +1,19 @@ +# This example demonstrates the use of retries for a single container. +# It uses the `random-fail` container. For more details, see +# https://github.com/shrinandj/random-fail + +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: container-retries- +spec: + entrypoint: container-retries + templates: + - name: container-retries + retryStrategy: + limit: 4 + container: + image: shrinand/random-fail + command: ["python"] + args: ["/run.py", "0"] + diff --git a/examples/retry-with-steps.yaml b/examples/retry-with-steps.yaml new file mode 100644 index 000000000000..d658e0ff6ed0 --- /dev/null +++ b/examples/retry-with-steps.yaml @@ -0,0 +1,41 @@ +# This example demonstrates the use of retries with steps. +# It uses the `random-fail` container. For more details, see +# https://github.com/shrinandj/random-fail + +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: retry-with-steps- +spec: + entrypoint: hello-hello-hello + templates: + - name: hello-hello-hello + steps: + - - name: hello1 + template: random-fail + arguments: + parameters: + - name: failPct + value: "0" + - - name: hello2a + template: random-fail + arguments: + parameters: + - name: failPct + value: "0" + - name: hello2b + template: random-fail + arguments: + parameters: + - name: failPct + value: "0" + - name: random-fail + inputs: + parameters: + - name: failPct + retryStrategy: + limit: 4 + container: + image: shrinand/random-fail + command: ["python"] + args: ["/run.py", "{{inputs.parameters.failPct}}"] diff --git a/pkg/apis/workflow/v1alpha1/types.go b/pkg/apis/workflow/v1alpha1/types.go index e15b0a33013a..4214197b2679 100644 --- a/pkg/apis/workflow/v1alpha1/types.go +++ b/pkg/apis/workflow/v1alpha1/types.go @@ -128,6 +128,8 @@ type Template struct { // before the system actively tries to terminate the pod; value must be positive integer // This field is only applicable to container and script templates. ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` + + RetryStrategy *RetryStrategy `json:"retryStrategy,omitempty"` } // Inputs are the mechanism for passing parameters, artifacts, volumes from one template to another @@ -272,6 +274,21 @@ type WorkflowStatus struct { PersistentVolumeClaims []apiv1.Volume `json:"persistentVolumeClaims,omitempty"` } +// GetNodesWithRetries returns a list of nodes that have retries. +func (wfs *WorkflowStatus) GetNodesWithRetries() []NodeStatus { + var nodesWithRetries []NodeStatus + for _, node := range wfs.Nodes { + if node.RetryStrategy != nil { + nodesWithRetries = append(nodesWithRetries, node) + } + } + return nodesWithRetries +} + +type RetryStrategy struct { + Limit *int32 `json:"limit,omitempty"` +} + type NodeStatus struct { // ID is a unique identifier of a node within the worklow // It is implemented as a hash of the node name, which makes the ID deterministic @@ -300,6 +317,8 @@ type NodeStatus struct { // Daemoned tracks whether or not this node was daemoned and need to be terminated Daemoned *bool `json:"daemoned,omitempty"` + RetryStrategy *RetryStrategy `json:"retryStrategy,omitempty"` + // Outputs captures output parameter values and artifact locations Outputs *Outputs `json:"outputs,omitempty"` @@ -332,6 +351,12 @@ func (n NodeStatus) Successful() bool { return n.Phase == NodeSucceeded || n.Phase == NodeSkipped } +// CanRetry returns whether the node should be retried or not. +func (n NodeStatus) CanRetry() bool { + // TODO(shri): Check if there are some 'unretryable' errors. + return n.Completed() && !n.Successful() +} + type S3Bucket struct { Endpoint string `json:"endpoint"` Bucket string `json:"bucket"` diff --git a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go index cfff6274ad79..4c2414e8e315 100644 --- a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go @@ -262,6 +262,15 @@ func (in *NodeStatus) DeepCopyInto(out *NodeStatus) { **out = **in } } + if in.RetryStrategy != nil { + in, out := &in.RetryStrategy, &out.RetryStrategy + if *in == nil { + *out = nil + } else { + *out = new(RetryStrategy) + (*in).DeepCopyInto(*out) + } + } if in.Outputs != nil { in, out := &in.Outputs, &out.Outputs if *in == nil { @@ -387,6 +396,31 @@ func (in *ResourceTemplate) DeepCopy() *ResourceTemplate { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RetryStrategy) DeepCopyInto(out *RetryStrategy) { + *out = *in + if in.Limit != nil { + in, out := &in.Limit, &out.Limit + if *in == nil { + *out = nil + } else { + *out = new(int32) + **out = **in + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RetryStrategy. +func (in *RetryStrategy) DeepCopy() *RetryStrategy { + if in == nil { + return nil + } + out := new(RetryStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *S3Artifact) DeepCopyInto(out *S3Artifact) { *out = *in @@ -581,6 +615,15 @@ func (in *Template) DeepCopyInto(out *Template) { **out = **in } } + if in.RetryStrategy != nil { + in, out := &in.RetryStrategy, &out.RetryStrategy + if *in == nil { + *out = nil + } else { + *out = new(RetryStrategy) + (*in).DeepCopyInto(*out) + } + } return } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 1355dfc18b30..f7deae015577 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -57,6 +57,29 @@ type wfScope struct { scope map[string]interface{} } +// newWorkflowOperationCtx creates and initializes a new wfOperationCtx object. +func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOperationCtx { + woc := wfOperationCtx{ + wf: wf.DeepCopyObject().(*wfv1.Workflow), + orig: wf, + updated: false, + log: log.WithFields(log.Fields{ + "workflow": wf.ObjectMeta.Name, + "namespace": wf.ObjectMeta.Namespace, + }), + controller: wfc, + globalParams: make(map[string]string), + completedPods: make(map[string]bool), + deadline: time.Now().UTC().Add(maxOperationTime), + } + + if woc.wf.Status.Nodes == nil { + woc.wf.Status.Nodes = make(map[string]wfv1.NodeStatus) + } + + return &woc +} + // operateWorkflow is the main operator logic of a workflow. // It evaluates the current state of the workflow, and its pods // and decides how to proceed down the execution path. @@ -72,19 +95,7 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) { // NEVER modify objects from the store. It's a read-only, local cache. // You can use DeepCopy() to make a deep copy of original object and modify this copy // Or create a copy manually for better performance - woc := wfOperationCtx{ - wf: wf.DeepCopyObject().(*wfv1.Workflow), - orig: wf, - updated: false, - log: log.WithFields(log.Fields{ - "workflow": wf.ObjectMeta.Name, - "namespace": wf.ObjectMeta.Namespace, - }), - controller: wfc, - globalParams: make(map[string]string), - completedPods: make(map[string]bool), - deadline: time.Now().UTC().Add(maxOperationTime), - } + woc := newWorkflowOperationCtx(wf, wfc) defer woc.persistUpdates() defer func() { if r := recover(); r != nil { @@ -247,6 +258,46 @@ func (woc *wfOperationCtx) persistUpdates() { } } +func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus) error { + if node.Completed() { + return nil + } + lastChildNode, err := woc.getLastChildNode(node) + if err != nil { + return fmt.Errorf("Failed to find last child of node " + node.Name) + } + + if lastChildNode == nil { + return nil + } + + if !lastChildNode.Completed() { + // last child node is still running. + return nil + } + + if lastChildNode.Successful() { + node.Outputs = lastChildNode.Outputs.DeepCopy() + woc.markNodePhase(node.Name, wfv1.NodeSucceeded) + return nil + } + + if !lastChildNode.CanRetry() { + woc.log.Infof("Node cannot be retried. Marking it failed") + woc.markNodePhase(node.Name, wfv1.NodeFailed, lastChildNode.Message) + return nil + } + + if node.RetryStrategy.Limit != nil && int32(len(node.Children)) > *node.RetryStrategy.Limit { + woc.log.Infoln("No more retries left. Failing...") + woc.markNodePhase(node.Name, wfv1.NodeFailed, "No more retries left") + return nil + } + + woc.log.Infof("%d child nodes of %s failed. Trying again...", len(node.Children), node.Name) + return nil +} + // podReconciliation is the process by which a workflow will examine all its related // pods and update the node state before continuing the evaluation of the workflow. // Records all pods which were observed completed, which will be labeled completed=true @@ -258,10 +309,12 @@ func (woc *wfOperationCtx) podReconciliation() error { } seenPods := make(map[string]bool) for _, pod := range podList.Items { - seenPods[pod.ObjectMeta.Name] = true - if node, ok := woc.wf.Status.Nodes[pod.ObjectMeta.Name]; ok { + nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName] + nodeID := woc.wf.NodeID(nodeNameForPod) + seenPods[nodeID] = true + if node, ok := woc.wf.Status.Nodes[nodeID]; ok { if newState := assessNodeStatus(&pod, &node); newState != nil { - woc.wf.Status.Nodes[pod.ObjectMeta.Name] = *newState + woc.wf.Status.Nodes[nodeID] = *newState woc.updated = true } if woc.wf.Status.Nodes[pod.ObjectMeta.Name].Completed() { @@ -269,6 +322,7 @@ func (woc *wfOperationCtx) podReconciliation() error { } } } + if len(podList.Items) > 0 { return nil } @@ -288,10 +342,12 @@ func (woc *wfOperationCtx) podReconciliation() error { return err } for _, pod := range podList.Items { - seenPods[pod.ObjectMeta.Name] = true - if node, ok := woc.wf.Status.Nodes[pod.ObjectMeta.Name]; ok { + nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName] + nodeID := woc.wf.NodeID(nodeNameForPod) + seenPods[nodeID] = true + if node, ok := woc.wf.Status.Nodes[nodeID]; ok { if newState := assessNodeStatus(&pod, &node); newState != nil { - woc.wf.Status.Nodes[pod.ObjectMeta.Name] = *newState + woc.wf.Status.Nodes[nodeID] = *newState woc.updated = true } if woc.wf.Status.Nodes[pod.ObjectMeta.Name].Completed() { @@ -299,6 +355,7 @@ func (woc *wfOperationCtx) podReconciliation() error { } } } + // Now iterate the workflow pod nodes which we still believe to be incomplete. // If the pod was not seen in the pod list, it means the pod was deleted and it // is now impossible to infer status. The only thing we can do at this point is @@ -308,6 +365,7 @@ func (woc *wfOperationCtx) podReconciliation() error { // node is not a pod, or it is already complete continue } + if _, ok := seenPods[nodeID]; !ok { node.Message = "pod deleted" node.Phase = wfv1.NodeError @@ -633,8 +691,32 @@ func (woc *wfOperationCtx) deletePVCs() error { return firstErr } +func (woc *wfOperationCtx) getLastChildNode(node *wfv1.NodeStatus) (*wfv1.NodeStatus, error) { + if len(node.Children) <= 0 { + return nil, nil + } + + lastChildNodeName := node.Children[len(node.Children)-1] + lastChildNode, ok := woc.wf.Status.Nodes[lastChildNodeName] + if !ok { + return nil, fmt.Errorf("Failed to find node " + lastChildNodeName) + } + + return &lastChildNode, nil +} + +func (woc *wfOperationCtx) getNode(nodeName string) wfv1.NodeStatus { + nodeID := woc.wf.NodeID(nodeName) + node, ok := woc.wf.Status.Nodes[nodeID] + if !ok { + panic("Failed to find node " + nodeName) + } + + return node +} + func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string) error { - woc.log.Debugf("Evaluating node %s: template: %s", nodeName, templateName) + woc.log.Infof("Evaluating node %s: template: %s", nodeName, templateName) nodeID := woc.wf.NodeID(nodeName) node, ok := woc.wf.Status.Nodes[nodeID] if ok && node.Completed() { @@ -657,12 +739,58 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Argume switch tmpl.GetType() { case wfv1.TemplateTypeContainer: if ok { - // There's already a node entry for the container. This means the container was already - // scheduled (or had a create pod error). Nothing to more to do with this node. - return nil + if node.RetryStrategy != nil { + if err = woc.processNodeRetries(&node); err != nil { + return err + } + + // The updated node status could've changed. Get the latest copy of the node. + node = woc.getNode(node.Name) + fmt.Printf("Node %s: Status: %s\n", node.Name, node.Phase) + if node.Completed() { + return nil + } + lastChildNode, err := woc.getLastChildNode(&node) + if err != nil { + return err + } + if !lastChildNode.Completed() { + // last child node is still running. + return nil + } + } else { + // There are no retries configured and there's already a node entry for the container. + // This means the container was already scheduled (or had a create pod error). Nothing + // to more to do with this node. + return nil + } } + + // If the user has specified retries, a special "retries" non-leaf node + // is created. This node acts as the parent of all retries that will be + // done for the container. The status of this node should be "Success" + // if any of the retries succeed. Otherwise, it is "Failed". + + // TODO(shri): Mark the current node as a "retry" node + // Create a new child node as the first attempt node and + // run the template in that node. + nodeToExecute := nodeName + if tmpl.RetryStrategy != nil { + node := woc.markNodePhase(nodeName, wfv1.NodeRunning) + retries := wfv1.RetryStrategy{} + node.RetryStrategy = &retries + node.RetryStrategy.Limit = tmpl.RetryStrategy.Limit + woc.wf.Status.Nodes[nodeID] = *node + + // Create new node as child of 'node' + newContainerName := fmt.Sprintf("%s(%d)", nodeName, len(node.Children)) + woc.markNodePhase(newContainerName, wfv1.NodeRunning) + woc.addChildNode(nodeName, newContainerName) + nodeToExecute = newContainerName + } + // We have not yet created the pod - err = woc.executeContainer(nodeName, tmpl) + err = woc.executeContainer(nodeToExecute, tmpl) case wfv1.TemplateTypeSteps: if !ok { node = *woc.markNodePhase(nodeName, wfv1.NodeRunning) @@ -741,9 +869,6 @@ func (woc *wfOperationCtx) markWorkflowError(err error, markCompleted bool) { // markNodePhase marks a node with the given phase, creating the node if necessary and handles timestamps func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase, message ...string) *wfv1.NodeStatus { - if woc.wf.Status.Nodes == nil { - woc.wf.Status.Nodes = make(map[string]wfv1.NodeStatus) - } nodeID := woc.wf.NodeID(nodeName) node, ok := woc.wf.Status.Nodes[nodeID] if !ok { @@ -764,6 +889,7 @@ func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase, } woc.wf.Status.Nodes[nodeID] = node woc.updated = true + woc.log.Debugf("Marked node %s %s", nodeName, phase) return &node } @@ -773,6 +899,7 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS } func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template) error { + woc.log.Infof("Executing node %s with container template: %v\n", nodeName, tmpl) pod, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl) if err != nil { woc.markNodeError(nodeName, err) diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go new file mode 100644 index 000000000000..35321033c7cc --- /dev/null +++ b/workflow/controller/operator_test.go @@ -0,0 +1,86 @@ +package controller + +import ( + "fmt" + "testing" + + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + "github.com/stretchr/testify/assert" +) + +// TestProcessNodesWithRetries tests the processNodesWithRetries() method. +func TestProcessNodesWithRetries(t *testing.T) { + controller := newController() + assert.NotNil(t, controller) + wf := unmarshalWF(helloWorldWf) + assert.NotNil(t, wf) + woc := newWorkflowOperationCtx(wf, controller) + assert.NotNil(t, woc) + + // Verify that there are no nodes in the wf status. + assert.Zero(t, len(woc.wf.Status.Nodes)) + + // Add the parent node for retries. + nodeName := "test-node" + nodeID := woc.wf.NodeID(nodeName) + node := woc.markNodePhase(nodeName, wfv1.NodeRunning) + retries := wfv1.RetryStrategy{} + var retryLimit int32 + retryLimit = 2 + retries.Limit = &retryLimit + node.RetryStrategy = &retries + woc.wf.Status.Nodes[nodeID] = *node + + retryNodes := woc.wf.Status.GetNodesWithRetries() + assert.Equal(t, len(retryNodes), 1) + assert.Equal(t, node.Phase, wfv1.NodeRunning) + + // Ensure there are no child nodes yet. + lastChild, err := woc.getLastChildNode(node) + assert.Nil(t, err) + assert.Nil(t, lastChild) + + // Add child nodes. + for i := 0; i < 2; i++ { + childNode := fmt.Sprintf("child-node-%d", i) + woc.markNodePhase(childNode, wfv1.NodeRunning) + woc.addChildNode(nodeName, childNode) + } + + n := woc.getNode(nodeName) + lastChild, err = woc.getLastChildNode(&n) + assert.Nil(t, err) + assert.NotNil(t, lastChild) + + // Last child is still running. processNodesWithRetries() should return false since + // there should be no retries at this point. + err = woc.processNodeRetries(&n) + assert.Nil(t, err) + n = woc.getNode(nodeName) + assert.Equal(t, n.Phase, wfv1.NodeRunning) + + // Mark lastChild as successful. + woc.markNodePhase(lastChild.Name, wfv1.NodeSucceeded) + err = woc.processNodeRetries(&n) + assert.Nil(t, err) + // The parent node also gets marked as Succeeded. + n = woc.getNode(nodeName) + assert.Equal(t, n.Phase, wfv1.NodeSucceeded) + + // Mark the parent node as running again and the lastChild as failed. + woc.markNodePhase(n.Name, wfv1.NodeRunning) + woc.markNodePhase(lastChild.Name, wfv1.NodeFailed) + woc.processNodeRetries(&n) + n = woc.getNode(nodeName) + assert.Equal(t, n.Phase, wfv1.NodeRunning) + + // Add a third node that has failed. + childNode := "child-node-3" + woc.markNodePhase(childNode, wfv1.NodeFailed) + woc.addChildNode(nodeName, childNode) + n = woc.getNode(nodeName) + err = woc.processNodeRetries(&n) + assert.Nil(t, err) + n = woc.getNode(nodeName) + assert.Equal(t, n.Phase, wfv1.NodeFailed) +} diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go index 29245d1f1110..2b68f546eacc 100644 --- a/workflow/controller/workflowpod_test.go +++ b/workflow/controller/workflowpod_test.go @@ -38,6 +38,9 @@ func newWoc(wfs ...wfv1.Workflow) *wfOperationCtx { controller: newController(), completedPods: make(map[string]bool), } + if woc.wf.Status.Nodes == nil { + woc.wf.Status.Nodes = make(map[string]wfv1.NodeStatus) + } return &woc }