Skip to content

Commit

Permalink
fix(controller): Cleanup the synchronize pending queue once Workflow …
Browse files Browse the repository at this point in the history
…deleted (argoproj#4664)
  • Loading branch information
sarabala1979 committed Dec 8, 2020
1 parent 7055420 commit 4f9fab9
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 0 deletions.
20 changes: 20 additions & 0 deletions workflow/sync/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,16 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool {
log.Infof("%s released a lock from %s", resourceKey, holding.Semaphore)
}
}

// Remove the pending Workflow level semaphore keys
for _, waiting := range wf.Status.Synchronization.Semaphore.Waiting {
syncLockHolder := cm.syncLockMap[waiting.Semaphore]
if syncLockHolder == nil {
continue
}
resourceKey := getResourceKey(wf.Namespace, wf.Name, wf.Name)
syncLockHolder.removeFromQueue(resourceKey)
}
wf.Status.Synchronization.Semaphore = nil
}

Expand All @@ -198,6 +208,16 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool {
wf.Status.Synchronization.Mutex.LockReleased(holding.Holder, holding.Mutex)
log.Infof("%s released a lock from %s", resourceKey, holding.Mutex)
}

// Remove the pending Workflow level mutex keys
for _, waiting := range wf.Status.Synchronization.Mutex.Waiting {
syncLockHolder := cm.syncLockMap[waiting.Mutex]
if syncLockHolder == nil {
continue
}
resourceKey := getResourceKey(wf.Namespace, wf.Name, wf.Name)
syncLockHolder.removeFromQueue(resourceKey)
}
wf.Status.Synchronization.Mutex = nil
}

Expand Down
73 changes: 73 additions & 0 deletions workflow/sync/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,24 @@ status:
phase: Running
startedAt: "2020-06-04T19:55:11Z"
`
const wfWithMutex = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world
namespace: default
spec:
entrypoint: whalesay
synchronization:
mutex:
name: my-mutex
templates:
- name: whalesay
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
`

func unmarshalWF(yamlStr string) *wfv1.Workflow {
var wf wfv1.Workflow
Expand Down Expand Up @@ -423,6 +441,14 @@ func TestSemaphoreWfLevel(t *testing.T) {

concurrenyMgr.ReleaseAll(wf2)
assert.Nil(t, wf2.Status.Synchronization)

sema := concurrenyMgr.syncLockMap["default/ConfigMap/my-config/workflow"].(*PrioritySemaphore)
assert.NotNil(t, sema)
assert.Len(t, sema.pending.items, 2)
concurrenyMgr.ReleaseAll(wf1)
assert.Len(t, sema.pending.items, 1)
concurrenyMgr.ReleaseAll(wf3)
assert.Len(t, sema.pending.items, 0)
})
}

Expand Down Expand Up @@ -589,3 +615,50 @@ func TestTriggerWFWithAvailableLock(t *testing.T) {
assert.Equal(2, triggerCount)
})
}

func TestMutexWfLevel(t *testing.T) {
kube := fake.NewSimpleClientset()
syncLimitFunc := GetSyncLimitFunc(kube)
t.Run("WorkflowLevelMutexAcquireAndRelease", func(t *testing.T) {
//var nextKey string
concurrenyMgr := NewLockManager(syncLimitFunc, func(key string) {
//nextKey = key
})
wf := unmarshalWF(wfWithMutex)
wf1 := wf.DeepCopy()
wf2 := wf.DeepCopy()

status, wfUpdate, msg, err := concurrenyMgr.TryAcquire(wf, "", wf.Spec.Synchronization)
assert.NoError(t, err)
assert.Empty(t, msg)
assert.True(t, status)
assert.True(t, wfUpdate)
assert.NotNil(t, wf.Status.Synchronization)
assert.NotNil(t, wf.Status.Synchronization.Mutex)
assert.NotNil(t, wf.Status.Synchronization.Mutex.Holding)

wf1.Name = "two"
status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf1, "", wf1.Spec.Synchronization)
assert.NoError(t, err)
assert.NotEmpty(t, msg)
assert.False(t, status)
assert.True(t, wfUpdate)

wf2.Name = "three"
status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf2, "", wf2.Spec.Synchronization)
assert.NoError(t, err)
assert.NotEmpty(t, msg)
assert.False(t, status)
assert.True(t, wfUpdate)

mutex := concurrenyMgr.syncLockMap["default/Mutex/my-mutex"].(*PriorityMutex)
assert.NotNil(t, mutex)
assert.Len(t, mutex.mutex.pending.items, 2)
concurrenyMgr.ReleaseAll(wf1)
assert.Len(t, mutex.mutex.pending.items, 1)
concurrenyMgr.ReleaseAll(wf2)
assert.Len(t, mutex.mutex.pending.items, 0)

})

}

0 comments on commit 4f9fab9

Please sign in to comment.