Skip to content

Commit

Permalink
fix: Resolve inconsistent CronWorkflow persistence (argoproj#4440)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Behar <[email protected]>
  • Loading branch information
simster7 committed Nov 4, 2020
1 parent 76887cf commit f4f68a7
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 152 deletions.
5 changes: 5 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2505,6 +2505,11 @@
"io.argoproj.workflow.v1alpha1.CronWorkflowStatus": {
"description": "CronWorkflowStatus is the status of a CronWorkflow",
"type": "object",
"required": [
"active",
"lastScheduledTime",
"conditions"
],
"properties": {
"active": {
"description": "Active is a list of active workflows stemming from this CronWorkflow",
Expand Down
4 changes: 4 additions & 0 deletions manifests/base/crds/full/argoproj.io_cronworkflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6751,6 +6751,10 @@ spec:
lastScheduledTime:
format: date-time
type: string
required:
- active
- conditions
- lastScheduledTime
type: object
required:
- metadata
Expand Down
6 changes: 3 additions & 3 deletions pkg/apis/workflow/v1alpha1/cron_workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ type CronWorkflowSpec struct {
// CronWorkflowStatus is the status of a CronWorkflow
type CronWorkflowStatus struct {
// Active is a list of active workflows stemming from this CronWorkflow
Active []v1.ObjectReference `json:"active,omitempty" protobuf:"bytes,1,rep,name=active"`
Active []v1.ObjectReference `json:"active" protobuf:"bytes,1,rep,name=active"`
// LastScheduleTime is the last time the CronWorkflow was scheduled
LastScheduledTime *metav1.Time `json:"lastScheduledTime,omitempty" protobuf:"bytes,2,opt,name=lastScheduledTime"`
LastScheduledTime *metav1.Time `json:"lastScheduledTime" protobuf:"bytes,2,opt,name=lastScheduledTime"`
// Conditions is a list of conditions the CronWorkflow may have
Conditions Conditions `json:"conditions,omitempty" protobuf:"bytes,3,rep,name=conditions"`
Conditions Conditions `json:"conditions" protobuf:"bytes,3,rep,name=conditions"`
}

const (
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

180 changes: 79 additions & 101 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"reflect"
"time"

"github.com/argoproj/pkg/sync"
log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -14,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
Expand All @@ -37,8 +39,8 @@ type Controller struct {
managedNamespace string
instanceId string
cron *cronFacade
keyLock sync.KeyLock
wfClientset versioned.Interface
wfInformer cache.SharedIndexInformer
wfLister util.WorkflowLister
wfQueue workqueue.RateLimitingInterface
cronWfInformer informers.GenericInformer
Expand All @@ -50,9 +52,8 @@ type Controller struct {
}

const (
cronWorkflowResyncPeriod = 20 * time.Minute
cronWorkflowWorkers = 8
cronWorkflowWorkflowWorkers = 8
cronWorkflowResyncPeriod = 20 * time.Minute
cronWorkflowWorkers = 8
)

func NewCronController(wfclientset versioned.Interface, restConfig *rest.Config, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, eventRecorderManager events.EventRecorderManager) *Controller {
Expand All @@ -62,6 +63,7 @@ func NewCronController(wfclientset versioned.Interface, restConfig *rest.Config,
managedNamespace: managedNamespace,
instanceId: instanceId,
cron: newCronFacade(),
keyLock: sync.NewKeyLock(),
restConfig: restConfig,
dynamicInterface: dynamicInterface,
wfQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "wf_cron_queue"),
Expand All @@ -84,27 +86,23 @@ func (cc *Controller) Run(ctx context.Context) {
}).ForResource(schema.GroupVersionResource{Group: workflow.Group, Version: workflow.Version, Resource: workflow.CronWorkflowPlural})
cc.addCronWorkflowInformerHandler()

cc.wfInformer = util.NewWorkflowInformer(cc.dynamicInterface, cc.managedNamespace, cronWorkflowResyncPeriod, func(options *v1.ListOptions) {
wfInformer := util.NewWorkflowInformer(cc.dynamicInterface, cc.managedNamespace, cronWorkflowResyncPeriod, func(options *v1.ListOptions) {
wfInformerListOptionsFunc(options, cc.instanceId)
}, cache.Indexers{})
cc.addWorkflowInformerHandler()
go wfInformer.Run(ctx.Done())

cc.wfLister = util.NewWorkflowLister(cc.wfInformer)
cc.wfLister = util.NewWorkflowLister(wfInformer)

cc.cron.Start()
defer cc.cron.Stop()

go cc.cronWfInformer.Informer().Run(ctx.Done())
go cc.wfInformer.Run(ctx.Done())
go wait.Until(cc.syncAll, 10*time.Second, ctx.Done())

for i := 0; i < cronWorkflowWorkers; i++ {
go wait.Until(cc.runCronWorker, time.Second, ctx.Done())
}

for i := 0; i < cronWorkflowWorkflowWorkers; i++ {
go wait.Until(cc.runWorkflowWorker, time.Second, ctx.Done())
}

<-ctx.Done()
}

Expand All @@ -119,6 +117,10 @@ func (cc *Controller) processNextCronItem() bool {
return false
}
defer cc.cronWfQueue.Done(key)

cc.keyLock.Lock(key.(string))
defer cc.keyLock.Unlock(key.(string))

logCtx := log.WithField("cronWorkflow", key)
logCtx.Infof("Processing %s", key)

Expand Down Expand Up @@ -146,18 +148,21 @@ func (cc *Controller) processNextCronItem() bool {
return true
}

cronWorkflowOperationCtx := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.wfLister, cc.metrics)
cronWorkflowOperationCtx := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics)

err = cronWorkflowOperationCtx.validateCronWorkflow()
if err != nil {
logCtx.WithError(err).Error("invalid cron workflow")
return true
}

err = cronWorkflowOperationCtx.runOutstandingWorkflows()
wfWasRun, err := cronWorkflowOperationCtx.runOutstandingWorkflows()
if err != nil {
logCtx.WithError(err).Error("could not run outstanding Workflow")
return true
} else if wfWasRun {
// A workflow was run, so the cron workflow will be requeued. Return here to avoid duplicating work
return true
}

// The job is currently scheduled, remove it and re add it.
Expand All @@ -179,70 +184,6 @@ func (cc *Controller) processNextCronItem() bool {
return true
}

func (cc *Controller) runWorkflowWorker() {
for cc.processNextWorkflowItem() {
}
}

func (cc *Controller) processNextWorkflowItem() bool {
key, quit := cc.wfQueue.Get()
if quit {
return false
}
defer cc.wfQueue.Done(key)

obj, wfExists, err := cc.wfInformer.GetIndexer().GetByKey(key.(string))
if err != nil {
log.WithError(err).Error(fmt.Sprintf("Failed to get Workflow '%s' from informer index", key))
return true
}

// Check if the workflow no longer exists. If the workflow was deleted while it was an active workflow of a cron
// workflow, the cron workflow will reconcile this fact on its own next time it is processed.
if !wfExists {
log.Warnf("Workflow '%s' no longer exists", key)
return true
}

// The workflow informer receives unstructured objects to deal with the possibility of invalid
// workflow manifests that are unable to unmarshal to workflow objects
un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("Key '%s' in index is not an unstructured", key)
return true
}

wf, err := util.FromUnstructured(un)
if err != nil {
log.Warnf("Failed to unmarshal key '%s' to workflow object: %v", key, err)
return true
}

if wf.OwnerReferences == nil || len(wf.OwnerReferences) != 1 {
log.Warnf("Workflow '%s' stemming from CronWorkflow is malformed", wf.Name)
return true
}

// Workflows are run in the same namespace as CronWorkflow
nameEntryIdMapKey := wf.Namespace + "/" + wf.OwnerReferences[0].Name
woc, err := cc.cron.Load(nameEntryIdMapKey)
if err != nil {
log.Warnf("Parent CronWorkflow '%s' is bad: %v", nameEntryIdMapKey, err)
return true
}

defer woc.persistUpdate()

// If the workflow is completed or was deleted, remove it from Active Workflows
if wf.Status.Fulfilled() || !wfExists {
log.Warnf("Workflow '%s' from CronWorkflow '%s' completed", wf.Name, woc.cronWf.Name)
woc.removeActiveWf(wf)
}

woc.enforceHistoryLimit()
return true
}

func (cc *Controller) addCronWorkflowInformerHandler() {
cc.cronWfInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand All @@ -266,29 +207,66 @@ func (cc *Controller) addCronWorkflowInformerHandler() {
})
}

func (cc *Controller) addWorkflowInformerHandler() {
cc.wfInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
cc.wfQueue.Add(key)
}
},
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
cc.wfQueue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
cc.wfQueue.Add(key)
}
},
},
)
func (cc *Controller) syncAll() {
log.Info("Syncing all CronWorkflows")

workflows, err := cc.wfLister.List()
if err != nil {
return
}
groupedWorkflows := groupWorkflows(workflows)

cronWorkflows := cc.cronWfInformer.Informer().GetStore().List()
for _, obj := range cronWorkflows {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Error("Unable to convert object to unstructured when syncing CronWorkflows")
continue
}
cronWf := &v1alpha1.CronWorkflow{}
err := util.FromUnstructuredObj(un, cronWf)
if err != nil {
log.WithError(err).Error("Unable to convert unstructured to CronWorkflow when syncing CronWorkflows")
continue
}

err = cc.syncCronWorkflow(cronWf, groupedWorkflows[cronWf.UID])
if err != nil {
log.WithError(err).Error("Unable to sync CronWorkflow")
continue
}
}
}

func (cc *Controller) syncCronWorkflow(cronWf *v1alpha1.CronWorkflow, workflows []v1alpha1.Workflow) error {
key := cronWf.Namespace + "/" + cronWf.Name
cc.keyLock.Lock(key)
defer cc.keyLock.Unlock(key)

cwoc := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics)
err := cwoc.enforceHistoryLimit(workflows)
if err != nil {
return err
}
err = cwoc.reconcileActiveWfs(workflows)
if err != nil {
return err
}

cwoc.persistUpdate()
return nil
}

func groupWorkflows(wfs []*v1alpha1.Workflow) map[types.UID][]v1alpha1.Workflow {
cwfChildren := make(map[types.UID][]v1alpha1.Workflow)
for _, wf := range wfs {
owner := v1.GetControllerOf(wf)
if owner == nil || owner.Kind != workflow.CronWorkflowKind {
continue
}
cwfChildren[owner.UID] = append(cwfChildren[owner.UID], *wf)
}
return cwfChildren
}

func cronWfInformerListOptionsFunc(options *v1.ListOptions, instanceId string) {
Expand Down
Loading

0 comments on commit f4f68a7

Please sign in to comment.