Skip to content

Commit

Permalink
feat(cli): resubmit workflows by label and field selector (argoproj#5807
Browse files Browse the repository at this point in the history
)

Signed-off-by: Peixuan Ding <[email protected]>
  • Loading branch information
dinever committed May 4, 2021
1 parent 3f80866 commit 507b92c
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 27 deletions.
118 changes: 98 additions & 20 deletions cmd/argo/commands/resubmit.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,37 @@
package commands

import (
"github.com/argoproj/pkg/errors"
"context"

"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/pkg/errors"

"github.com/argoproj/argo-workflows/v3/cmd/argo/commands/client"
workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

type resubmitOps struct {
priority int32 // --priority
memoized bool // --memoized
namespace string // --namespace
labelSelector string // --selector
fieldSelector string // --field-selector
}

// hasSelector returns true if the CLI arguments selects multiple workflows
func (o *resubmitOps) hasSelector() bool {
if o.labelSelector != "" || o.fieldSelector != "" {
return true
}
return false
}

func NewResubmitCommand() *cobra.Command {
var (
memoized bool
priority int32
resubmitOpts resubmitOps
cliSubmitOpts cliSubmitOpts
)
command := &cobra.Command{
Expand All @@ -21,6 +41,18 @@ func NewResubmitCommand() *cobra.Command {
argo resubmit my-wf
# Resubmit multiple workflows:
argo resubmit my-wf my-other-wf my-third-wf
# Resubmit multiple workflows by label selector:
argo resubmit -l workflows.argoproj.io/test=true
# Resubmit multiple workflows by field selector:
argo resubmit --field-selector metadata.namespace=argo
# Resubmit and wait for completion:
argo resubmit --wait my-wf.yaml
Expand All @@ -39,31 +71,77 @@ func NewResubmitCommand() *cobra.Command {
`,
Run: func(cmd *cobra.Command, args []string) {
if cmd.Flag("priority").Changed {
cliSubmitOpts.priority = &priority
cliSubmitOpts.priority = &resubmitOpts.priority
}

ctx, apiClient := client.NewAPIClient()
serviceClient := apiClient.NewWorkflowServiceClient()
namespace := client.Namespace()

for _, name := range args {
created, err := serviceClient.ResubmitWorkflow(ctx, &workflowpkg.WorkflowResubmitRequest{
Namespace: namespace,
Name: name,
Memoized: memoized,
})
errors.CheckError(err)
printWorkflow(created, getFlags{output: cliSubmitOpts.output})
waitWatchOrLog(ctx, serviceClient, namespace, []string{created.Name}, cliSubmitOpts)
}
resubmitOpts.namespace = client.Namespace()
err := resubmitWorkflows(ctx, serviceClient, resubmitOpts, cliSubmitOpts, args)
errors.CheckError(err)
},
}

command.Flags().Int32Var(&priority, "priority", 0, "workflow priority")
command.Flags().Int32Var(&resubmitOpts.priority, "priority", 0, "workflow priority")
command.Flags().StringVarP(&cliSubmitOpts.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
command.Flags().BoolVarP(&cliSubmitOpts.wait, "wait", "w", false, "wait for the workflow to complete")
command.Flags().BoolVar(&cliSubmitOpts.watch, "watch", false, "watch the workflow until it completes")
command.Flags().BoolVarP(&cliSubmitOpts.wait, "wait", "w", false, "wait for the workflow to complete, only works when a single workflow is resubmitted")
command.Flags().BoolVar(&cliSubmitOpts.watch, "watch", false, "watch the workflow until it completes, only works when a single workflow is resubmitted")
command.Flags().BoolVar(&cliSubmitOpts.log, "log", false, "log the workflow until it completes")
command.Flags().BoolVar(&memoized, "memoized", false, "re-use successful steps & outputs from the previous run (experimental)")
command.Flags().BoolVar(&resubmitOpts.memoized, "memoized", false, "re-use successful steps & outputs from the previous run (experimental)")
command.Flags().StringVarP(&resubmitOpts.labelSelector, "selector", "l", "", "Selector (label query) to filter on, not including uninitialized ones, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2)")
command.Flags().StringVar(&resubmitOpts.fieldSelector, "field-selector", "", "Selector (field query) to filter on, supports '=', '==', and '!='.(e.g. --field-selector key1=value1,key2=value2). The server only supports a limited number of field queries per type.")
return command
}

// resubmitWorkflows resubmits workflows by given resubmitOpts or workflow names
func resubmitWorkflows(ctx context.Context, serviceClient workflowpkg.WorkflowServiceClient, resubmitOpts resubmitOps, cliSubmitOpts cliSubmitOpts, args []string) error {
var (
wfs wfv1.Workflows
err error
)
if resubmitOpts.hasSelector() {
wfs, err = listWorkflows(ctx, serviceClient, listFlags{
namespace: resubmitOpts.namespace,
fields: resubmitOpts.fieldSelector,
labels: resubmitOpts.labelSelector,
})
if err != nil {
return err
}
}

for _, n := range args {
wfs = append(wfs, wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: n,
Namespace: resubmitOpts.namespace,
},
})
}

var lastResubmitted *wfv1.Workflow
resubmittedNames := make(map[string]bool)

for _, wf := range wfs {
if _, ok := resubmittedNames[wf.Name]; ok {
// de-duplication in case there is an overlap between the selector and given workflow names
continue
}
resubmittedNames[wf.Name] = true

lastResubmitted, err = serviceClient.ResubmitWorkflow(ctx, &workflowpkg.WorkflowResubmitRequest{
Namespace: wf.Namespace,
Name: wf.Name,
Memoized: resubmitOpts.memoized,
})
if err != nil {
return err
}
printWorkflow(lastResubmitted, getFlags{output: cliSubmitOpts.output})
}
if len(resubmittedNames) == 1 {
// watch or wait when there is only one workflow retried
waitWatchOrLog(ctx, serviceClient, lastResubmitted.Namespace, []string{lastResubmitted.Name}, cliSubmitOpts)
}
return nil
}
164 changes: 164 additions & 0 deletions cmd/argo/commands/resubmit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package commands

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
workflowmocks "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow/mocks"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func Test_resubmitWorkflows(t *testing.T) {
t.Run("Resubmit workflow by names", func(t *testing.T) {
c := &workflowmocks.WorkflowServiceClient{}
resubmitOpts := resubmitOps{
namespace: "argo",
}
cliSubmitOpts := cliSubmitOpts{}

c.On("ResubmitWorkflow", mock.Anything, mock.Anything).Return(&wfv1.Workflow{}, nil)

err := resubmitWorkflows(context.Background(), c, resubmitOpts, cliSubmitOpts, []string{"foo", "bar"})
c.AssertNumberOfCalls(t, "ResubmitWorkflow", 2)

assert.NoError(t, err)
})

t.Run("Resubmit workflow with memoization", func(t *testing.T) {
c := &workflowmocks.WorkflowServiceClient{}
resubmitOpts := resubmitOps{
namespace: "argo",
memoized: true,
}
cliSubmitOpts := cliSubmitOpts{}

c.On("ResubmitWorkflow", mock.Anything, mock.Anything).Return(&wfv1.Workflow{}, nil)

err := resubmitWorkflows(context.Background(), c, resubmitOpts, cliSubmitOpts, []string{"foo"})
c.AssertNumberOfCalls(t, "ResubmitWorkflow", 1)
c.AssertCalled(t, "ResubmitWorkflow", mock.Anything, &workflowpkg.WorkflowResubmitRequest{
Name: "foo",
Namespace: "argo",
Memoized: true,
})

assert.NoError(t, err)
})

t.Run("Resubmit workflow by selector", func(t *testing.T) {
c := &workflowmocks.WorkflowServiceClient{}
resubmitOpts := resubmitOps{
namespace: "argo",
labelSelector: "custom-label=true",
}
cliSubmitOpts := cliSubmitOpts{}

wfListReq := &workflowpkg.WorkflowListRequest{
Namespace: "argo",
ListOptions: &metav1.ListOptions{
LabelSelector: resubmitOpts.labelSelector,
},
}

wfList := &wfv1.WorkflowList{Items: wfv1.Workflows{
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "argo"}},
{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "argo"}},
{ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "argo"}},
}}

c.On("ListWorkflows", mock.Anything, wfListReq).Return(wfList, nil)
c.On("ResubmitWorkflow", mock.Anything, mock.Anything).Return(&wfv1.Workflow{}, nil)

err := resubmitWorkflows(context.Background(), c, resubmitOpts, cliSubmitOpts, []string{})

c.AssertNumberOfCalls(t, "ResubmitWorkflow", 3)
for _, wf := range wfList.Items {
resubmitReq := &workflowpkg.WorkflowResubmitRequest{
Name: wf.Name,
Namespace: wf.Namespace,
Memoized: false,
}
c.AssertCalled(t, "ResubmitWorkflow", mock.Anything, resubmitReq)
}

assert.NoError(t, err)
})

t.Run("Resubmit workflow by selector and name", func(t *testing.T) {
c := &workflowmocks.WorkflowServiceClient{}
resubmitOpts := resubmitOps{
namespace: "argo",
labelSelector: "custom-label=true",
}
cliSubmitOpts := cliSubmitOpts{}

wfListReq := &workflowpkg.WorkflowListRequest{
Namespace: "argo",
ListOptions: &metav1.ListOptions{
LabelSelector: resubmitOpts.labelSelector,
},
}

wfList := &wfv1.WorkflowList{Items: wfv1.Workflows{
{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
{ObjectMeta: metav1.ObjectMeta{Name: "baz"}},
}}

c.On("ListWorkflows", mock.Anything, wfListReq).Return(wfList, nil)

c.On("ResubmitWorkflow", mock.Anything, mock.Anything).Return(&wfv1.Workflow{}, nil)

err := resubmitWorkflows(context.Background(), c, resubmitOpts, cliSubmitOpts, []string{"foo", "qux"})
// after de-duplication, there will be 4 workflows to resubmit
c.AssertNumberOfCalls(t, "ResubmitWorkflow", 4)

// the 3 workflows from the selectors: "foo", "bar", "baz"
for _, wf := range wfList.Items {
resubmitReq := &workflowpkg.WorkflowResubmitRequest{
Name: wf.Name,
Namespace: wf.Namespace,
Memoized: false,
}
c.AssertCalled(t, "ResubmitWorkflow", mock.Anything, resubmitReq)
}

// the 1 workflow by the given name "qux
c.AssertCalled(t, "ResubmitWorkflow", mock.Anything, &workflowpkg.WorkflowResubmitRequest{
Name: "qux",
Namespace: "argo",
Memoized: false,
})

assert.NoError(t, err)
})

t.Run("Resubmit workflow list error", func(t *testing.T) {
c := &workflowmocks.WorkflowServiceClient{}
resubmitOpts := resubmitOps{
namespace: "argo",
labelSelector: "custom-label=true",
}
cliSubmitOpts := cliSubmitOpts{}
c.On("ListWorkflows", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error"))
err := resubmitWorkflows(context.Background(), c, resubmitOpts, cliSubmitOpts, []string{})
assert.Errorf(t, err, "mock error")
})

t.Run("Resubmit workflow error", func(t *testing.T) {
c := &workflowmocks.WorkflowServiceClient{}
resubmitOpts := resubmitOps{
namespace: "argo",
}
cliSubmitOpts := cliSubmitOpts{}
c.On("ResubmitWorkflow", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error"))
err := resubmitWorkflows(context.Background(), c, resubmitOpts, cliSubmitOpts, []string{"foo"})
assert.Errorf(t, err, "mock error")
})
}
28 changes: 21 additions & 7 deletions docs/cli/argo_resubmit.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ argo resubmit [WORKFLOW...] [flags]
argo resubmit my-wf
# Resubmit multiple workflows:
argo resubmit my-wf my-other-wf my-third-wf
# Resubmit multiple workflows by label selector:
argo resubmit -l workflows.argoproj.io/test=true
# Resubmit multiple workflows by field selector:
argo resubmit --field-selector metadata.namespace=argo
# Resubmit and wait for completion:
argo resubmit --wait my-wf.yaml
Expand All @@ -38,13 +50,15 @@ argo resubmit [WORKFLOW...] [flags]
### Options

```
-h, --help help for resubmit
--log log the workflow until it completes
--memoized re-use successful steps & outputs from the previous run (experimental)
-o, --output string Output format. One of: name|json|yaml|wide
--priority int32 workflow priority
-w, --wait wait for the workflow to complete
--watch watch the workflow until it completes
--field-selector string Selector (field query) to filter on, supports '=', '==', and '!='.(e.g. --field-selector key1=value1,key2=value2). The server only supports a limited number of field queries per type.
-h, --help help for resubmit
--log log the workflow until it completes
--memoized re-use successful steps & outputs from the previous run (experimental)
-o, --output string Output format. One of: name|json|yaml|wide
--priority int32 workflow priority
-l, --selector string Selector (label query) to filter on, not including uninitialized ones, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2)
-w, --wait wait for the workflow to complete, only works when a single workflow is resubmitted
--watch watch the workflow until it completes, only works when a single workflow is resubmitted
```

### Options inherited from parent commands
Expand Down
Loading

0 comments on commit 507b92c

Please sign in to comment.