Skip to content

Commit

Permalink
feat: More control over resuming suspended nodes Fixes argoproj#1893 (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
markterm committed Mar 26, 2020
1 parent b277124 commit e2cc698
Show file tree
Hide file tree
Showing 12 changed files with 558 additions and 112 deletions.
9 changes: 9 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -5798,6 +5798,9 @@
},
"namespace": {
"type": "string"
},
"nodeFieldSelector": {
"type": "string"
}
}
},
Expand All @@ -5815,11 +5818,17 @@
"io.argoproj.workflow.v1alpha1.WorkflowStopRequest": {
"type": "object",
"properties": {
"message": {
"type": "string"
},
"name": {
"type": "string"
},
"namespace": {
"type": "string"
},
"nodeFieldSelector": {
"type": "string"
}
}
},
Expand Down
21 changes: 19 additions & 2 deletions cmd/argo/commands/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,39 @@ import (
"log"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/fields"

"github.com/argoproj/argo/cmd/argo/commands/client"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
)

type resumeOps struct {
nodeFieldSelector string // --node-field-selector
}

func NewResumeCommand() *cobra.Command {
var (
resumeArgs resumeOps
)

var command = &cobra.Command{
Use: "resume WORKFLOW1 WORKFLOW2...",
Short: "resume zero or more workflows",
Run: func(cmd *cobra.Command, args []string) {
ctx, apiClient := client.NewAPIClient()
serviceClient := apiClient.NewWorkflowServiceClient()
namespace := client.Namespace()

selector, err := fields.ParseSelector(resumeArgs.nodeFieldSelector)
if err != nil {
log.Fatalf("Unable to parse node field selector '%s': %s", resumeArgs.nodeFieldSelector, err)
}

for _, wfName := range args {
_, err := serviceClient.ResumeWorkflow(ctx, &workflowpkg.WorkflowResumeRequest{
Name: wfName,
Namespace: namespace,
Name: wfName,
Namespace: namespace,
NodeFieldSelector: selector.String(),
})
if err != nil {
log.Fatalf("Failed to resume %s: %+v", wfName, err)
Expand All @@ -31,5 +47,6 @@ func NewResumeCommand() *cobra.Command {

},
}
command.Flags().StringVar(&resumeArgs.nodeFieldSelector, "node-field-selector", "", "selector of node to resume, eg: --node-field-selector inputs.paramaters.myparam.value=abc")
return command
}
25 changes: 23 additions & 2 deletions cmd/argo/commands/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,26 @@ package commands

import (
"fmt"
"log"

"github.com/argoproj/pkg/errors"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/fields"

"github.com/argoproj/argo/cmd/argo/commands/client"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
)

type stopOps struct {
message string // --message
nodeFieldSelector string // --node-field-selector
}

func NewStopCommand() *cobra.Command {
var (
stopArgs stopOps
)

var command = &cobra.Command{
Use: "stop WORKFLOW WORKFLOW2...",
Short: "stop zero or more workflows",
Expand All @@ -19,15 +30,25 @@ func NewStopCommand() *cobra.Command {
ctx, apiClient := client.NewAPIClient()
serviceClient := apiClient.NewWorkflowServiceClient()
namespace := client.Namespace()

selector, err := fields.ParseSelector(stopArgs.nodeFieldSelector)
if err != nil {
log.Fatalf("Unable to parse node field selector '%s': %s", stopArgs.nodeFieldSelector, err)
}

for _, name := range args {
wf, err := serviceClient.StopWorkflow(ctx, &workflowpkg.WorkflowStopRequest{
Name: name,
Namespace: namespace,
Name: name,
Namespace: namespace,
NodeFieldSelector: selector.String(),
Message: stopArgs.message,
})
errors.CheckError(err)
fmt.Printf("workflow %s stopped\n", wf.Name)
}
},
}
command.Flags().StringVar(&stopArgs.message, "message", "", "Message to add to previously running nodes")
command.Flags().StringVar(&stopArgs.nodeFieldSelector, "node-field-selector", "", "selector of node to stop, eg: --node-field-selector inputs.paramaters.myparam.value=abc")
return command
}
297 changes: 226 additions & 71 deletions pkg/apiclient/workflow/workflow.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/apiclient/workflow/workflow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ message WorkflowRetryRequest {
message WorkflowResumeRequest {
string name = 1;
string namespace = 2;
string nodeFieldSelector = 3;
}

message WorkflowTerminateRequest {
Expand All @@ -58,6 +59,8 @@ message WorkflowTerminateRequest {
message WorkflowStopRequest {
string name = 1;
string namespace = 2;
string nodeFieldSelector = 3;
string message = 4;
}

message WorkflowSuspendRequest {
Expand Down
9 changes: 9 additions & 0 deletions pkg/apiclient/workflow/workflow.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -4903,6 +4903,9 @@
},
"namespace": {
"type": "string"
},
"nodeFieldSelector": {
"type": "string"
}
}
},
Expand All @@ -4925,6 +4928,12 @@
},
"namespace": {
"type": "string"
},
"nodeFieldSelector": {
"type": "string"
},
"message": {
"type": "string"
}
}
},
Expand Down
4 changes: 2 additions & 2 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.Wo
return nil, err
}

err = util.ResumeWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name)
err = util.ResumeWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name, req.NodeFieldSelector)
if err != nil {
log.Warnf("Failed to resume %s: %+v", req.Name, err)
return nil, err
Expand Down Expand Up @@ -313,7 +313,7 @@ func (s *workflowServer) TerminateWorkflow(ctx context.Context, req *workflowpkg

func (s *workflowServer) StopWorkflow(ctx context.Context, req *workflowpkg.WorkflowStopRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
err := util.StopWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name)
err := util.StopWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name, req.NodeFieldSelector, req.Message)
if err != nil {
return nil, err
}
Expand Down
35 changes: 35 additions & 0 deletions test/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,41 @@ func (s *CLISuite) TestWorkflowSuspendResume() {
})
}

func (s *CLISuite) TestNodeSuspendResume() {
s.Given().
Workflow("@testdata/node-suspend.yaml").
When().
SubmitWorkflow().
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
return wf.Status.AnyActiveSuspendNode()
}, "suspended node", 20*time.Second).
RunCli([]string{"resume", "node-suspend", "--node-selector", "inputs.parameters.tag.value=suspend1-tag1"}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err) {
assert.Contains(t, output, "workflow node-suspend resumed")
}
}).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
return wf.Status.AnyActiveSuspendNode()
}, "suspended node", 10*time.Second).
RunCli([]string{"stop", "node-suspend", "--node-selector", "inputs.parameters.tag.value=suspend2-tag1", "--message", "because"}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err) {
assert.Contains(t, output, "workflow node-suspend stopped")
}
}).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
return wf.Status.Phase == wfv1.NodeFailed
}, "suspended node", 10*time.Second).
Then().
ExpectWorkflow(func(t *testing.T, _ *corev1.ObjectMeta, status *wfv1.WorkflowStatus) {
if assert.Equal(t, wfv1.NodeFailed, status.Phase) {
r := regexp.MustCompile(`child '(node-suspend-[0-9]+)' failed`)
res := r.FindStringSubmatch(status.Message)
assert.Equal(t, len(res), 2)
assert.Equal(t, status.Nodes[res[1]].Message, "because")
}
})
}

func (s *CLISuite) TestWorkflowDelete() {
s.Run("DeleteByName", func() {
s.Given().
Expand Down
37 changes: 37 additions & 0 deletions test/e2e/testdata/node-suspend.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: node-suspend
labels:
argo-e2e: true
spec:
entrypoint: node-suspend
templates:
- name: node-suspend
steps:
- - name: step1
template: whalesay
- - name: suspend1
template: suspend
arguments:
parameters:
- name: "tag"
value: "suspend1-{{steps.step1.outputs.result}}"
- - name: suspend2
template: suspend
arguments:
parameters:
- name: "tag"
value: "suspend2-{{steps.step1.outputs.result}}"

- name: whalesay
script:
image: python:alpine3.6
command: [sh]
source: echo tag1

- name: suspend
inputs:
parameters:
- name: tag
suspend: {}
2 changes: 1 addition & 1 deletion test/e2e/testdata/workflow-template-nested-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ spec:
template: whalesay-inner-template
inputs:
parameters:
- name: message
- name: message
96 changes: 94 additions & 2 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ func TestSuspendResume(t *testing.T) {
assert.Equal(t, 0, len(pods.Items))

// resume the workflow and operate again. two pods should be able to be scheduled
err = util.ResumeWorkflow(wfcset, wf.ObjectMeta.Name)
err = util.ResumeWorkflow(wfcset, wf.ObjectMeta.Name, "")
assert.NoError(t, err)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
Expand Down Expand Up @@ -1124,10 +1124,17 @@ spec:
steps:
- - name: approve
template: approve
arguments:
parameters:
- name: param1
value: value1
- - name: release
template: whalesay
- name: approve
inputs:
parameters:
- name: param1
suspend: {}
- name: whalesay
Expand Down Expand Up @@ -1159,7 +1166,92 @@ func TestSuspendTemplate(t *testing.T) {
assert.Equal(t, 0, len(pods.Items))

// resume the workflow. verify resume workflow edits nodestatus correctly
err = util.ResumeWorkflow(wfcset, wf.ObjectMeta.Name)
err = util.ResumeWorkflow(wfcset, wf.ObjectMeta.Name, "")
assert.NoError(t, err)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
assert.False(t, util.IsWorkflowSuspended(wf))

// operate the workflow. it should reach the second step
woc = newWorkflowOperationCtx(wf, controller)
woc.operate()
pods, err = controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{})
assert.NoError(t, err)
assert.Equal(t, 1, len(pods.Items))
}

func TestSuspendTemplateWithFailedResume(t *testing.T) {
controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

// operate the workflow. it should become in a suspended state after
wf := unmarshalWF(suspendTemplate)
wf, err := wfcset.Create(wf)
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
assert.True(t, util.IsWorkflowSuspended(wf))

// operate again and verify no pods were scheduled
woc = newWorkflowOperationCtx(wf, controller)
woc.operate()
pods, err := controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{})
assert.NoError(t, err)
assert.Equal(t, 0, len(pods.Items))

// resume the workflow. verify resume workflow edits nodestatus correctly
err = util.StopWorkflow(wfcset, wf.ObjectMeta.Name, "inputs.parameters.param1.value=value1", "Step failed!")
assert.NoError(t, err)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
assert.False(t, util.IsWorkflowSuspended(wf))

// operate the workflow. it should be failed and not reach the second step
woc = newWorkflowOperationCtx(wf, controller)
woc.operate()
assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Phase)
pods, err = controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{})
assert.NoError(t, err)
assert.Equal(t, 0, len(pods.Items))
}

func TestSuspendTemplateWithFilteredResume(t *testing.T) {
controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

// operate the workflow. it should become in a suspended state after
wf := unmarshalWF(suspendTemplate)
wf, err := wfcset.Create(wf)
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
assert.True(t, util.IsWorkflowSuspended(wf))

// operate again and verify no pods were scheduled
woc = newWorkflowOperationCtx(wf, controller)
woc.operate()
pods, err := controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{})
assert.NoError(t, err)
assert.Equal(t, 0, len(pods.Items))

// resume the workflow, but with non-matching selector
err = util.ResumeWorkflow(wfcset, wf.ObjectMeta.Name, "inputs.paramaters.param1.value=value2")
assert.Error(t, err)

// operate the workflow. nothing should have happened
woc = newWorkflowOperationCtx(wf, controller)
woc.operate()
pods, err = controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{})
assert.NoError(t, err)
assert.Equal(t, 0, len(pods.Items))
assert.True(t, util.IsWorkflowSuspended(wf))

// resume the workflow, but with matching selector
err = util.ResumeWorkflow(wfcset, wf.ObjectMeta.Name, "inputs.parameters.param1.value=value1")
assert.NoError(t, err)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit e2cc698

Please sign in to comment.