Skip to content

Commit

Permalink
Introduce argo retry to retry a failed workflow with the same name (resolves argoproj#762)
Browse files Browse the repository at this point in the history

onExit and related nodes should never be executed during resubmit/retry (resolves argoproj#780)
  • Loading branch information
jessesuen committed Mar 6, 2018
1 parent 90c08bf commit 60c48a9
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 0 deletions.
44 changes: 44 additions & 0 deletions cmd/argo/commands/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package commands

import (
"os"

"github.com/argoproj/argo/workflow/common"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// NewRetryCommand returns the `argo retry` command, which restarts a
// failed workflow under its original name, carrying forward the nodes
// that already succeeded (see common.RetryWorkflow).
func NewRetryCommand() *cobra.Command {
	var flags submitFlags
	retryCmd := &cobra.Command{
		Use:   "retry WORKFLOW",
		Short: "retry a workflow",
		Run: func(cmd *cobra.Command, args []string) {
			// A workflow name is mandatory; otherwise show usage and bail.
			if len(args) == 0 {
				cmd.HelpFunc()(cmd, args)
				os.Exit(1)
			}
			kubeClient := initKubeClient()
			wfClient := InitWorkflowClient()
			wf, err := wfClient.Get(args[0], metav1.GetOptions{})
			if err != nil {
				log.Fatal(err)
			}
			if wf, err = common.RetryWorkflow(kubeClient, wfClient, wf); err != nil {
				log.Fatal(err)
			}
			printWorkflow(wf, flags.output)
			if flags.wait {
				// Block until the retried workflow reaches a terminal state.
				poller := NewWorkflowStatusPoller(wfClient, false, flags.output == "json")
				poller.WaitWorkflows([]string{wf.ObjectMeta.Name})
			}
		},
	}
	retryCmd.Flags().StringVarP(&flags.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
	retryCmd.Flags().BoolVarP(&flags.wait, "wait", "w", false, "wait for the workflow to complete")
	return retryCmd
}
1 change: 1 addition & 0 deletions cmd/argo/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewCommand() *cobra.Command {
command.AddCommand(NewLogsCommand())
command.AddCommand(NewResubmitCommand())
command.AddCommand(NewResumeCommand())
command.AddCommand(NewRetryCommand())
command.AddCommand(NewSubmitCommand())
command.AddCommand(NewSuspendCommand())
command.AddCommand(NewUninstallCommand())
Expand Down
53 changes: 53 additions & 0 deletions workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,9 +513,13 @@ func FormulateResubmitWorkflow(wf *wfv1.Workflow, memoized bool) (*wfv1.Workflow
// Iterate the previous nodes. If it was successful Pod carry it forward
replaceRegexp := regexp.MustCompile("^" + wf.ObjectMeta.Name)
newWF.Status.Nodes = make(map[string]wfv1.NodeStatus)
onExitNodeName := wf.ObjectMeta.Name + ".onExit"
for _, node := range wf.Status.Nodes {
switch node.Phase {
case wfv1.NodeSucceeded, wfv1.NodeSkipped:
if strings.HasPrefix(node.Name, onExitNodeName) {
continue
}
originalID := node.ID
node.Name = replaceRegexp.ReplaceAllString(node.Name, newWF.ObjectMeta.Name)
node.ID = newWF.NodeID(node.Name)
Expand Down Expand Up @@ -555,3 +559,52 @@ func convertNodeID(newWf *wfv1.Workflow, regex *regexp.Regexp, oldNodeID string,
newNodeName := regex.ReplaceAllString(node.Name, newWf.ObjectMeta.Name)
return newWf.NodeID(newNodeName)
}

// RetryWorkflow updates a workflow in place so it can be re-run under the same
// name: successful/skipped nodes are carried forward, failed/errored nodes and
// their pods are deleted, and onExit handler nodes are dropped so exit handlers
// re-execute. The workflow must currently be in the Failed or Error phase.
// Returns the updated workflow, or an error if the workflow is not retryable,
// pod deletion fails, or the API update fails.
func RetryWorkflow(kubeClient kubernetes.Interface, wfClient v1alpha1.WorkflowInterface, wf *wfv1.Workflow) (*wfv1.Workflow, error) {
	switch wf.Status.Phase {
	case wfv1.NodeFailed, wfv1.NodeError:
	default:
		return nil, errors.Errorf(errors.CodeBadRequest, "workflow must be Failed/Error to retry")
	}
	newWF := wf.DeepCopy()
	podIf := kubeClient.CoreV1().Pods(wf.ObjectMeta.Namespace)

	// Delete/reset fields which indicate workflow completed
	delete(newWF.Labels, LabelKeyCompleted)
	delete(newWF.Labels, LabelKeyPhase)
	newWF.Status.Phase = wfv1.NodeRunning
	newWF.Status.Message = ""
	newWF.Status.FinishedAt = metav1.Time{}

	// Iterate the previous nodes. If it was successful Pod carry it forward.
	// onExit nodes are never carried forward, since exit handlers must re-run.
	newWF.Status.Nodes = make(map[string]wfv1.NodeStatus)
	onExitNodeName := wf.ObjectMeta.Name + ".onExit"
	for _, node := range wf.Status.Nodes {
		switch node.Phase {
		case wfv1.NodeSucceeded, wfv1.NodeSkipped:
			if !strings.HasPrefix(node.Name, onExitNodeName) {
				newWF.Status.Nodes[node.ID] = node
				continue
			}
		case wfv1.NodeError, wfv1.NodeFailed:
			// do not add this status to the node. pretend as if this node never existed.
			// NOTE: NodeRunning shouldn't really happen except in weird scenarios where controller
			// mismanages state (e.g. panic when operating on a workflow)
		default:
			// Fix: the original format string had a single %s verb for two
			// arguments (node, node.Phase), which go vet flags and which
			// garbles the message with %!(EXTRA ...).
			return nil, errors.InternalErrorf("Workflow cannot be retried with node %s in %s phase", node.Name, node.Phase)
		}
		// Failed pods (and succeeded/skipped onExit pods) fall through to here
		// and are deleted so the controller can recreate them.
		if node.Type == wfv1.NodeTypePod {
			log.Infof("Deleting pod: %s", node.ID)
			err := podIf.Delete(node.ID, &metav1.DeleteOptions{})
			if err != nil && !apierr.IsNotFound(err) {
				return nil, errors.InternalWrapError(err)
			}
		}
	}
	newWF, err := wfClient.Update(newWF)
	if err != nil {
		// Fix: return the error to the caller instead of log.Fatal, which
		// would os.Exit from library code and hide the failure from callers.
		return nil, err
	}
	return newWF, nil
}
35 changes: 35 additions & 0 deletions workflow/common/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package common

import (
"testing"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestResubmitWorkflowWithOnExit ensures we do not carry over the onExit node even if successful
// TestResubmitWorkflowWithOnExit ensures we do not carry over the onExit node even if successful.
func TestResubmitWorkflowWithOnExit(t *testing.T) {
	wfName := "test-wf"
	onExitName := wfName + ".onExit"
	wf := wfv1.Workflow{
		ObjectMeta: metav1.ObjectMeta{
			// Fix: use the wfName variable declared above instead of repeating
			// the literal, so name and onExitName cannot drift apart.
			Name: wfName,
		},
		Status: wfv1.WorkflowStatus{
			Phase: wfv1.NodeFailed,
			Nodes: map[string]wfv1.NodeStatus{},
		},
	}
	onExitID := wf.NodeID(onExitName)
	wf.Status.Nodes[onExitID] = wfv1.NodeStatus{
		Name:  onExitName,
		Phase: wfv1.NodeSucceeded,
	}
	newWF, err := FormulateResubmitWorkflow(&wf, true)
	assert.Nil(t, err)
	// The resubmitted workflow must not contain a node for its onExit handler.
	newWFOnExitName := newWF.ObjectMeta.Name + ".onExit"
	newWFOnExitID := newWF.NodeID(newWFOnExitName) // fix: typo "OneExit" -> "OnExit"
	_, ok := newWF.Status.Nodes[newWFOnExitID]
	assert.False(t, ok)
}

0 comments on commit 60c48a9

Please sign in to comment.