Skip to content

Commit

Permalink
fix(controller): Design-out event errors. Fixes argoproj#4364 (argopr…
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Oct 31, 2020
1 parent 5a18c67 commit 45fbc95
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 41 deletions.
2 changes: 1 addition & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func (wfc *WorkflowController) processNextItem() bool {
err = wfc.hydrator.Hydrate(woc.wf)
if err != nil {
woc.log.Errorf("hydration failed: %v", err)
woc.markWorkflowError(err, true)
woc.markWorkflowError(err)
woc.persistUpdates()
wfc.throttler.Remove(key)
return true
Expand Down
73 changes: 35 additions & 38 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ func (woc *wfOperationCtx) operate() {
if r := recover(); r != nil {
woc.log.WithFields(log.Fields{"stack": string(debug.Stack()), "r": r}).Errorf("Recovered from panic")
if rerr, ok := r.(error); ok {
woc.markWorkflowError(rerr, true)
woc.markWorkflowError(rerr)
} else {
woc.markWorkflowPhase(wfv1.NodeError, true, fmt.Sprintf("%v", r))
woc.markWorkflowPhase(wfv1.NodeError, fmt.Sprintf("%v", r))
}
woc.controller.metrics.OperationPanic()
}
Expand All @@ -195,7 +195,7 @@ func (woc *wfOperationCtx) operate() {
execTmplRef, execArgs, err := woc.loadExecutionSpec()
if err != nil {
woc.log.WithError(err).Errorf("Unable to get Workflow Template Reference for workflow")
woc.markWorkflowError(err, true)
woc.markWorkflowError(err)
return
}

Expand All @@ -216,10 +216,8 @@ func (woc *wfOperationCtx) operate() {
if err != nil {
msg := fmt.Sprintf("Unable to create PDB resource for workflow, %s error: %s", woc.wf.Name, err)
woc.markWorkflowFailed(msg)
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowFailed", msg)
return
}
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeNormal, "WorkflowRunning", "Workflow Running")
validateOpts := validate.ValidateOpts{ContainerRuntimeExecutor: woc.controller.GetContainerRuntimeExecutor()}
wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTemplates(woc.wf.Namespace))
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(woc.controller.wfclientset.ArgoprojV1alpha1().ClusterWorkflowTemplates())
Expand All @@ -230,7 +228,6 @@ func (woc *wfOperationCtx) operate() {
if err != nil {
msg := fmt.Sprintf("invalid spec: %s", err.Error())
woc.markWorkflowFailed(msg)
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowFailed", msg)
return
}
// If we received conditions during validation (such as SpecWarnings), add them to the Workflow object
Expand Down Expand Up @@ -280,7 +277,7 @@ func (woc *wfOperationCtx) operate() {
tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
if err != nil {
woc.log.WithError(err).Error("Failed to create a template context")
woc.markWorkflowError(err, true)
woc.markWorkflowError(err)
return
}

Expand All @@ -289,18 +286,15 @@ func (woc *wfOperationCtx) operate() {
if err == nil {
woc.artifactRepository = repo
} else {
msg := fmt.Sprintf("Failed to load artifact repository configMap: %+v", err)
woc.log.Errorf(msg)
woc.markWorkflowError(err, true)
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowFailed", msg)
woc.markWorkflowError(err)
return
}
}

err = woc.substituteParamsInVolumes(woc.globalParams)
if err != nil {
woc.log.WithError(err).Error("volumes global param substitution error")
woc.markWorkflowError(err, true)
woc.markWorkflowError(err)
return
}

Expand All @@ -309,14 +303,13 @@ func (woc *wfOperationCtx) operate() {
if errorsutil.IsTransientErr(err) {
// Error was most likely caused by a lack of resources.
// In this case, Workflow will be in pending state and requeue.
woc.markWorkflowPhase(wfv1.NodePending, false, fmt.Sprintf("Waiting for a PVC to be created. %v", err))
woc.markWorkflowPhase(wfv1.NodePending, fmt.Sprintf("Waiting for a PVC to be created. %v", err))
woc.requeue(defaultRequeueTime)
return
}
msg := "pvc create error"
woc.log.WithError(err).Error(msg)
woc.markWorkflowError(err, true)
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowFailed", fmt.Sprintf("%s %s: %+v", woc.wf.ObjectMeta.Name, msg, err))
err = fmt.Errorf("pvc create error: %w", err)
woc.log.WithError(err).Error("pvc create error")
woc.markWorkflowError(err)
return
} else if woc.wf.Status.Phase == wfv1.NodePending {
// Workflow might be in pending state if previous PVC creation is forbidden
Expand All @@ -332,8 +325,6 @@ func (woc *wfOperationCtx) operate() {
switch err {
case ErrDeadlineExceeded:
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowTimedOut", msg)
default:
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowFailed", msg)
}
return
}
Expand Down Expand Up @@ -403,23 +394,19 @@ func (woc *wfOperationCtx) operate() {
if onExitNode != nil && onExitNode.FailedOrError() {
// if main workflow succeeded, but the exit node was unsuccessful
// the workflow is now considered unsuccessful.
woc.markWorkflowPhase(onExitNode.Phase, true, onExitNode.Message)
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowFailed", onExitNode.Message)
woc.markWorkflowPhase(onExitNode.Phase, onExitNode.Message)
} else {
woc.markWorkflowSuccess()
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeNormal, "WorkflowSucceeded", "Workflow completed")
}
case wfv1.NodeFailed:
woc.markWorkflowFailed(workflowMessage)
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowFailed", workflowMessage)
case wfv1.NodeError:
woc.markWorkflowPhase(wfv1.NodeError, true, workflowMessage)
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowFailed", workflowMessage)
woc.markWorkflowPhase(wfv1.NodeError, workflowMessage)
default:
// NOTE: we should never make it here because if the node was 'Running' we should have
// returned earlier.
err = errors.InternalErrorf("Unexpected node phase %s: %+v", woc.wf.ObjectMeta.Name, err)
woc.markWorkflowError(err, true)
woc.markWorkflowError(err)
}

if woc.execWf.Spec.Metrics != nil {
Expand Down Expand Up @@ -504,7 +491,7 @@ func (woc *wfOperationCtx) persistUpdates() {
err := woc.controller.hydrator.Dehydrate(woc.wf)
if err != nil {
woc.log.Warnf("Failed to dehydrate: %v", err)
woc.markWorkflowError(err, true)
woc.markWorkflowError(err)
}

// Release all acquired lock for completed workflow
Expand Down Expand Up @@ -544,7 +531,7 @@ func (woc *wfOperationCtx) persistUpdates() {

if os.Getenv("INFORMER_WRITE_BACK") != "false" {
if err := woc.writeBackToInformer(); err != nil {
woc.markWorkflowError(err, true)
woc.markWorkflowError(err)
return
}
} else {
Expand Down Expand Up @@ -592,7 +579,7 @@ func (woc *wfOperationCtx) writeBackToInformer() error {
// See https://github.com/argoproj/argo/issues/913
func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(wfClient v1alpha1.WorkflowInterface, err error) {
woc.wf = woc.orig.DeepCopy()
woc.markWorkflowError(err, true)
woc.markWorkflowError(err)
_, err = wfClient.Update(woc.wf)
if err != nil {
woc.log.Warnf("Error updating workflow with size error: %v", err)
Expand Down Expand Up @@ -1780,7 +1767,8 @@ func (woc *wfOperationCtx) checkTemplateTimeout(tmpl *wfv1.Template, node *wfv1.

// markWorkflowPhase is a convenience method to set the phase of the workflow with optional message
// optionally marks the workflow completed, which sets the finishedAt timestamp and completed label
func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, markCompleted bool, message ...string) {
func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, message string) {
markCompleted := false
if woc.wf.Status.Phase != phase {
if woc.wf.Status.Phase.Fulfilled() {
woc.log.WithFields(log.Fields{"fromPhase": woc.wf.Status.Phase, "toPhase": phase}).
Expand All @@ -1793,16 +1781,25 @@ func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, markCompleted
woc.wf.ObjectMeta.Labels = make(map[string]string)
}
woc.wf.ObjectMeta.Labels[common.LabelKeyPhase] = string(phase)
switch phase {
case wfv1.NodeRunning:
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeNormal, "WorkflowRunning", "Workflow Running")
case wfv1.NodeSucceeded:
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeNormal, "WorkflowSucceeded", "Workflow completed")
case wfv1.NodeFailed, wfv1.NodeError:
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowFailed", message)
}
markCompleted = phase.Completed()
}
if woc.wf.Status.StartedAt.IsZero() {
woc.updated = true
woc.wf.Status.StartedAt = metav1.Time{Time: time.Now().UTC()}
woc.wf.Status.EstimatedDuration = woc.estimateWorkflowDuration()
}
if len(message) > 0 && woc.wf.Status.Message != message[0] {
woc.log.Infof("Updated message %s -> %s", woc.wf.Status.Message, message[0])
if woc.wf.Status.Message != message {
woc.log.Infof("Updated message %s -> %s", woc.wf.Status.Message, message)
woc.updated = true
woc.wf.Status.Message = message[0]
woc.wf.Status.Message = message
}

if phase == wfv1.NodeError {
Expand Down Expand Up @@ -1873,19 +1870,19 @@ func (woc *wfOperationCtx) hasDaemonNodes() bool {
}

func (woc *wfOperationCtx) markWorkflowRunning() {
woc.markWorkflowPhase(wfv1.NodeRunning, false, "")
woc.markWorkflowPhase(wfv1.NodeRunning, "")
}

func (woc *wfOperationCtx) markWorkflowSuccess() {
woc.markWorkflowPhase(wfv1.NodeSucceeded, true)
woc.markWorkflowPhase(wfv1.NodeSucceeded, "")
}

func (woc *wfOperationCtx) markWorkflowFailed(message string) {
woc.markWorkflowPhase(wfv1.NodeFailed, true, message)
woc.markWorkflowPhase(wfv1.NodeFailed, message)
}

func (woc *wfOperationCtx) markWorkflowError(err error, markCompleted bool) {
woc.markWorkflowPhase(wfv1.NodeError, markCompleted, err.Error())
func (woc *wfOperationCtx) markWorkflowError(err error) {
woc.markWorkflowPhase(wfv1.NodeError, err.Error())
}

// stepsOrDagSeparator identifies if a node name starts with our naming convention separator from
Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3042,7 +3042,7 @@ spec:
path: /tmp/hello_world.txt
`: {
"Normal WorkflowRunning Workflow Running",
"Warning WorkflowFailed Failed to load artifact repository configMap: failed to find artifactory ref {,}/artifact-repository#config",
"Warning WorkflowFailed failed to find artifactory ref {,}/artifact-repository#config",
},
// DAG
`
Expand Down Expand Up @@ -3089,7 +3089,6 @@ spec:
} {
wf := unmarshalWF(manifest)
cancel, controller := newController(wf)
defer cancel()
t.Run(wf.Name, func(t *testing.T) {
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
Expand All @@ -3098,6 +3097,7 @@ spec:
woc.operate()
assert.Equal(t, want, getEvents(controller, len(want)))
})
cancel()
}
}

Expand Down

0 comments on commit 45fbc95

Please sign in to comment.