Skip to content

Commit

Permalink
Fix issue where a failed step in a template with parallelism would no…
Browse files Browse the repository at this point in the history
…t complete (resolves argoproj#868)
  • Loading branch information
jessesuen committed May 26, 2018
1 parent 289000c commit 543e939
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 7 deletions.
9 changes: 9 additions & 0 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package main

import (
"context"
"flag"
"fmt"
"os"
"strconv"
"time"

wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -38,6 +40,7 @@ type rootFlags struct {
kubeConfig string // --kubeconfig
configMap string // --configmap
logLevel string // --loglevel
glogLevel int // --gloglevel
}

var (
Expand All @@ -50,6 +53,7 @@ func init() {
RootCmd.Flags().StringVar(&rootArgs.kubeConfig, "kubeconfig", "", "Kubernetes config (used when running outside of cluster)")
RootCmd.Flags().StringVar(&rootArgs.configMap, "configmap", common.DefaultConfigMapName(common.DefaultControllerDeploymentName), "Name of K8s configmap to retrieve workflow controller configuration")
RootCmd.Flags().StringVar(&rootArgs.logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error")
RootCmd.Flags().IntVar(&rootArgs.glogLevel, "gloglevel", 0, "Set the glog logging level")
}

// GetClientConfig return rest config, if path not specified, assume in cluster config
Expand All @@ -72,6 +76,11 @@ func Run(cmd *cobra.Command, args []string) {
stats.RegisterStackDumper()
stats.StartStatsTicker(5 * time.Minute)

// Set the glog level for the k8s go-client
_ = flag.CommandLine.Parse([]string{})
_ = flag.Lookup("logtostderr").Value.Set("true")
_ = flag.Lookup("v").Value.Set(strconv.Itoa(rootArgs.glogLevel))

config, err := GetClientConfig(rootArgs.kubeConfig)
if err != nil {
log.Fatalf("%+v", err)
Expand Down
38 changes: 38 additions & 0 deletions test/e2e/expectedfailures/parallelism-dag-failure.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# End-to-end test workflow kept under test/e2e/expectedfailures: a DAG template
# with a parallelism limit in which one fan-out task (D) exits non-zero.
# Task graph: A -> {B, C, D, E} -> F.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: parallelism-failed-dag-
spec:
entrypoint: parallelism-failed-dag
templates:
- name: parallelism-failed-dag
# Four tasks (B, C, D, E) become eligible after A, which is more than the limit below.
# NOTE(review): presumably this caps concurrently running tasks at 2 — confirm against controller semantics.
parallelism: 2
dag:
tasks:
- name: A
template: pass
- name: B
dependencies: [A]
template: pass
- name: C
dependencies: [A]
template: pass
# D is the only task that uses the failing template.
- name: D
dependencies: [A]
template: fail
- name: E
dependencies: [A]
template: pass
# F depends on all four fan-out tasks, including the failing D.
- name: F
dependencies: [B, C, D, E]
template: pass

# Container that exits immediately with status 0.
- name: pass
container:
image: alpine:3.7
command: [sh, -c, exit 0]

# Container that exits immediately with status 1.
- name: fail
container:
image: alpine:3.7
command: [sh, -c, exit 1]
28 changes: 28 additions & 0 deletions test/e2e/expectedfailures/parallelism-step-failure.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# End-to-end test workflow kept under test/e2e/expectedfailures: a steps template
# with a parallelism limit whose single step is expanded over three items, the
# second of which makes the container exit with status 1.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: parallelism-failed-step-
spec:
entrypoint: parallelism-failed-step
templates:
- name: parallelism-failed-step
# Three expanded steps versus a limit of 2.
# NOTE(review): presumably this caps concurrently running steps at 2 — confirm against controller semantics.
parallelism: 2
steps:
- - name: sleep
template: sleep
arguments:
parameters:
# Each withItems value is passed through as the container's exit code.
- name: exit-code
value: "{{item}}"
withItems:
- 0
- 1
- 0

# Despite its name, this template does not sleep; it only exits with the
# status given by the exit-code input parameter.
- name: sleep
inputs:
parameters:
- name: exit-code
container:
# NOTE(review): unpinned tag; consider pinning to a fixed version for reproducible test runs.
image: alpine:latest
command: [sh, -c, "exit {{inputs.parameters.exit-code}}"]
8 changes: 4 additions & 4 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ type wfOperationCtx struct {
// updated indicates whether or not the workflow object itself was updated
// and needs to be persisted back to kubernetes
updated bool
// log is an logrus logging context to corrolate logs with a workflow
// log is a logrus logging context to correlate logs with a workflow
log *log.Entry
// controller reference to workflow controller
controller *WorkflowController
// globalParms holds any parameters that are available to be referenced
// globalParams holds any parameters that are available to be referenced
// in the global scope (e.g. workflow.parameters.XXX).
globalParams map[string]string
// map of pods which need to be labeled with completed=true
Expand Down Expand Up @@ -94,7 +94,7 @@ func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOper

// operate is the main operator logic of a workflow. It evaluates the current state of the workflow,
// and its pods and decides how to proceed down the execution path.
// TODO: an error returned by this method should result in requeing the workflow to be retried at a
// TODO: an error returned by this method should result in requeuing the workflow to be retried at a
// later time
func (woc *wfOperationCtx) operate() {
defer woc.persistUpdates()
Expand Down Expand Up @@ -633,7 +633,7 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, string) {
}
annotatedMsg := pod.Annotations[common.AnnotationKeyNodeMessage]
// We only get one message to set for the overall node status.
// If mutiple containers failed, in order of preference:
// If multiple containers failed, in order of preference:
// init, main (annotated), main (exit code), wait, sidecars
for _, ctr := range pod.Status.InitContainerStatuses {
if ctr.State.Terminated == nil {
Expand Down
11 changes: 8 additions & 3 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
// Kick off all parallel steps in the group
for _, step := range stepGroup {
childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name)
woc.addChildNode(sgNodeName, childNodeName)

// Check the step's when clause to decide if it should execute
proceed, err := shouldExecute(step.When)
if err != nil {
woc.addChildNode(sgNodeName, childNodeName)
woc.markNodeError(childNodeName, err)
return woc.markNodeError(sgNodeName, err)
}
Expand All @@ -169,6 +169,7 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
skipReason := fmt.Sprintf("when '%s' evaluated false", step.When)
woc.log.Infof("Skipping %s: %s", childNodeName, skipReason)
woc.initializeNode(childNodeName, wfv1.NodeTypeSkipped, "", stepsCtx.boundaryID, wfv1.NodeSkipped, skipReason)
woc.addChildNode(sgNodeName, childNodeName)
}
continue
}
Expand All @@ -181,11 +182,15 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
default:
errMsg := fmt.Sprintf("child '%s' errored", childNode)
woc.log.Infof("Step group node %s deemed errored due to child %s error: %s", node, childNodeName, err.Error())
woc.addChildNode(sgNodeName, childNodeName)
return woc.markNodePhase(node.Name, wfv1.NodeError, errMsg)
}
}
if childNode != nil && childNode.Completed() && !childNode.Successful() {
break
if childNode != nil {
woc.addChildNode(sgNodeName, childNodeName)
if childNode.Completed() && !childNode.Successful() {
break
}
}
}

Expand Down

0 comments on commit 543e939

Please sign in to comment.