Skip to content

Commit

Permalink
feat: Allow step restart on workflow retry. Closes #2334 (#2431)
Browse files Browse the repository at this point in the history
  • Loading branch information
markterm committed Apr 12, 2020
1 parent cf277eb commit 9c6351f
Show file tree
Hide file tree
Showing 11 changed files with 349 additions and 100 deletions.
7 changes: 7 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -6147,6 +6147,13 @@
},
"namespace": {
"type": "string"
},
"nodeFieldSelector": {
"type": "string"
},
"restartSuccesful": {
"type": "boolean",
"format": "boolean"
}
}
},
Expand Down
22 changes: 20 additions & 2 deletions cmd/argo/commands/retry.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package commands

import (
"log"

"github.com/argoproj/pkg/errors"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/fields"

"github.com/argoproj/argo/cmd/argo/commands/client"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
)

// retryOps holds the flag values specific to the `argo retry` command,
// controlling which previously-successful nodes are reset on retry.
type retryOps struct {
	nodeFieldSelector string // --node-field-selector
	restartSuccessful bool // --restart-successful
}

func NewRetryCommand() *cobra.Command {
var (
cliSubmitOpts cliSubmitOpts
retryOps retryOps
)
var command = &cobra.Command{
Use: "retry [WORKFLOW...]",
Expand All @@ -20,10 +29,17 @@ func NewRetryCommand() *cobra.Command {
serviceClient := apiClient.NewWorkflowServiceClient()
namespace := client.Namespace()

selector, err := fields.ParseSelector(retryOps.nodeFieldSelector)
if err != nil {
log.Fatalf("Unable to parse node field selector '%s': %s", retryOps.nodeFieldSelector, err)
}

for _, name := range args {
wf, err := serviceClient.RetryWorkflow(ctx, &workflowpkg.WorkflowRetryRequest{
Name: name,
Namespace: namespace,
Name: name,
Namespace: namespace,
RestartSuccesful: retryOps.restartSuccessful,
NodeFieldSelector: selector.String(),
})
if err != nil {
errors.CheckError(err)
Expand All @@ -37,5 +53,7 @@ func NewRetryCommand() *cobra.Command {
command.Flags().StringVarP(&cliSubmitOpts.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
command.Flags().BoolVarP(&cliSubmitOpts.wait, "wait", "w", false, "wait for the workflow to complete")
command.Flags().BoolVar(&cliSubmitOpts.watch, "watch", false, "watch the workflow until it completes")
command.Flags().BoolVar(&retryOps.restartSuccessful, "restart-successful", false, "indicates to restart succesful nodes matching the --node-field-selector")
command.Flags().StringVar(&retryOps.nodeFieldSelector, "node-field-selector", "", "selector of nodes to reset, eg: --node-field-selector inputs.paramaters.myparam.value=abc")
return command
}
242 changes: 168 additions & 74 deletions pkg/apiclient/workflow/workflow.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/apiclient/workflow/workflow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ message WorkflowResubmitRequest {
// WorkflowRetryRequest identifies a workflow to retry and, optionally,
// which successful nodes should be reset and re-run as part of the retry.
message WorkflowRetryRequest {
string name = 1;
string namespace = 2;
// NOTE(review): field name is misspelled ("Succesful"); renaming it would
// change the JSON mapping and break existing clients — confirm before fixing.
bool restartSuccesful = 3;
// Node field selector (e.g. templateName==steps-inner) choosing which
// successful nodes to reset; only honored when restartSuccesful is true.
string nodeFieldSelector = 4;
}
message WorkflowResumeRequest {
string name = 1;
Expand Down
7 changes: 7 additions & 0 deletions pkg/apiclient/workflow/workflow.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -4943,6 +4943,13 @@
},
"namespace": {
"type": "string"
},
"restartSuccesful": {
"type": "boolean",
"format": "boolean"
},
"nodeFieldSelector": {
"type": "string"
}
}
},
Expand Down
2 changes: 1 addition & 1 deletion server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
return nil, err
}

wf, err = util.RetryWorkflow(kubeClient, s.offloadNodeStatusRepo, wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wf)
wf, err = util.RetryWorkflow(kubeClient, s.offloadNodeStatusRepo, wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wf, req.RestartSuccesful, req.NodeFieldSelector)
if err != nil {
return nil, err
}
Expand Down
33 changes: 29 additions & 4 deletions test/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,17 +476,42 @@ func (s *CLISuite) TestWorkflowRetryNoPersistence() {
// When this is the case, this behavior is tested in cli_with_server_test.go
s.T().SkipNow()
}

var retryTime corev1.Time

s.Given().
Workflow("@testdata/exit-1.yaml").
Workflow("@testdata/retry-test.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(30*time.Second).
Given().
RunCli([]string{"retry", "exit-1"}, func(t *testing.T, output string, err error) {
WaitForWorkflowToStart(5*time.Second).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
return wf.Status.AnyActiveSuspendNode()
}, "suspended node", 30*time.Second).
RunCli([]string{"terminate", "retry-test"}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err) {
assert.Contains(t, output, "workflow retry-test terminated")
}
}).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
retryTime = wf.Status.FinishedAt
return wf.Status.Phase == wfv1.NodeFailed
}, "terminated", 20*time.Second).
RunCli([]string{"retry", "retry-test", "--restart-successful", "--node-field-selector", "templateName==steps-inner"}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err) {
assert.Contains(t, output, "Name:")
assert.Contains(t, output, "Namespace:")
}
}).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
return wf.Status.AnyActiveSuspendNode()
}, "suspended node", 20*time.Second).
Then().
ExpectWorkflow(func(t *testing.T, _ *corev1.ObjectMeta, status *wfv1.WorkflowStatus) {
outerStepsPodNode := status.Nodes.FindByDisplayName("steps-outer-step1")
innerStepsPodNode := status.Nodes.FindByDisplayName("steps-inner-step1")

assert.True(t, outerStepsPodNode.FinishedAt.Before(&retryTime))
assert.True(t, retryTime.Before(&innerStepsPodNode.FinishedAt))
})
}

Expand Down
31 changes: 31 additions & 0 deletions test/e2e/testdata/retry-test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# E2E fixture for `argo retry --restart-successful --node-field-selector`:
# a two-level steps workflow whose inner steps end in a suspend node, so the
# test can terminate it mid-run and then retry with selected nodes reset.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: retry-test
  labels:
    argo-e2e: "true"
spec:
  entrypoint: steps-outer
  templates:
    - name: steps-outer
      steps:
        - - name: steps-outer-step1
            template: whalesay
        - - name: steps-outer-step2
            template: steps-inner

    # Inner steps group — target of the test's --node-field-selector
    # (templateName==steps-inner).
    - name: steps-inner
      steps:
        - - name: steps-inner-step1
            template: whalesay
        - - name: steps-inner-step2
            template: approve

    # Suspends indefinitely until resumed or terminated by the test driver.
    - name: approve
      suspend: {}

    - name: whalesay
      container:
        image: docker/whalesay:latest
        command: [cowsay]
        args: ["hello world"]
25 changes: 21 additions & 4 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (woc *wfOperationCtx) operate() {
woc.workflowDeadline = woc.getWorkflowDeadline()
err := woc.podReconciliation()
if err == nil {
err = woc.failSuspendedNodesAfterDeadline()
err = woc.failSuspendedNodesAfterDeadlineOrShutdown()
}
if err != nil {
woc.log.Errorf("%s error: %+v", woc.wf.ObjectMeta.Name, err)
Expand Down Expand Up @@ -636,6 +636,17 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
return woc.markNodePhase(node.Name, wfv1.NodeSucceeded), true, nil
}

if woc.wf.Spec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
var message string
if woc.wf.Spec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.wf.Spec.Shutdown)
} else {
message = fmt.Sprintf("retry exceeded workflow deadline %s", *woc.workflowDeadline)
}
woc.log.Infoln(message)
return woc.markNodePhase(node.Name, lastChildNode.Phase, message), true, nil
}

if retryStrategy.Backoff != nil {
// Process max duration limit
if retryStrategy.Backoff.MaxDuration != "" && len(node.Children) > 0 {
Expand Down Expand Up @@ -808,11 +819,17 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node wfv1.NodeStatus) bool {
}

//fails any suspended nodes if the workflow deadline has passed
func (woc *wfOperationCtx) failSuspendedNodesAfterDeadline() error {
if woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline) {
func (woc *wfOperationCtx) failSuspendedNodesAfterDeadlineOrShutdown() error {
if woc.wf.Spec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
for _, node := range woc.wf.Status.Nodes {
if node.IsActiveSuspendNode() {
woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("step exceeded workflow deadline %s", *woc.workflowDeadline))
var message string
if woc.wf.Spec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.wf.Spec.Shutdown)
} else {
message = fmt.Sprintf("step exceeded workflow deadline %s", *woc.workflowDeadline)
}
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
}
}
}
Expand Down
74 changes: 61 additions & 13 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,20 @@ func ResumeWorkflow(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatu
}
}

// selectorMatchesNode reports whether the field selector matches the node.
// The addressable fields are:
//   - displayName
//   - templateName
//   - inputs.parameters.<name>.value (only for parameters with a resolved value)
func selectorMatchesNode(selector fields.Selector, node wfv1.NodeStatus) bool {
	nodeFields := fields.Set{
		"displayName":  node.DisplayName,
		"templateName": node.TemplateName,
	}
	if node.Inputs != nil {
		for _, inParam := range node.Inputs.Parameters {
			// Guard against parameters whose value is unset: dereferencing a
			// nil pointer here would panic the whole retry operation.
			if inParam.Value != nil {
				nodeFields[fmt.Sprintf("inputs.parameters.%s.value", inParam.Name)] = *inParam.Value
			}
		}
	}

	return selector.Matches(nodeFields)
}

func updateWorkflowNodeByKey(wfIf v1alpha1.WorkflowInterface, workflowName string, nodeFieldSelector string, phase wfv1.NodePhase, message string) error {
selector, err := fields.ParseSelector(nodeFieldSelector)

Expand All @@ -441,16 +455,7 @@ func updateWorkflowNodeByKey(wfIf v1alpha1.WorkflowInterface, workflowName strin
nodeUpdated := false
for nodeID, node := range wf.Status.Nodes {
if node.IsActiveSuspendNode() {
nodeFields := fields.Set{
"displayName": node.DisplayName,
}
if node.Inputs != nil {
for _, inParam := range node.Inputs.Parameters {
nodeFields[fmt.Sprintf("inputs.parameters.%s.value", inParam.Name)] = *inParam.Value
}
}

if selector.Matches(nodeFields) {
if selectorMatchesNode(selector, node) {
node.Phase = phase
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
if len(message) > 0 {
Expand Down Expand Up @@ -612,7 +617,7 @@ func convertNodeID(newWf *wfv1.Workflow, regex *regexp.Regexp, oldNodeID string,
}

// RetryWorkflow updates a workflow, deleting all failed steps as well as the onExit node (and children)
func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatusRepo, wfClient v1alpha1.WorkflowInterface, wf *wfv1.Workflow) (*wfv1.Workflow, error) {
func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatusRepo, wfClient v1alpha1.WorkflowInterface, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string) (*wfv1.Workflow, error) {
switch wf.Status.Phase {
case wfv1.NodeFailed, wfv1.NodeError:
default:
Expand All @@ -634,12 +639,12 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatus
newWF.Status.Phase = wfv1.NodeRunning
newWF.Status.Message = ""
newWF.Status.FinishedAt = metav1.Time{}
newWF.Spec.Shutdown = ""
if newWF.Spec.ActiveDeadlineSeconds != nil && *newWF.Spec.ActiveDeadlineSeconds == 0 {
// if it was terminated, unset the deadline
newWF.Spec.ActiveDeadlineSeconds = nil
}

// Iterate the previous nodes. If it was successful Pod carry it forward
newNodes := make(map[string]wfv1.NodeStatus)
onExitNodeName := wf.ObjectMeta.Name + ".onExit"
nodes := wf.Status.Nodes
Expand All @@ -654,10 +659,22 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatus
}
}

// Get all children of nodes that match filter
nodeIDsToReset, err := getNodeIDsToReset(restartSuccessful, nodeFieldSelector, nodes)
if err != nil {
return nil, err
}

// Iterate the previous nodes. If it was successful Pod carry it forward
for _, node := range nodes {
doForceResetNode := false
if _, present := nodeIDsToReset[node.ID]; present {
// if we are resetting this node then don't carry it across regardless of its phase
doForceResetNode = true
}
switch node.Phase {
case wfv1.NodeSucceeded, wfv1.NodeSkipped:
if !strings.HasPrefix(node.Name, onExitNodeName) {
if !strings.HasPrefix(node.Name, onExitNodeName) && !doForceResetNode {
newNodes[node.ID] = node
continue
}
Expand Down Expand Up @@ -719,6 +736,37 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatus
return wfClient.Update(newWF)
}

// getNodeIDsToReset returns the set of node IDs — each node matching
// nodeFieldSelector plus all of its descendants — that should be reset
// (re-run) on retry. It returns an empty set when restartSuccessful is
// false or no selector was given, and an error if the selector is invalid.
func getNodeIDsToReset(restartSuccessful bool, nodeFieldSelector string, nodes wfv1.Nodes) (map[string]bool, error) {
	nodeIDsToReset := make(map[string]bool)
	if !restartSuccessful || len(nodeFieldSelector) == 0 {
		return nodeIDsToReset, nil
	}

	selector, err := fields.ParseSelector(nodeFieldSelector)
	if err != nil {
		// Idiom fix: return early instead of wrapping the loop in an else.
		return nil, err
	}
	for _, node := range nodes {
		if selectorMatchesNode(selector, node) {
			// Breadth-first traversal so every descendant of a matched node
			// is reset together with it.
			queue := []string{node.ID}
			for len(queue) > 0 {
				childNode := queue[0]
				queue = queue[1:]
				// Skip already-visited nodes so shared children are not
				// enqueued (and traversed) more than once.
				if _, present := nodeIDsToReset[childNode]; !present {
					nodeIDsToReset[childNode] = true
					queue = append(queue, nodes[childNode].Children...)
				}
			}
		}
	}
	return nodeIDsToReset, nil
}

var errSuspendedCompletedWorkflow = errors.Errorf(errors.CodeBadRequest, "cannot suspend completed workflows")

// IsWorkflowSuspended returns whether or not a workflow is considered suspended
Expand Down
4 changes: 2 additions & 2 deletions workflow/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func TestRetryWorkflowCompressed(t *testing.T) {

clearFunc = packer.SetMaxWorkflowSize(1557)
defer clearFunc()
wf, err := RetryWorkflow(kubeCs, sqldb.ExplosiveOffloadNodeStatusRepo, wfIf, origWf)
wf, err := RetryWorkflow(kubeCs, sqldb.ExplosiveOffloadNodeStatusRepo, wfIf, origWf, false, "")
assert.NoError(t, err)
assert.NotEmpty(t, wf.Status.CompressedNodes)
}
Expand All @@ -481,7 +481,7 @@ func TestRetryWorkflowOffloaded(t *testing.T) {
offloadNodeStatusRepo.On("Get", "7e74dbb9-d681-4c22-9bed-a581ec28383f", "123").Return(origNodes, nil)
offloadNodeStatusRepo.On("Save", "7e74dbb9-d681-4c22-9bed-a581ec28383f", mock.Anything, mock.Anything).Return("1234", nil)

_, err = RetryWorkflow(kubeCs, offloadNodeStatusRepo, wfIf, origWf)
_, err = RetryWorkflow(kubeCs, offloadNodeStatusRepo, wfIf, origWf, false, "")
assert.NoError(t, err)

wf, err := wfIf.Get("fail-template", metav1.GetOptions{})
Expand Down

0 comments on commit 9c6351f

Please sign in to comment.