Skip to content

Commit

Permalink
fix: Ensure CronWorkflows are persisted once per operation (argoproj#…
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 committed Oct 1, 2020
1 parent 2a992ae commit 6265c70
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
2 changes: 2 additions & 0 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ func (cc *Controller) processNextWorkflowItem() bool {
return true
}

defer woc.persistUpdate()

// If the workflow is completed or was deleted, remove it from Active Workflows
if wf.Status.Fulfilled() || !wfExists {
log.Warnf("Workflow '%s' from CronWorkflow '%s' completed", wf.Name, woc.cronWf.Name)
Expand Down
28 changes: 18 additions & 10 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type cronWfOperationCtx struct {
cronWfIf typed.CronWorkflowInterface
log *log.Entry
metrics *metrics.Metrics
updated bool
}

func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface, wfLister util.WorkflowLister, metrics *metrics.Metrics) *cronWfOperationCtx {
Expand All @@ -51,10 +52,13 @@ func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset vers
"namespace": cronWorkflow.ObjectMeta.Namespace,
}),
metrics: metrics,
updated: false,
}
}

func (woc *cronWfOperationCtx) Run() {
defer woc.persistUpdate()

woc.log.Infof("Running %s", woc.name)

err := woc.validateCronWorkflow()
Expand Down Expand Up @@ -87,7 +91,7 @@ func (woc *cronWfOperationCtx) Run() {
woc.cronWf.Status.Active = append(woc.cronWf.Status.Active, getWorkflowObjectReference(wf, runWf))
woc.cronWf.Status.LastScheduledTime = &v1.Time{Time: time.Now()}
woc.cronWf.Status.Conditions.RemoveCondition(v1alpha1.ConditionTypeSubmissionError)
woc.persistUpdate()
woc.updated = true
}

func (woc *cronWfOperationCtx) validateCronWorkflow() error {
Expand All @@ -98,6 +102,7 @@ func (woc *cronWfOperationCtx) validateCronWorkflow() error {
woc.reportCronWorkflowError(v1alpha1.ConditionTypeSpecError, fmt.Sprint(err))
} else {
woc.cronWf.Status.Conditions.RemoveCondition(v1alpha1.ConditionTypeSpecError)
woc.updated = true
}
return err
}
Expand All @@ -116,24 +121,26 @@ func getWorkflowObjectReference(wf *v1alpha1.Workflow, runWf *v1alpha1.Workflow)
}

func (woc *cronWfOperationCtx) persistUpdate() {
if woc.origCronWf.ResourceVersion != woc.cronWf.ResourceVersion {
if !woc.updated {
return
} else if woc.origCronWf.ResourceVersion != woc.cronWf.ResourceVersion {
woc.log.Error("cannot update cron workflow with mismatched resource versions")
return
}
cronWf, err := woc.cronWfIf.Update(woc.cronWf)

_, err := woc.cronWfIf.Update(woc.cronWf)
if err != nil {
if !errors.IsConflict(err) {
woc.log.WithError(err).Error("failed to update CronWorkflow")
return
}
var reapplyErr error
cronWf, reapplyErr = woc.reapplyUpdate()
_, reapplyErr = woc.reapplyUpdate()
if err != nil {
woc.log.WithError(reapplyErr).WithField("original error", err).Error("failed to update CronWorkflow after reapply attempt")
return
}
}
woc.cronWf = cronWf
}

func (woc *cronWfOperationCtx) reapplyUpdate() (*v1alpha1.CronWorkflow, error) {
Expand Down Expand Up @@ -286,13 +293,13 @@ func (woc *cronWfOperationCtx) reconcileDeletedWfs() error {
return fmt.Errorf("unable to list workflows: %s", err)
}

currentWfs := make(map[types.UID]bool)
currentWfs := make(map[types.UID]*v1alpha1.Workflow)
for _, wf := range wfList {
currentWfs[wf.UID] = true
currentWfs[wf.UID] = wf
}

for _, objectRef := range woc.cronWf.Status.Active {
if found := currentWfs[objectRef.UID]; !found {
if wf, found := currentWfs[objectRef.UID]; !found || wf.Status.Fulfilled() {
woc.removeFromActiveList(objectRef.UID)
}
}
Expand All @@ -305,7 +312,7 @@ func (woc *cronWfOperationCtx) removeActiveWf(wf *v1alpha1.Workflow) {
return
}
woc.removeFromActiveList(wf.ObjectMeta.UID)
woc.persistUpdate()
woc.updated = true
}

func (woc *cronWfOperationCtx) removeFromActiveList(uid types.UID) {
Expand All @@ -316,6 +323,7 @@ func (woc *cronWfOperationCtx) removeFromActiveList(uid types.UID) {
}
}
woc.cronWf.Status.Active = newActive
woc.updated = true
}

func (woc *cronWfOperationCtx) enforceHistoryLimit() {
Expand Down Expand Up @@ -397,5 +405,5 @@ func (woc *cronWfOperationCtx) reportCronWorkflowError(conditionType v1alpha1.Co
Status: v1.ConditionTrue,
})
woc.metrics.CronWorkflowSubmissionError()
woc.persistUpdate()
woc.updated = true
}

0 comments on commit 6265c70

Please sign in to comment.