Skip to content

Commit

Permalink
fix(controller): Only warn if cron job missing. Fixes argoproj#4351 (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Oct 23, 2020
1 parent dbbe95c commit 220ac73
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 26 deletions.
35 changes: 9 additions & 26 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"context"
"fmt"
"reflect"
"sync"
"time"

"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -39,8 +37,7 @@ type Controller struct {
namespace string
managedNamespace string
instanceId string
cron *cron.Cron
nameEntryIDMap sync.Map
cron *cronFacade
wfClientset versioned.Interface
wfInformer cache.SharedIndexInformer
wfLister util.WorkflowLister
Expand All @@ -65,10 +62,9 @@ func NewCronController(wfclientset versioned.Interface, restConfig *rest.Config,
namespace: namespace,
managedNamespace: managedNamespace,
instanceId: instanceId,
cron: cron.New(),
cron: newCronFacade(),
restConfig: restConfig,
dynamicInterface: dynamicInterface,
nameEntryIDMap: sync.Map{},
wfQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "wf_cron_queue"),
cronWfQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cron_wf_queue"),
metrics: metrics,
Expand Down Expand Up @@ -133,11 +129,8 @@ func (cc *Controller) processNextCronItem() bool {
return true
}
if !exists {
if entryId, ok := cc.nameEntryIDMap.Load(key.(string)); ok {
logCtx.Infof("Deleting '%s'", key)
cc.cron.Remove(entryId.(cron.EntryID))
cc.nameEntryIDMap.Delete(key.(string))
}
logCtx.Infof("Deleting '%s'", key)
cc.cron.Delete(key.(string))
return true
}

Expand Down Expand Up @@ -169,22 +162,18 @@ func (cc *Controller) processNextCronItem() bool {
}

// The job is currently scheduled, remove it and re add it.
if entryId, ok := cc.nameEntryIDMap.Load(key.(string)); ok {
cc.cron.Remove(entryId.(cron.EntryID))
cc.nameEntryIDMap.Delete(key.(string))
}
cc.cron.Delete(key.(string))

cronSchedule := cronWf.Spec.Schedule
if cronWf.Spec.Timezone != "" {
cronSchedule = "CRON_TZ=" + cronWf.Spec.Timezone + " " + cronSchedule
}

entryId, err := cc.cron.AddJob(cronSchedule, cronWorkflowOperationCtx)
err = cc.cron.AddJob(key.(string), cronSchedule, cronWorkflowOperationCtx)
if err != nil {
logCtx.WithError(err).Error("could not schedule CronWorkflow")
return true
}
cc.nameEntryIDMap.Store(key.(string), entryId)

logCtx.Infof("CronWorkflow %s added", key.(string))

Expand Down Expand Up @@ -237,15 +226,9 @@ func (cc *Controller) processNextWorkflowItem() bool {

// Workflows are run in the same namespace as CronWorkflow
nameEntryIdMapKey := wf.Namespace + "/" + wf.OwnerReferences[0].Name
var woc *cronWfOperationCtx
if entryId, ok := cc.nameEntryIDMap.Load(nameEntryIdMapKey); ok {
woc, ok = cc.cron.Entry(entryId.(cron.EntryID)).Job.(*cronWfOperationCtx)
if !ok {
log.Errorf("Parent CronWorkflow '%s' is malformed", nameEntryIdMapKey)
return true
}
} else {
log.Warnf("Parent CronWorkflow '%s' no longer exists", nameEntryIdMapKey)
woc, err := cc.cron.Load(nameEntryIdMapKey)
if err != nil {
log.Warnf("Parent CronWorkflow '%s' is bad: %v", nameEntryIdMapKey, err)
return true
}

Expand Down
69 changes: 69 additions & 0 deletions workflow/cron/cron_facade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package cron

import (
"fmt"
"reflect"
"sync"

"github.com/robfig/cron/v3"
)

// cronFacade allows the client to operate using key rather than cron.EntryID,
// as well as providing sync guarantees
type cronFacade struct {
mu sync.Mutex
cron *cron.Cron
entryIDs map[string]cron.EntryID
}

func newCronFacade() *cronFacade {
return &cronFacade{
cron: cron.New(),
entryIDs: make(map[string]cron.EntryID),
}
}

func (f *cronFacade) Start() {
f.cron.Start()
}

func (f *cronFacade) Stop() {
f.cron.Stop()
}

func (f *cronFacade) Delete(key string) {
f.mu.Lock()
defer f.mu.Unlock()
entryID, ok := f.entryIDs[key]
if !ok {
return
}
f.cron.Remove(entryID)
delete(f.entryIDs, key)
}

func (f *cronFacade) AddJob(key, schedule string, cwoc *cronWfOperationCtx) error {
f.mu.Lock()
defer f.mu.Unlock()
entryID, err := f.cron.AddJob(schedule, cwoc)
if err != nil {
return err
}
f.entryIDs[key] = entryID
return nil
}

func (f *cronFacade) Load(key string) (*cronWfOperationCtx, error) {
f.mu.Lock()
defer f.mu.Unlock()
entryID, ok := f.entryIDs[key]
if !ok {
return nil, fmt.Errorf("entry ID for %s not found", key)
}
entry := f.cron.Entry(entryID).Job
cwoc, ok := entry.(*cronWfOperationCtx)
if !ok {
return nil, fmt.Errorf("job entry ID for %s was not a *cronWfOperationCtx, was %v", key, reflect.TypeOf(entry))
}
return cwoc, nil
}

0 comments on commit 220ac73

Please sign in to comment.