Skip to content

Commit

Permalink
fix: retry node with expression status Running -> Pending (#12637)
Browse files Browse the repository at this point in the history
Signed-off-by: shuangkun <[email protected]>
Signed-off-by: Isitha Subasinghe <[email protected]>
  • Loading branch information
shuangkun authored and isubasinghe committed Feb 28, 2024
1 parent c95c6ab commit 9b69363
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 1 deletion.
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
if err != nil {
return nil, false, err
}
if !shouldContinue {
if !shouldContinue && lastChildNode.Fulfilled() {
return woc.markNodePhase(node.Name, lastChildNode.Phase, "retryStrategy.expression evaluated to false"), true, nil
}
}
Expand Down
80 changes: 80 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,86 @@ func TestProcessNodeRetriesWithExponentialBackoff(t *testing.T) {
require.Equal(wfv1.NodeSucceeded, n.Phase)
}

// TestProcessNodeRetries tests retrying with Expression
func TestProcessNodeRetriesWithExpression(t *testing.T) {
cancel, controller := newController()
defer cancel()
assert.NotNil(t, controller)
wf := wfv1.MustUnmarshalWorkflow(helloWorldWf)
assert.NotNil(t, wf)
woc := newWorkflowOperationCtx(wf, controller)
assert.NotNil(t, woc)
// Verify that there are no nodes in the wf status.
assert.Zero(t, len(woc.wf.Status.Nodes))

// Add the parent node for retries.
nodeName := "test-node"
nodeID := woc.wf.NodeID(nodeName)
node := woc.initializeNode(nodeName, wfv1.NodeTypeRetry, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning, &wfv1.NodeFlag{})
retries := wfv1.RetryStrategy{}
retries.Expression = "false"
retries.Limit = intstrutil.ParsePtr("2")
retries.RetryPolicy = wfv1.RetryPolicyAlways
woc.wf.Status.Nodes[nodeID] = *node

assert.Equal(t, node.Phase, wfv1.NodeRunning)

// Ensure there are no child nodes yet.
lastChild := getChildNodeIndex(node, woc.wf.Status.Nodes, -1)
assert.Nil(t, lastChild)

// Add child nodes.
for i := 0; i < 2; i++ {
childNode := fmt.Sprintf("%s(%d)", nodeName, i)
woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning, &wfv1.NodeFlag{Retried: true})
woc.addChildNode(nodeName, childNode)
}

n, err := woc.wf.GetNodeByName(nodeName)
assert.NoError(t, err)
lastChild = getChildNodeIndex(n, woc.wf.Status.Nodes, -1)
assert.NotNil(t, lastChild)

// Last child is still running. processNodeRetries() should return false since
// there should be no retries at this point.
n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.NoError(t, err)
assert.Equal(t, n.Phase, wfv1.NodeRunning)

// Mark lastChild Pending.
woc.markNodePhase(lastChild.Name, wfv1.NodePending)
n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.Nil(t, err)
assert.Equal(t, n.Phase, wfv1.NodeRunning)

// Mark lastChild as successful.
woc.markNodePhase(lastChild.Name, wfv1.NodeSucceeded)
n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.Nil(t, err)
// The parent node also gets marked as Succeeded.
assert.Equal(t, n.Phase, wfv1.NodeSucceeded)

// Mark the parent node as running again and the lastChild as errored.
n = woc.markNodePhase(n.Name, wfv1.NodeRunning)
woc.markNodePhase(lastChild.Name, wfv1.NodeError)
_, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.NoError(t, err)
n, err = woc.wf.GetNodeByName(nodeName)
assert.NoError(t, err)
assert.Equal(t, n.Phase, wfv1.NodeError)

// Add a third node that has failed.
woc.markNodePhase(n.Name, wfv1.NodeRunning)
childNode := fmt.Sprintf("%s(%d)", nodeName, 3)
woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeFailed, &wfv1.NodeFlag{Retried: true})
woc.addChildNode(nodeName, childNode)
n, err = woc.wf.GetNodeByName(nodeName)
assert.NoError(t, err)
n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.NoError(t, err)
assert.Equal(t, n.Phase, wfv1.NodeFailed)
}

func parseRetryMessage(message string) (int, error) {
pattern := regexp.MustCompile(`Backoff for (\d+) minutes (\d+) seconds`)
matches := pattern.FindStringSubmatch(message)
Expand Down

0 comments on commit 9b69363

Please sign in to comment.