Skip to content

Commit

Permalink
fix(controller): End DAG execution on deadline exceeded error. Fixes a…
Browse files (browse the repository at this point in the history)
  • Loading branch information
alexec committed Sep 10, 2020
1 parent 74a68d4 commit 20c518c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 2 deletions.
16 changes: 15 additions & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,21 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
}

// Finally execute the template
_, _ = woc.executeTemplate(taskNodeName, &t, dagCtx.tmplCtx, t.Arguments, &executeTemplateOpts{boundaryID: dagCtx.boundaryID, onExitTemplate: dagCtx.onExitTemplate})
node, err = woc.executeTemplate(taskNodeName, &t, dagCtx.tmplCtx, t.Arguments, &executeTemplateOpts{boundaryID: dagCtx.boundaryID, onExitTemplate: dagCtx.onExitTemplate})
if err != nil {
switch err {
case ErrDeadlineExceeded:
return
case ErrParallelismReached:
case ErrTimeout:
_ = woc.markNodePhase(taskNodeName, wfv1.NodeFailed, err.Error())
return
default:
woc.log.Infof("DAG %s deemed errored due to task %s error: %s", node.ID, taskNodeName, err.Error())
_ = woc.markNodePhase(taskNodeName, wfv1.NodeError, fmt.Sprintf("task '%s' errored", taskNodeName))
return
}
}
}

if taskGroupNode != nil {
Expand Down
4 changes: 3 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ var (
ErrDeadlineExceeded = errors.New(errors.CodeTimeout, "Deadline exceeded")
// ErrParallelismReached indicates this workflow reached its parallelism limit
ErrParallelismReached = errors.New(errors.CodeForbidden, "Max parallelism reached")
// ErrTimeout indicates a specific template timed out
ErrTimeout = errors.New(errors.CodeTimeout, "timeout")
)

// maxOperationTime is the maximum time a workflow operation is allowed to run
Expand Down Expand Up @@ -1715,7 +1717,7 @@ func (woc *wfOperationCtx) checkTemplateTimeout(tmpl *wfv1.Template, node *wfv1.
deadline := node.StartedAt.Add(tmplTimeout)

if node.Phase == wfv1.NodePending && time.Now().After(deadline) {
return nil, fmt.Errorf("%s %s exceeded its deadline", node.Name, node.Type)
return nil, ErrTimeout
}
return &deadline, nil
}
Expand Down
2 changes: 2 additions & 0 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
case ErrDeadlineExceeded:
return node
case ErrParallelismReached:
case ErrTimeout:
return woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("child '%s' timedout", childNodeName))
default:
errMsg := fmt.Sprintf("child '%s' errored", childNodeName)
woc.log.Infof("Step group node %s deemed errored due to child %s error: %s", node.ID, childNodeName, err.Error())
Expand Down

0 comments on commit 20c518c

Please sign in to comment.