diff --git a/cmd/workflow-controller/main.go b/cmd/workflow-controller/main.go index 70d11518606c..8733453489bd 100644 --- a/cmd/workflow-controller/main.go +++ b/cmd/workflow-controller/main.go @@ -2,8 +2,10 @@ package main import ( "context" + "flag" "fmt" "os" + "strconv" "time" wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned" @@ -38,6 +40,7 @@ type rootFlags struct { kubeConfig string // --kubeconfig configMap string // --configmap logLevel string // --loglevel + glogLevel int // --gloglevel } var ( @@ -50,6 +53,7 @@ func init() { RootCmd.Flags().StringVar(&rootArgs.kubeConfig, "kubeconfig", "", "Kubernetes config (used when running outside of cluster)") RootCmd.Flags().StringVar(&rootArgs.configMap, "configmap", common.DefaultConfigMapName(common.DefaultControllerDeploymentName), "Name of K8s configmap to retrieve workflow controller configuration") RootCmd.Flags().StringVar(&rootArgs.logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error") + RootCmd.Flags().IntVar(&rootArgs.glogLevel, "gloglevel", 0, "Set the glog logging level") } // GetClientConfig return rest config, if path not specified, assume in cluster config @@ -72,6 +76,11 @@ func Run(cmd *cobra.Command, args []string) { stats.RegisterStackDumper() stats.StartStatsTicker(5 * time.Minute) + // Set the glog level for the k8s go-client + _ = flag.CommandLine.Parse([]string{}) + _ = flag.Lookup("logtostderr").Value.Set("true") + _ = flag.Lookup("v").Value.Set(strconv.Itoa(rootArgs.glogLevel)) + config, err := GetClientConfig(rootArgs.kubeConfig) if err != nil { log.Fatalf("%+v", err) diff --git a/test/e2e/expectedfailures/parallelism-dag-failure.yaml b/test/e2e/expectedfailures/parallelism-dag-failure.yaml new file mode 100644 index 000000000000..478d3f1724c0 --- /dev/null +++ b/test/e2e/expectedfailures/parallelism-dag-failure.yaml @@ -0,0 +1,38 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: parallelism-failed-dag- 
+spec: + entrypoint: parallelism-failed-dag + templates: + - name: parallelism-failed-dag + parallelism: 2 + dag: + tasks: + - name: A + template: pass + - name: B + dependencies: [A] + template: pass + - name: C + dependencies: [A] + template: pass + - name: D + dependencies: [A] + template: fail + - name: E + dependencies: [A] + template: pass + - name: F + dependencies: [B, C, D, E] + template: pass + + - name: pass + container: + image: alpine:3.7 + command: [sh, -c, exit 0] + + - name: fail + container: + image: alpine:3.7 + command: [sh, -c, exit 1] diff --git a/test/e2e/expectedfailures/parallelism-step-failure.yaml b/test/e2e/expectedfailures/parallelism-step-failure.yaml new file mode 100644 index 000000000000..37682259d852 --- /dev/null +++ b/test/e2e/expectedfailures/parallelism-step-failure.yaml @@ -0,0 +1,28 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: parallelism-failed-step- +spec: + entrypoint: parallelism-failed-step + templates: + - name: parallelism-failed-step + parallelism: 2 + steps: + - - name: sleep + template: sleep + arguments: + parameters: + - name: exit-code + value: "{{item}}" + withItems: + - 0 + - 1 + - 0 + + - name: sleep + inputs: + parameters: + - name: exit-code + container: + image: alpine:latest + command: [sh, -c, "exit {{inputs.parameters.exit-code}}"] diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index d0e064c2b6b1..f1282edbbf08 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -30,11 +30,11 @@ type wfOperationCtx struct { // updated indicates whether or not the workflow object itself was updated // and needs to be persisted back to kubernetes updated bool - // log is an logrus logging context to corrolate logs with a workflow + // log is a logrus logging context to correlate logs with a workflow log *log.Entry // controller reference to workflow controller controller *WorkflowController - // globalParms holds any 
parameters that are available to be referenced + // globalParams holds any parameters that are available to be referenced // in the global scope (e.g. workflow.parameters.XXX). globalParams map[string]string // map of pods which need to be labeled with completed=true @@ -94,7 +94,7 @@ func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOper // operate is the main operator logic of a workflow. It evaluates the current state of the workflow, // and its pods and decides how to proceed down the execution path. -// TODO: an error returned by this method should result in requeing the workflow to be retried at a +// TODO: an error returned by this method should result in requeuing the workflow to be retried at a // later time func (woc *wfOperationCtx) operate() { defer woc.persistUpdates() @@ -633,7 +633,7 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, string) { } annotatedMsg := pod.Annotations[common.AnnotationKeyNodeMessage] // We only get one message to set for the overall node status. 
- // If mutiple containers failed, in order of preference: + // If multiple containers failed, in order of preference: // init, main (annotated), main (exit code), wait, sidecars for _, ctr := range pod.Status.InitContainerStatuses { if ctr.State.Terminated == nil { diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go index 4076ed800919..8138e9cad8ce 100644 --- a/workflow/controller/steps.go +++ b/workflow/controller/steps.go @@ -156,11 +156,11 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod // Kick off all parallel steps in the group for _, step := range stepGroup { childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name) - woc.addChildNode(sgNodeName, childNodeName) // Check the step's when clause to decide if it should execute proceed, err := shouldExecute(step.When) if err != nil { + woc.addChildNode(sgNodeName, childNodeName) woc.markNodeError(childNodeName, err) return woc.markNodeError(sgNodeName, err) } @@ -169,6 +169,7 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod skipReason := fmt.Sprintf("when '%s' evaluated false", step.When) woc.log.Infof("Skipping %s: %s", childNodeName, skipReason) woc.initializeNode(childNodeName, wfv1.NodeTypeSkipped, "", stepsCtx.boundaryID, wfv1.NodeSkipped, skipReason) + woc.addChildNode(sgNodeName, childNodeName) } continue } @@ -181,11 +182,15 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod default: errMsg := fmt.Sprintf("child '%s' errored", childNode) woc.log.Infof("Step group node %s deemed errored due to child %s error: %s", node, childNodeName, err.Error()) + woc.addChildNode(sgNodeName, childNodeName) return woc.markNodePhase(node.Name, wfv1.NodeError, errMsg) } } - if childNode != nil && childNode.Completed() && !childNode.Successful() { - break + if childNode != nil { + woc.addChildNode(sgNodeName, childNodeName) + if childNode.Completed() && !childNode.Successful() { + break + } } }