Skip to content

Commit

Permalink
fix(controller): Use node.Name instead of node.DisplayName for onExit…
Browse files Browse the repository at this point in the history
… nodes (argoproj#5486)

Signed-off-by: Simon Behar <[email protected]>
  • Loading branch information
simster7 committed Mar 24, 2021
1 parent 80cea6a commit d964fe4
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 7 deletions.
9 changes: 9 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1806,6 +1806,15 @@ func (n NodeStatus) GetDuration() time.Duration {
return n.FinishedAt.Sub(n.StartedAt.Time)
}

// HasChild reports whether childID appears in the node's Children list.
// It returns false when the node has no children.
func (n NodeStatus) HasChild(childID string) bool {
	for i := range n.Children {
		if n.Children[i] == childID {
			return true
		}
	}
	return false
}

// S3Bucket contains the access information required for interfacing with an S3 bucket
type S3Bucket struct {
// Endpoint is the hostname of the bucket endpoint
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,3 +825,12 @@ func TestTemplate_ExcludeTemplateTypes(t *testing.T) {
assert.Nil(t, suspendTmpl.Data)
})
}

// TestHasChild verifies that HasChild reports true only for IDs present in Children.
func TestHasChild(t *testing.T) {
	node := NodeStatus{Children: []string{"a", "b"}}

	cases := []struct {
		childID string
		want    bool
	}{
		{childID: "a", want: true},
		{childID: "c", want: false},
		{childID: "", want: false},
	}
	for _, tc := range cases {
		assert.Equal(t, tc.want, node.HasChild(tc.childID))
	}
}
4 changes: 2 additions & 2 deletions workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,6 @@ func GetTemplateHolderString(tmplHolder wfv1.TemplateReferenceHolder) string {
}
}

// GenerateOnExitNodeName returns the name of the onExit node for a parent: the given parent name followed by ".onExit".
func GenerateOnExitNodeName(parentDisplayName string) string {
return fmt.Sprintf("%s.onExit", parentDisplayName)
func GenerateOnExitNodeName(parentNodeName string) string {
return fmt.Sprintf("%s.onExit", parentNodeName)
}
25 changes: 25 additions & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,31 @@ func withAnnotation(key, val string) with {
return func(pod *apiv1.Pod) { pod.Annotations[key] = val }
}

// createRunningPods materialises a pod for every node in the workflow status that is of type Pod and in the Running
// phase. Each pod is named after the node ID, carries the node name as an annotation and the workflow name as a
// label, and is created with phase Running through the controller's kube clientset before being added to the pod
// informer's store.
//
// Errors returned by Create and by the store's Add are discarded.
func createRunningPods(ctx context.Context, woc *wfOperationCtx) {
	podcs := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace())
	for _, node := range woc.wf.Status.Nodes {
		// Only nodes that the status reports as running pods get a backing pod.
		if node.Type != wfv1.NodeTypePod || node.Phase != wfv1.NodeRunning {
			continue
		}

		runningPod := &apiv1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name: node.ID,
				Annotations: map[string]string{
					"workflows.argoproj.io/node-name": node.Name,
				},
				Labels: map[string]string{
					"workflows.argoproj.io/workflow": woc.wf.Name,
				},
			},
			Status: apiv1.PodStatus{
				Phase: apiv1.PodRunning,
			},
		}

		pod, _ := podcs.Create(ctx, runningPod, metav1.CreateOptions{})
		_ = woc.controller.podInformer.GetStore().Add(pod)
	}
}

// makePodsPhase acts like a pod controller and simulates the transition of pods transitioning into a specified state
func makePodsPhase(ctx context.Context, woc *wfOperationCtx, phase apiv1.PodPhase, with ...with) {
podcs := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace())
Expand Down
15 changes: 14 additions & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,20 @@ func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) {
}

// If a task happens to have an onExit node, don't proceed until the onExit node is fulfilled
if onExitNode := d.wf.GetNodeByName(common.GenerateOnExitNodeName(taskName)); onExitNode != nil {
if onExitNode := d.wf.GetNodeByName(common.GenerateOnExitNodeName(depNode.Name)); onExitNode != nil {
if !onExitNode.Fulfilled() {
return false, false, nil
}
}

// Previously we used `depNode.DisplayName` to generate all onExit node names. However, as these can be non-unique
// we transitioned to using `depNode.Name` instead, which are guaranteed to be unique. In order to not disrupt
// running workflows during upgrade time, we do an additional check to see if there is an onExit node with the old
// name (`depNode.DisplayName`).
// TODO: This scaffold code should be removed after a couple of "grace period" version upgrades to allow transitions. It was introduced in v3.0.0
// See more: https://github.com/argoproj/argo-workflows/issues/5502
legacyOnExitNodeName := common.GenerateOnExitNodeName(depNode.DisplayName)
if onExitNode := d.wf.GetNodeByName(legacyOnExitNodeName); onExitNode != nil && d.wf.GetNodeByName(depNode.Name).HasChild(onExitNode.ID) {
if !onExitNode.Fulfilled() {
return false, false, nil
}
Expand Down
6 changes: 3 additions & 3 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1802,7 +1802,7 @@ func TestOnExitDAGPhase(t *testing.T) {
assert.Equal(t, wfv1.NodeRunning, retryNode.Phase)
}

retryNode = woc.wf.GetNodeByName("B.onExit")
retryNode = woc.wf.GetNodeByName("dag-diamond-88trp.B.onExit")
if assert.NotNil(t, retryNode) {
assert.Equal(t, wfv1.NodePending, retryNode.Phase)
}
Expand Down Expand Up @@ -1927,7 +1927,7 @@ func TestOnExitNonLeaf(t *testing.T) {
woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)
retryNode := woc.wf.GetNodeByName("step-2.onExit")
retryNode := woc.wf.GetNodeByName("exit-handler-bug-example.step-2.onExit")
if assert.NotNil(t, retryNode) {
assert.Equal(t, wfv1.NodePending, retryNode.Phase)
}
Expand Down Expand Up @@ -2195,7 +2195,7 @@ func TestDagTargetTaskOnExit(t *testing.T) {
woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)
onExitNode := woc.wf.GetNodeByName("A.onExit")
onExitNode := woc.wf.GetNodeByName("dag-primay-branch-6bnnl.A.onExit")
if assert.NotNil(t, onExitNode) {
assert.Equal(t, wfv1.NodePending, onExitNode.Phase)
}
Expand Down
19 changes: 18 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2940,7 +2940,24 @@ func (woc *wfOperationCtx) createTemplateContext(scope wfv1.ResourceScope, resou
func (woc *wfOperationCtx) runOnExitNode(ctx context.Context, templateRef, parentDisplayName, parentNodeName, boundaryID string, tmplCtx *templateresolution.Context) (bool, *wfv1.NodeStatus, error) {
if templateRef != "" && woc.GetShutdownStrategy().ShouldExecute(true) {
woc.log.Infof("Running OnExit handler: %s", templateRef)
onExitNodeName := common.GenerateOnExitNodeName(parentDisplayName)

// Previously we used `parentDisplayName` to generate all onExit node names. However, as these can be non-unique
// we transitioned to using `parentNodeName` instead, which are guaranteed to be unique. In order to not disrupt
// running workflows during upgrade time, we first check if there is an onExit node that currently exists with the
// legacy name AND said node is a child of the parent node. If it does, we continue execution with the legacy name.
// If it doesn't, we use the new (and unique) name for all operations henceforth.
// TODO: This scaffold code should be removed after a couple of "grace period" version upgrades to allow transitions. It was introduced in v3.0.0
// When the scaffold code is removed, we should only have the following:
//
// onExitNodeName := common.GenerateOnExitNodeName(parentNodeName)
//
// See more: https://github.com/argoproj/argo-workflows/issues/5502
onExitNodeName := common.GenerateOnExitNodeName(parentNodeName)
legacyOnExitNodeName := common.GenerateOnExitNodeName(parentDisplayName)
if legacyNameNode := woc.wf.GetNodeByName(legacyOnExitNodeName); legacyNameNode != nil && woc.wf.GetNodeByName(parentNodeName).HasChild(legacyNameNode.ID) {
onExitNodeName = legacyOnExitNodeName
}

onExitNode, err := woc.executeTemplate(ctx, onExitNodeName, &wfv1.WorkflowStep{Template: templateRef}, tmplCtx, woc.execWf.Spec.Arguments, &executeTemplateOpts{
boundaryID: boundaryID,
onExitTemplate: true,
Expand Down
221 changes: 221 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6374,3 +6374,224 @@ func TestRootRetryStrategyCompletes(t *testing.T) {

assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

// testOnExitNameBackwardsCompatibility is a Workflow manifest (spec plus recorded status) for a steps workflow whose
// "run" step has Succeeded and whose onExit node, recorded under the name "run.onExit", is still Running.
const testOnExitNameBackwardsCompatibility = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world-69h5d
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: run
onExit: pass
template: pass
- container:
args:
- exit 0
command:
- sh
- -c
image: alpine
name: pass
ttlStrategy:
secondsAfterCompletion: 600
status:
nodes:
hello-world-69h5d:
children:
- hello-world-69h5d-4087924081
displayName: hello-world-69h5d
finishedAt: "2021-03-24T14:53:32Z"
id: hello-world-69h5d
name: hello-world-69h5d
outboundNodes:
- hello-world-69h5d-928074325
phase: Running
startedAt: "2021-03-24T14:53:18Z"
templateName: main
templateScope: local/hello-world-69h5d
type: Steps
hello-world-69h5d-928074325:
boundaryID: hello-world-69h5d
displayName: run.onExit
finishedAt: "2021-03-24T14:53:31Z"
hostNodeName: k3d-k3s-default-server-0
id: hello-world-69h5d-928074325
name: run.onExit
phase: Running
startedAt: "2021-03-24T14:53:25Z"
templateName: pass
templateScope: local/hello-world-69h5d
type: Pod
hello-world-69h5d-2500098386:
boundaryID: hello-world-69h5d
children:
- hello-world-69h5d-928074325
displayName: run
finishedAt: "2021-03-24T14:53:24Z"
hostNodeName: k3d-k3s-default-server-0
id: hello-world-69h5d-2500098386
name: hello-world-69h5d[0].run
phase: Succeeded
startedAt: "2021-03-24T14:53:18Z"
templateName: pass
templateScope: local/hello-world-69h5d
type: Pod
hello-world-69h5d-4087924081:
boundaryID: hello-world-69h5d
children:
- hello-world-69h5d-2500098386
displayName: '[0]'
finishedAt: "2021-03-24T14:53:32Z"
id: hello-world-69h5d-4087924081
name: hello-world-69h5d[0]
phase: Running
startedAt: "2021-03-24T14:53:18Z"
templateScope: local/hello-world-69h5d
type: StepGroup
phase: Running
startedAt: "2021-03-24T14:53:18Z"
`

// TestOnExitNameBackwardsCompatibility checks that an onExit node created under the legacy naming scheme is still
// respected after the switch to unique onExit node names.
//
// onExit node names used to be generated from the parent's display name, which can be non-unique; they are now
// generated from the parent's node name, which is guaranteed to be unique. In order to not disrupt workflows that are
// running during an upgrade, the legacy name keeps being used when a node with that name exists AND is a child of the
// parent node. Otherwise the new (unique) name is used for all operations.
//
// The fixture contains a running onExit node with the legacy name. After operating on it, no additional onExit node
// with the new name may have been created and the legacy node must still be running.
//
// TODO: This test should be removed after a couple of "grace period" version upgrades to allow transitions. It was introduced in v3.0.0
// See more: https://github.com/argoproj/argo-workflows/issues/5502
func TestOnExitNameBackwardsCompatibility(t *testing.T) {
	ctx := context.Background()

	wf := unmarshalWF(testOnExitNameBackwardsCompatibility)
	cancel, controller := newController(wf)
	defer cancel()

	woc := newWorkflowOperationCtx(wf, controller)
	createRunningPods(ctx, woc)

	initialNodeCount := len(woc.wf.Status.Nodes)
	woc.operate(ctx)

	assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
	// An onExit node created under the new name would show up as an extra node.
	assert.Equal(t, initialNodeCount, len(woc.wf.Status.Nodes))

	legacyOnExit := woc.wf.Status.Nodes.FindByDisplayName("run.onExit")
	if !assert.NotNil(t, legacyOnExit) {
		return
	}
	assert.Equal(t, wfv1.NodeRunning, legacyOnExit.Phase)
}

// testOnExitDAGStatusCompatibility is a Workflow manifest (spec plus recorded status) for a DAG in which task "A" has
// Succeeded and its onExit node, recorded under the name "A.onExit", is still Running. Task "B" depends on "A" and has
// no node in the recorded status.
const testOnExitDAGStatusCompatibility = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: dag-diamond-8xw8l
spec:
entrypoint: diamond
templates:
- dag:
tasks:
- name: A
onExit: echo
template: echo
- depends: A
name: B
template: echo
name: diamond
- container:
command:
- echo
- hi
image: alpine:3.7
name: echo
status:
nodes:
dag-diamond-8xw8l:
children:
- dag-diamond-8xw8l-1488416551
displayName: dag-diamond-8xw8l
finishedAt: "2021-03-24T15:37:06Z"
id: dag-diamond-8xw8l
name: dag-diamond-8xw8l
outboundNodes:
- dag-diamond-8xw8l-1505194170
phase: Running
startedAt: "2021-03-24T15:36:47Z"
templateName: diamond
templateScope: local/dag-diamond-8xw8l
type: DAG
dag-diamond-8xw8l-1342580575:
boundaryID: dag-diamond-8xw8l
displayName: A.onExit
finishedAt: "2021-03-24T15:36:59Z"
hostNodeName: k3d-k3s-default-server-0
id: dag-diamond-8xw8l-1342580575
name: A.onExit
phase: Running
startedAt: "2021-03-24T15:36:54Z"
templateName: echo
templateScope: local/dag-diamond-8xw8l
type: Pod
dag-diamond-8xw8l-1488416551:
boundaryID: dag-diamond-8xw8l
children:
- dag-diamond-8xw8l-1342580575
displayName: A
finishedAt: "2021-03-24T15:36:53Z"
hostNodeName: k3d-k3s-default-server-0
id: dag-diamond-8xw8l-1488416551
name: dag-diamond-8xw8l.A
phase: Succeeded
startedAt: "2021-03-24T15:36:47Z"
templateName: echo
templateScope: local/dag-diamond-8xw8l
type: Pod
phase: Running
startedAt: "2021-03-24T15:36:47Z"
`

// TestOnExitDAGStatusCompatibility checks that a DAG keeps waiting on an onExit node that was created under the
// legacy naming scheme.
//
// onExit node names used to be generated from the parent's display name, which can be non-unique; they are now
// generated from the parent's node name, which is guaranteed to be unique. In order to not disrupt workflows that are
// running during an upgrade, the legacy name keeps being used when a node with that name exists AND is a child of the
// parent node. Otherwise the new (unique) name is used for all operations.
//
// The fixture contains a running onExit node with the legacy name. After operating on it, the subsequent DAG node
// ("B") must NOT have been created, because the onExit node with the legacy name has not finished running.
//
// TODO: This test should be removed after a couple of "grace period" version upgrades to allow transitions. It was introduced in v3.0.0
// See more: https://github.com/argoproj/argo-workflows/issues/5502
func TestOnExitDAGStatusCompatibility(t *testing.T) {
	ctx := context.Background()

	wf := unmarshalWF(testOnExitDAGStatusCompatibility)
	cancel, controller := newController(wf)
	defer cancel()

	woc := newWorkflowOperationCtx(wf, controller)
	createRunningPods(ctx, woc)

	initialNodeCount := len(woc.wf.Status.Nodes)
	woc.operate(ctx)

	assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
	// An onExit node created under the new name, or a node for "B", would show up as an extra node.
	assert.Equal(t, initialNodeCount, len(woc.wf.Status.Nodes))

	legacyOnExit := woc.wf.Status.Nodes.FindByDisplayName("A.onExit")
	if assert.NotNil(t, legacyOnExit) {
		assert.Equal(t, wfv1.NodeRunning, legacyOnExit.Phase)
	}

	// "B" depends on "A" and must not start while A's onExit node is unfinished.
	dependentNode := woc.wf.Status.Nodes.FindByDisplayName("B")
	assert.Nil(t, dependentNode)
}

0 comments on commit d964fe4

Please sign in to comment.