diff --git a/workflow/cron/controller.go b/workflow/cron/controller.go index fa14c24291ba..cc65b4f60e43 100644 --- a/workflow/cron/controller.go +++ b/workflow/cron/controller.go @@ -4,10 +4,8 @@ import ( "context" "fmt" "reflect" - "sync" "time" - "github.com/robfig/cron/v3" log "github.com/sirupsen/logrus" apiv1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -39,8 +37,7 @@ type Controller struct { namespace string managedNamespace string instanceId string - cron *cron.Cron - nameEntryIDMap sync.Map + cron *cronFacade wfClientset versioned.Interface wfInformer cache.SharedIndexInformer wfLister util.WorkflowLister @@ -65,10 +62,9 @@ func NewCronController(wfclientset versioned.Interface, restConfig *rest.Config, namespace: namespace, managedNamespace: managedNamespace, instanceId: instanceId, - cron: cron.New(), + cron: newCronFacade(), restConfig: restConfig, dynamicInterface: dynamicInterface, - nameEntryIDMap: sync.Map{}, wfQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "wf_cron_queue"), cronWfQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cron_wf_queue"), metrics: metrics, @@ -133,11 +129,8 @@ func (cc *Controller) processNextCronItem() bool { return true } if !exists { - if entryId, ok := cc.nameEntryIDMap.Load(key.(string)); ok { - logCtx.Infof("Deleting '%s'", key) - cc.cron.Remove(entryId.(cron.EntryID)) - cc.nameEntryIDMap.Delete(key.(string)) - } + logCtx.Infof("Deleting '%s'", key) + cc.cron.Delete(key.(string)) return true } @@ -169,22 +162,18 @@ func (cc *Controller) processNextCronItem() bool { } // The job is currently scheduled, remove it and re add it. - if entryId, ok := cc.nameEntryIDMap.Load(key.(string)); ok { - cc.cron.Remove(entryId.(cron.EntryID)) - cc.nameEntryIDMap.Delete(key.(string)) - } + cc.cron.Delete(key.(string)) cronSchedule := cronWf.Spec.Schedule if cronWf.Spec.Timezone != "" { cronSchedule = "CRON_TZ=" + cronWf.Spec.Timezone + " " + cronSchedule } - entryId, err := cc.cron.AddJob(cronSchedule, cronWorkflowOperationCtx) + err = cc.cron.AddJob(key.(string), cronSchedule, cronWorkflowOperationCtx) if err != nil { logCtx.WithError(err).Error("could not schedule CronWorkflow") return true } - cc.nameEntryIDMap.Store(key.(string), entryId) logCtx.Infof("CronWorkflow %s added", key.(string)) @@ -237,15 +226,9 @@ func (cc *Controller) processNextWorkflowItem() bool { // Workflows are run in the same namespace as CronWorkflow nameEntryIdMapKey := wf.Namespace + "/" + wf.OwnerReferences[0].Name - var woc *cronWfOperationCtx - if entryId, ok := cc.nameEntryIDMap.Load(nameEntryIdMapKey); ok { - woc, ok = cc.cron.Entry(entryId.(cron.EntryID)).Job.(*cronWfOperationCtx) - if !ok { - log.Errorf("Parent CronWorkflow '%s' is malformed", nameEntryIdMapKey) - return true - } - } else { - log.Warnf("Parent CronWorkflow '%s' no longer exists", nameEntryIdMapKey) + woc, err := cc.cron.Load(nameEntryIdMapKey) + if err != nil { + log.Warnf("Parent CronWorkflow '%s' is bad: %v", nameEntryIdMapKey, err) return true } diff --git a/workflow/cron/cron_facade.go b/workflow/cron/cron_facade.go new file mode 100644 index 000000000000..d070f34e138d --- /dev/null +++ b/workflow/cron/cron_facade.go @@ -0,0 +1,69 @@ +package cron + +import ( + "fmt" + "reflect" + "sync" + + "github.com/robfig/cron/v3" +) + +// cronFacade allows the client to operate using key rather than cron.EntryID, +// as well as providing sync guarantees +type cronFacade struct { + mu sync.Mutex + cron *cron.Cron + entryIDs map[string]cron.EntryID +} + +func newCronFacade() *cronFacade { + return &cronFacade{ + cron: cron.New(), + entryIDs: make(map[string]cron.EntryID), + } +} + +func (f *cronFacade) Start() { + f.cron.Start() +} + +func (f *cronFacade) Stop() { + f.cron.Stop() +} + +func (f *cronFacade) Delete(key string) { + f.mu.Lock() + defer f.mu.Unlock() + entryID, ok := f.entryIDs[key] + if !ok { + return + } + f.cron.Remove(entryID) + delete(f.entryIDs, key) +} + +func (f *cronFacade) AddJob(key, schedule string, cwoc *cronWfOperationCtx) error { + f.mu.Lock() + defer f.mu.Unlock() + entryID, err := f.cron.AddJob(schedule, cwoc) + if err != nil { + return err + } + f.entryIDs[key] = entryID + return nil +} + +func (f *cronFacade) Load(key string) (*cronWfOperationCtx, error) { + f.mu.Lock() + defer f.mu.Unlock() + entryID, ok := f.entryIDs[key] + if !ok { + return nil, fmt.Errorf("entry ID for %s not found", key) + } + entry := f.cron.Entry(entryID).Job + cwoc, ok := entry.(*cronWfOperationCtx) + if !ok { + return nil, fmt.Errorf("job entry ID for %s was not a *cronWfOperationCtx, was %v", key, reflect.TypeOf(entry)) + } + return cwoc, nil +}