Skip to content

Commit

Permalink
fix: Fix the Status update for node with synchronization lock (argopr…
Browse files Browse the repository at this point in the history
…oj#6525)

* fix: Fix the Status update for node with synchronization lock

Signed-off-by: Saravanan Balasubramanian <[email protected]>

* update

Signed-off-by: Saravanan Balasubramanian <[email protected]>
  • Loading branch information
sarabala1979 committed Aug 11, 2021
1 parent a384603 commit 903ce68
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 1 deletion.
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1731,7 +1731,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
}
}

woc.updated = wfUpdated
woc.updated = woc.updated || wfUpdated
}
// If the user has specified retries, node becomes a special retry node.
// This node acts as a parent of all retries that will be done for
Expand Down
73 changes: 73 additions & 0 deletions workflow/controller/operator_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,3 +687,76 @@ func TestSynchronizationWithStep(t *testing.T) {
assert.Len(woc1.wf.Status.Synchronization.Semaphore.Holding, 1)
})
}

const wfWithStepRetry = `apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: my-workflow-
spec:
entrypoint: step-entry
templates:
- name: step-entry
steps:
- - name: step1
template: sleep
- name: sleep
retryStrategy:
limit: 5
retryPolicy: Always
synchronization:
semaphore:
configMapKeyRef:
name: my-config
key: template
container:
image: alpine:3.6
command: [sh, -c]
args: ["sleep 300"]`

func TestSynchronizationWithStepRetry(t *testing.T) {
assert := assert.New(t)
cancel, controller := newController()
defer cancel()
ctx := context.Background()
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
}, workflowExistenceFunc)
var cm v1.ConfigMap
wfv1.MustUnmarshal([]byte(configMap), &cm)
_, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &cm, metav1.CreateOptions{})
assert.NoError(err)

t.Run("StepRetryWithSynchronization", func(t *testing.T) {
// First workflow Acquire the lock
wf := wfv1.MustUnmarshalWorkflow(wfWithStepRetry)
wf, err := controller.wfclientset.ArgoprojV1alpha1().Workflows("default").Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
for _, n := range woc.wf.Status.Nodes {
if n.Name == "[0].step1(0)" {
assert.Equal(n.Phase, wfv1.NodePending)
}
}
// Updating Pod state
makePodsPhase(ctx, woc, v1.PodRunning)

woc.operate(ctx)
for _, n := range woc.wf.Status.Nodes {
if n.Name == "[0].step1(0)" {
assert.Equal(n.Phase, wfv1.NodeRunning)
}
}
makePodsPhase(ctx, woc, v1.PodFailed)
woc.operate(ctx)
for _, n := range woc.wf.Status.Nodes {
if n.Name == "[0].step1(0)" {
assert.Equal(n.Phase, wfv1.NodeFailed)
}
if n.Name == "[0].step1(1)" {
assert.Equal(n.Phase, wfv1.NodePending)
}
}
})

}

0 comments on commit 903ce68

Please sign in to comment.