Skip to content

Commit

Permalink
fix(controller): shutdownstrategy on running workflow (argoproj#5289)
Browse files Browse the repository at this point in the history
  • Loading branch information
sarabala1979 committed Mar 6, 2021
1 parent 112378f commit a5d1acc
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 26 deletions.
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,13 @@ type ShutdownStrategy string
// Recognised ShutdownStrategy values.
const (
// ShutdownStrategyTerminate requests a shutdown that, per the controller's
// execution-control comments, removes all pods, including onExit handler pods.
ShutdownStrategyTerminate ShutdownStrategy = "Terminate"
// ShutdownStrategyStop requests a shutdown that, per the controller's
// execution-control comments, only removes pods that are not part of an
// onExit handler.
ShutdownStrategyStop ShutdownStrategy = "Stop"
// ShutdownStrategyNone is the zero value: no shutdown has been requested.
ShutdownStrategyNone ShutdownStrategy = ""
)

// Enabled reports whether a shutdown has been requested, i.e. whether s is
// any value other than ShutdownStrategyNone.
func (s ShutdownStrategy) Enabled() bool {
	switch s {
	case ShutdownStrategyNone:
		return false
	default:
		return true
	}
}

func (s ShutdownStrategy) ShouldExecute(isOnExitPod bool) bool {
switch s {
case ShutdownStrategyTerminate:
Expand Down
12 changes: 6 additions & 6 deletions workflow/controller/exec_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
return nil
case apiv1.PodPending:
// Check if we are currently shutting down
if woc.execWf.Spec.Shutdown != "" {
if woc.GetShutdownStrategy().Enabled() {
// Only delete pods that are not part of an onExit handler if we are "Stopping" or all pods if we are "Terminating"
_, onExitPod := pod.Labels[common.LabelKeyOnExit]

if !woc.wf.Spec.Shutdown.ShouldExecute(onExitPod) {
woc.log.Infof("Deleting Pending pod %s/%s as part of workflow shutdown with strategy: %s", pod.Namespace, pod.Name, woc.wf.Spec.Shutdown)
if !woc.GetShutdownStrategy().ShouldExecute(onExitPod) {
woc.log.Infof("Deleting Pending pod %s/%s as part of workflow shutdown with strategy: %s", pod.Namespace, pod.Name, woc.GetShutdownStrategy())
err := woc.controller.kubeclientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err == nil {
wfNodesLock.Lock()
defer wfNodesLock.Unlock()
node := woc.wf.Status.Nodes[pod.Name]
woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("workflow shutdown with strategy: %s", woc.execWf.Spec.Shutdown))
woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("workflow shutdown with strategy: %s", woc.GetShutdownStrategy()))
return nil
}
// If we fail to delete the pod, fall back to setting the annotation
Expand Down Expand Up @@ -75,8 +75,8 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
}

for _, c := range woc.findTemplate(pod).GetMainContainerNames() {
if woc.wf.Spec.Shutdown != "" {
if _, onExitPod := pod.Labels[common.LabelKeyOnExit]; !woc.wf.Spec.Shutdown.ShouldExecute(onExitPod) {
if woc.GetShutdownStrategy().Enabled() {
if _, onExitPod := pod.Labels[common.LabelKeyOnExit]; !woc.GetShutdownStrategy().ShouldExecute(onExitPod) {
podExecCtl.Deadline = &time.Time{}
woc.log.Infof("Applying shutdown deadline for pod %s", pod.Name)
return woc.updateExecutionControl(ctx, pod.Name, podExecCtl, c)
Expand Down
50 changes: 32 additions & 18 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ type wfOperationCtx struct {
// execWf holds the Workflow for use in execution.
// In Normal workflow scenario: It holds copy of workflow object
// In Submit From WorkflowTemplate: It holds merged workflow with WorkflowDefault, Workflow and WorkflowTemplate
// 'execWf.Spec' should usually be used instead `wf.Spec`, with two exceptions for user editable fields:
// 1. `wf.Spec.Suspend`
// 2. `wf.Spec.Shutdown`
// 'execWf.Spec' should usually be used instead of `wf.Spec`
execWf *wfv1.Workflow
}

Expand Down Expand Up @@ -299,7 +297,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}
}

if woc.wf.Spec.Suspend != nil && *woc.wf.Spec.Suspend {
if woc.ShouldSuspend() {
woc.log.Infof("workflow suspended")
return
}
Expand Down Expand Up @@ -373,7 +371,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}[node.Phase]

var onExitNode *wfv1.NodeStatus
if woc.execWf.Spec.OnExit != "" && woc.wf.Spec.Shutdown.ShouldExecute(true) {
if woc.execWf.Spec.OnExit != "" && woc.GetShutdownStrategy().ShouldExecute(true) {
woc.globalParams[common.GlobalVarWorkflowStatus] = string(workflowStatus)

var failures []failedNodeStatus
Expand Down Expand Up @@ -412,8 +410,8 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}

var workflowMessage string
if node.FailedOrError() && woc.execWf.Spec.Shutdown != "" {
workflowMessage = fmt.Sprintf("Stopped with strategy '%s'", woc.execWf.Spec.Shutdown)
if node.FailedOrError() && woc.GetShutdownStrategy().Enabled() {
workflowMessage = fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
} else {
workflowMessage = node.Message
}
Expand Down Expand Up @@ -753,10 +751,10 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
return woc.markNodePhase(node.Name, wfv1.NodeSucceeded), true, nil
}

if woc.execWf.Spec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
if woc.GetShutdownStrategy().Enabled() || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
var message string
if woc.execWf.Spec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.execWf.Spec.Shutdown)
if woc.GetShutdownStrategy().Enabled() {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
} else {
message = fmt.Sprintf("retry exceeded workflow deadline %s", *woc.workflowDeadline)
}
Expand Down Expand Up @@ -1018,12 +1016,12 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node wfv1.NodeStatus) bool {
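// failSuspendedAndPendingNodesAfterDeadlineOrShutdown fails any active suspend nodes once the workflow
// is shutting down or its deadline has passed, and any pending nodes once the deadline has passed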
func (woc *wfOperationCtx) failSuspendedAndPendingNodesAfterDeadlineOrShutdown() {
deadlineExceeded := woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)
if woc.execWf.Spec.Shutdown != "" || deadlineExceeded {
if woc.GetShutdownStrategy().Enabled() || deadlineExceeded {
for _, node := range woc.wf.Status.Nodes {
if node.IsActiveSuspendNode() || (node.Phase == wfv1.NodePending && deadlineExceeded) {
var message string
if woc.execWf.Spec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.execWf.Spec.Shutdown)
if woc.GetShutdownStrategy().Enabled() {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
} else {
message = "Step exceeded its deadline"
}
Expand Down Expand Up @@ -2927,7 +2925,7 @@ func (woc *wfOperationCtx) createTemplateContext(scope wfv1.ResourceScope, resou
}

func (woc *wfOperationCtx) runOnExitNode(ctx context.Context, templateRef, parentDisplayName, parentNodeName, boundaryID string, tmplCtx *templateresolution.Context) (bool, *wfv1.NodeStatus, error) {
if templateRef != "" && woc.wf.Spec.Shutdown.ShouldExecute(true) {
if templateRef != "" && woc.GetShutdownStrategy().ShouldExecute(true) {
woc.log.Infof("Running OnExit handler: %s", templateRef)
onExitNodeName := common.GenerateOnExitNodeName(parentDisplayName)
onExitNode, err := woc.executeTemplate(ctx, onExitNodeName, &wfv1.WorkflowStep{Template: templateRef}, tmplCtx, woc.execWf.Spec.Arguments, &executeTemplateOpts{
Expand Down Expand Up @@ -3182,15 +3180,31 @@ func (woc *wfOperationCtx) setExecWorkflow() error {
return nil
}

// GetShutdownStrategy returns the shutdown strategy held by the execution
// workflow spec (execWf.Spec.Shutdown).
func (woc *wfOperationCtx) GetShutdownStrategy() wfv1.ShutdownStrategy {
	execSpec := &woc.execWf.Spec
	return execSpec.Shutdown
}

// ShouldSuspend reports whether the execution workflow spec asks for the
// workflow to be suspended. An unset Suspend field counts as "not suspended".
func (woc *wfOperationCtx) ShouldSuspend() bool {
	suspend := woc.execWf.Spec.Suspend
	if suspend == nil {
		return false
	}
	return *suspend
}

// needsStoredWfSpecUpdate reports whether Status.StoredWorkflowSpec must be
// (re)calculated from the live workflow spec.
//
// It returns true when:
//   - no spec has been stored yet;
//   - the workflow declares an entrypoint but the stored spec has none;
//   - Spec.Suspend differs from the stored value (set, cleared or toggled);
//   - Spec.Shutdown differs from the stored value.
func (woc *wfOperationCtx) needsStoredWfSpecUpdate() bool {
	stored := woc.wf.Status.StoredWorkflowSpec
	if stored == nil {
		return true
	}
	spec := &woc.wf.Spec

	// The empty stored-entrypoint check mainly supports backward compatibility
	// when moving 2.11.x workflows to 2.12.x: the stored spec has to be
	// recalculated in the 2.12.x format. It can be removed once all users have
	// migrated from 2.11.x to 2.12.x.
	if spec.Entrypoint != "" && stored.Entrypoint == "" {
		return true
	}

	// Suspend is compared by presence and by value, not only for the
	// "newly set" case: clearing or flipping the flag on the workflow must
	// also refresh the stored copy, otherwise the stored spec keeps the stale
	// suspend state.
	if (spec.Suspend == nil) != (stored.Suspend == nil) {
		return true
	}
	if spec.Suspend != nil && *spec.Suspend != *stored.Suspend {
		return true
	}

	// A plain inequality covers both requesting and changing a shutdown.
	return spec.Shutdown != stored.Shutdown
}

func (woc *wfOperationCtx) setStoredWfSpec() error {
wfDefault := woc.controller.Config.WorkflowDefaults
if wfDefault == nil {
wfDefault = &wfv1.Workflow{}
}
// woc.wf.Status.StoredWorkflowSpec.Entrypoint == "" check is mainly to support backward compatible with 2.11.x workflow to 2.12.x
// Need to recalculate StoredWorkflowSpec in 2.12.x format.
// This check can be removed once all user migrated from 2.11.x to 2.12.x
if woc.wf.Status.StoredWorkflowSpec == nil || woc.wf.Status.StoredWorkflowSpec.Entrypoint == "" {

if woc.needsStoredWfSpecUpdate() {
wftHolder, err := woc.fetchWorkflowSpec()
if err != nil {
return err
Expand Down
65 changes: 65 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5890,6 +5890,71 @@ func TestWorkflowScheduledTimeVariable(t *testing.T) {
assert.Equal(t, "2006-01-02T15:04:05-07:00", woc.globalParams[common.GlobalVarWorkflowCronScheduleTime])
}

// TestWorkflowShutdownStrategy operates a single-container workflow once, puts
// its pods into the Pending phase, then sets Spec.Shutdown and operates again.
// It expects the resulting node message to mention the shutdown strategy, for
// both the Stop and the Terminate strategies.
func TestWorkflowShutdownStrategy(t *testing.T) {
wf := unmarshalWF(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: whalesay
namespace: default
spec:
entrypoint: whalesay
templates:
- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["cowsay hellow"]`)

cancel, controller := newController()
defer cancel()
// Copy taken before any subtest runs; the TerminateStrategy subtest starts from it.
wf1 := wf.DeepCopy()
t.Run("StopStrategy", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)

// After the first pass every node is expected to still be Pending.
for _, node := range woc.wf.Status.Nodes {
assert.Equal(t, wfv1.NodePending, node.Phase)
}
// Updating Pod state
makePodsPhase(ctx, woc, apiv1.PodPending)
// Simulate the Stop command
// Note: this wf1 shadows the outer wf1 for the rest of this subtest.
wf1 := woc.wf
wf1.Spec.Shutdown = wfv1.ShutdownStrategyStop
woc1 := newWorkflowOperationCtx(wf1, controller)
woc1.operate(ctx)

node := woc1.wf.Status.Nodes.FindByDisplayName("whalesay")
if assert.NotNil(t, node) {
assert.Contains(t, node.Message, "workflow shutdown with strategy: Stop")
}
})

t.Run("TerminateStrategy", func(t *testing.T) {
ctx := context.Background()
// Uses the outer wf1 (the pre-subtest copy) with the same controller as the subtest above.
woc := newWorkflowOperationCtx(wf1, controller)
woc.operate(ctx)

// After the first pass every node is expected to still be Pending.
for _, node := range woc.wf.Status.Nodes {
assert.Equal(t, wfv1.NodePending, node.Phase)
}
// Updating Pod state
makePodsPhase(ctx, woc, apiv1.PodPending)
// Simulate the Terminate command
wfOut := woc.wf
wfOut.Spec.Shutdown = wfv1.ShutdownStrategyTerminate
woc1 := newWorkflowOperationCtx(wfOut, controller)
woc1.operate(ctx)
// NOTE(review): if Status.Nodes holds NodeStatus values rather than pointers,
// the NotNil guard below can never fail — confirm against the type definition.
for _, node := range woc1.wf.Status.Nodes {
if assert.NotNil(t, node) {
assert.Contains(t, node.Message, "workflow shutdown with strategy")
assert.Contains(t, node.Message, "Terminate")
}
}
})
}

const resultVarRefWf = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down
74 changes: 74 additions & 0 deletions workflow/controller/operator_workflow_template_ref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/utils/pointer"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util"
Expand Down Expand Up @@ -283,3 +285,75 @@ func TestWorkflowTemplateRefGetArtifactsFromTemplate(t *testing.T) {
assert.Equal(t, "data-file", woc.execWf.Spec.Arguments.Artifacts[2].Name)
})
}

// TestWorkflowTemplateRefWithShutdownAndSuspend checks that the stored
// workflow spec of a workflow using a workflowTemplateRef is refreshed when
// the stored entrypoint is blank, or when Suspend or Shutdown is set on the
// workflow after it has already been operated on once.
func TestWorkflowTemplateRefWithShutdownAndSuspend(t *testing.T) {
	cancel, controller := newController(unmarshalWF(wfWithTmplRef), unmarshalWFTmpl(wfTmpl))
	defer cancel()

	t.Run("EntrypointMissingInStoredWfSpec", func(t *testing.T) {
		ctx := context.Background()
		first := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller)
		first.operate(ctx)
		assert.Nil(t, first.wf.Status.StoredWorkflowSpec.Suspend)

		edited := first.wf.DeepCopy()
		// Updating Pod state
		makePodsPhase(ctx, first, apiv1.PodPending)
		// Blank out the stored entrypoint before the second pass.
		edited.Status.StoredWorkflowSpec.Entrypoint = ""

		second := newWorkflowOperationCtx(edited, controller)
		second.operate(ctx)
		// NOTE(review): Entrypoint looks like a plain string, in which case
		// NotNil cannot fail and the Equal below is the effective check.
		assert.NotNil(t, second.wf.Status.StoredWorkflowSpec.Entrypoint)
		assert.Equal(t, first.wf.Spec.Entrypoint, second.wf.Status.StoredWorkflowSpec.Entrypoint)
	})

	t.Run("WorkflowTemplateRefWithSuspend", func(t *testing.T) {
		ctx := context.Background()
		first := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller)
		first.operate(ctx)
		assert.Nil(t, first.wf.Status.StoredWorkflowSpec.Suspend)

		edited := first.wf.DeepCopy()
		// Updating Pod state
		makePodsPhase(ctx, first, apiv1.PodPending)
		// Simulate the user suspending the workflow.
		edited.Spec.Suspend = pointer.BoolPtr(true)

		second := newWorkflowOperationCtx(edited, controller)
		second.operate(ctx)
		assert.NotNil(t, second.wf.Status.StoredWorkflowSpec.Suspend)
		assert.True(t, *second.wf.Status.StoredWorkflowSpec.Suspend)
	})

	// The two shutdown strategies follow the same scenario and differ only in
	// the strategy applied and the node message expected afterwards.
	shutdownCases := []struct {
		name     string
		strategy wfv1.ShutdownStrategy
		wantMsg  string
	}{
		{
			name:     "WorkflowTemplateRefWithShutdownTerminate",
			strategy: wfv1.ShutdownStrategyTerminate,
			wantMsg:  "workflow shutdown with strategy: Terminate",
		},
		{
			name:     "WorkflowTemplateRefWithShutdownStop",
			strategy: wfv1.ShutdownStrategyStop,
			wantMsg:  "workflow shutdown with strategy: Stop",
		},
	}
	for _, tc := range shutdownCases {
		t.Run(tc.name, func(t *testing.T) {
			ctx := context.Background()
			first := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller)
			first.operate(ctx)
			assert.Empty(t, first.wf.Status.StoredWorkflowSpec.Shutdown)

			edited := first.wf.DeepCopy()
			// Updating Pod state
			makePodsPhase(ctx, first, apiv1.PodPending)
			// Simulate the user requesting the shutdown.
			edited.Spec.Shutdown = tc.strategy

			second := newWorkflowOperationCtx(edited, controller)
			second.operate(ctx)
			assert.NotEmpty(t, second.wf.Status.StoredWorkflowSpec.Shutdown)
			assert.Equal(t, tc.strategy, second.wf.Status.StoredWorkflowSpec.Shutdown)
			for _, node := range second.wf.Status.Nodes {
				if !assert.NotNil(t, node) {
					continue
				}
				assert.Contains(t, node.Message, tc.wantMsg)
			}
		})
	}
}
4 changes: 2 additions & 2 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
}
}

if !woc.execWf.Spec.Shutdown.ShouldExecute(opts.onExitPod) {
if !woc.GetShutdownStrategy().ShouldExecute(opts.onExitPod) {
// Do not create pods if we are shutting down
woc.markNodePhase(nodeName, wfv1.NodeSkipped, fmt.Sprintf("workflow shutdown with strategy: %s", woc.execWf.Spec.Shutdown))
woc.markNodePhase(nodeName, wfv1.NodeSkipped, fmt.Sprintf("workflow shutdown with strategy: %s", woc.GetShutdownStrategy()))
return nil, nil
}

Expand Down

0 comments on commit a5d1acc

Please sign in to comment.