Skip to content

Commit

Permalink
Propagate executor errors back to controller. Add error column in `ar…
Browse files Browse the repository at this point in the history
…go get` (argoproj#522)
  • Loading branch information
jessesuen committed Dec 2, 2017
1 parent 32b5e99 commit 5286728
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 79 deletions.
2 changes: 0 additions & 2 deletions Dockerfile-argoexec
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,3 @@ RUN set -ex; \
docker -v

COPY dist/argoexec /bin/

ENTRYPOINT [ "/bin/argoexec" ]
6 changes: 3 additions & 3 deletions cmd/argo/commands/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func printWorkflow(wf *wfv1.Workflow) {
if ok {
fmt.Println()
w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0)
fmt.Fprintf(w, "STEP\tPODNAME\n")
fmt.Fprintf(w, "STEP\tPODNAME\tMESSAGE\n")
printNodeTree(w, wf, node, 0, " ", " ")
w.Flush()
}
Expand All @@ -86,9 +86,9 @@ func printWorkflow(wf *wfv1.Workflow) {
func printNodeTree(w *tabwriter.Writer, wf *wfv1.Workflow, node wfv1.NodeStatus, depth int, nodePrefix string, childPrefix string) {
nodeName := fmt.Sprintf("%s %s", jobStatusIconMap[node.Phase], node.Name)
if len(node.Children) == 0 && node.Phase != wfv1.NodeSkipped {
fmt.Fprintf(w, "%s%s\t%s\n", nodePrefix, nodeName, node.ID)
fmt.Fprintf(w, "%s%s\t%s\t%s\n", nodePrefix, nodeName, node.ID, node.Message)
} else {
fmt.Fprintf(w, "%s%s\t\n", nodePrefix, nodeName)
fmt.Fprintf(w, "%s%s\t%s\t\n", nodePrefix, nodeName, " ")
}

// If the node has children, the node is a workflow template and
Expand Down
3 changes: 3 additions & 0 deletions cmd/argoexec/commands/init.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package commands

import (
"github.com/argoproj/argo/workflow/common"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
Expand All @@ -20,10 +21,12 @@ func loadArtifacts(cmd *cobra.Command, args []string) {
// Download input artifacts
err := wfExecutor.LoadScriptSource()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
log.Fatalf("Error loading script: %+v", err)
}
err = wfExecutor.LoadArtifacts()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
log.Fatalf("Error downloading input artifacts: %+v", err)
}
}
6 changes: 6 additions & 0 deletions cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package commands
import (
"os"

"github.com/argoproj/argo/workflow/common"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
Expand All @@ -22,24 +23,29 @@ func waitContainer(cmd *cobra.Command, args []string) {
// Wait for main container to complete and kill sidecars
err := wfExecutor.Wait()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
log.Errorf("Error waiting on main container to be ready, %+v", err)
}
err = wfExecutor.SaveArtifacts()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
log.Fatalf("Error saving output artifacts, %+v", err)
}
// Saving output parameters
err = wfExecutor.SaveParameters()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
log.Fatalf("Error saving output parameters, %+v", err)
}
// Capture output script result
err = wfExecutor.CaptureScriptResult()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
log.Fatalf("Error capturing script output, %+v", err)
}
err = wfExecutor.AnnotateOutputs()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
log.Fatalf("Error annotating outputs, %+v", err)
}
os.Exit(0)
Expand Down
19 changes: 5 additions & 14 deletions workflow/artifacts/git/git.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package git

import (
"os/exec"
"strings"

wfv1 "github.com/argoproj/argo/api/workflow/v1"
"github.com/argoproj/argo/errors"
log "github.com/sirupsen/logrus"
"github.com/argoproj/argo/workflow/common"
)

// GitArtifactDriver is the artifact driver for a git repo
Expand All @@ -15,20 +12,14 @@ type GitArtifactDriver struct{}
// Load download artifacts from an git URL
func (g *GitArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error {
// Download the file to a local file path
cmd := exec.Command("git", "clone", inputArtifact.Git.Repo, path)
err := cmd.Run()
err := common.RunCommand("git", "clone", inputArtifact.Git.Repo, path)
if err != nil {
exErr := err.(*exec.ExitError)
log.Errorf("`%s %s` failed: %s", cmd.Path, strings.Join(cmd.Args, " "), exErr.Stderr)
return errors.InternalWrapError(err)
return err
}
if inputArtifact.Git.Revision != "" {
cmd = exec.Command("git", "-C", path, "checkout", inputArtifact.Git.Revision)
err := cmd.Run()
err := common.RunCommand("git", "-C", path, "checkout", inputArtifact.Git.Revision)
if err != nil {
exErr := err.(*exec.ExitError)
log.Errorf("`%s %s` failed: %s", cmd.Path, strings.Join(cmd.Args, " "), exErr.Stderr)
return errors.InternalWrapError(err)
return err
}
}
return nil
Expand Down
14 changes: 2 additions & 12 deletions workflow/artifacts/http/http.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package http

import (
"os/exec"
"strings"

wfv1 "github.com/argoproj/argo/api/workflow/v1"
"github.com/argoproj/argo/errors"
log "github.com/sirupsen/logrus"
"github.com/argoproj/argo/workflow/common"
)

// HTTPArtifactDriver is the artifact driver for a HTTP URL
Expand All @@ -15,14 +12,7 @@ type HTTPArtifactDriver struct{}
// Load download artifacts from an HTTP URL
func (h *HTTPArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error {
// Download the file to a local file path
cmd := exec.Command("curl", "-L", "-o", path, inputArtifact.HTTP.URL)
err := cmd.Run()
if err != nil {
exErr := err.(*exec.ExitError)
log.Errorf("`%s %s` failed: %s", cmd.Path, strings.Join(cmd.Args, " "), exErr.Stderr)
return errors.InternalWrapError(err)
}
return nil
return common.RunCommand("curl", "-sS", "-L", "-o", path, inputArtifact.HTTP.URL)
}

func (h *HTTPArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact) error {
Expand Down
3 changes: 3 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ const (

// AnnotationKeyNodeName is the pod metadata annotation key containing the workflow node name
AnnotationKeyNodeName = wfv1.CRDFullName + "/node-name"
// AnnotationKeyNodeMessage is the pod metadata annotation key the executor will use to
// communicate errors encountered by the executor during artifact load/save, etc...
AnnotationKeyNodeMessage = wfv1.CRDFullName + "/node-message"
// AnnotationKeyTemplate is the pod metadata annotation key containing the container template as JSON
AnnotationKeyTemplate = wfv1.CRDFullName + "/template"
// AnnotationKeyOutputs is the pod metadata annotation key containing the container outputs
Expand Down
13 changes: 13 additions & 0 deletions workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"os/exec"
"strconv"
"strings"

Expand Down Expand Up @@ -209,3 +210,15 @@ func Replace(fstTmpl *fasttemplate.Template, replaceMap map[string]string, allow
}
return replacedTmpl, nil
}

// RunCommand runs the named program with the given arguments and waits for it
// to complete. The command line is logged before execution and the command's
// standard output is discarded.
//
// It returns nil on success. On failure the error is logged and returned as an
// internal error: when the command ran but exited unsuccessfully, the message
// is the captured stderr (falling back to the exit error text if stderr was
// empty); any other failure, such as the executable not being found, is
// wrapped as-is.
func RunCommand(name string, arg ...string) error {
	cmd := exec.Command(name, arg...)
	log.Info(cmd.Args)
	// Output is used instead of Run because it captures stderr into
	// ExitError.Stderr when cmd.Stderr is not otherwise set.
	_, err := cmd.Output()
	if err == nil {
		return nil
	}
	cmdline := strings.Join(cmd.Args, " ")
	exErr, ok := err.(*exec.ExitError)
	if !ok {
		// Not an exit-status failure (e.g. the binary is missing or could not
		// be started). There is no stderr to report, and an unchecked type
		// assertion here would panic.
		log.Errorf("`%s` failed: %v", cmdline, err)
		return errors.InternalWrapError(err)
	}
	msg := string(exErr.Stderr)
	if strings.TrimSpace(msg) == "" {
		// Avoid surfacing an empty error message when the command failed
		// without writing anything to stderr.
		msg = err.Error()
	}
	log.Errorf("`%s` failed: %s", cmdline, msg)
	return errors.InternalError(msg)
}
23 changes: 14 additions & 9 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ func (wfc *WorkflowController) watchWorkflows(ctx context.Context) (cache.Contro
return controller, nil
}


func (wfc *WorkflowController) watchControllerConfigMap(ctx context.Context) (cache.Controller, error) {
source := wfc.newControllerConfigMapWatch()

Expand Down Expand Up @@ -272,7 +271,6 @@ func (wfc *WorkflowController) watchControllerConfigMap(ctx context.Context) (ca
return controller, nil
}


func (wfc *WorkflowController) newControllerConfigMapWatch() *cache.ListWatch {
c := wfc.clientset.Core().RESTClient()
resource := "configmaps"
Expand All @@ -283,7 +281,7 @@ func (wfc *WorkflowController) newControllerConfigMapWatch() *cache.ListWatch {
req := c.Get().
Namespace(namespace).
Resource(resource).
Param("fieldSelector", fmt.Sprintf("metadata.name=%s", name)).
Param("fieldSelector", fmt.Sprintf("metadata.name=%s", name)).
VersionedParams(&options, metav1.ParameterCodec)
return req.Do().Get()
}
Expand Down Expand Up @@ -478,6 +476,7 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, *bool, string) {
// Pod has a nice error message. Use that.
return wfv1.NodeFailed, &f, pod.Status.Message
}
annotatedMsg := pod.Annotations[common.AnnotationKeyNodeMessage]
// We only get one message to set for the overall node status.
// If multiple containers failed, in order of preference: init, main, wait, sidecars
for _, ctr := range pod.Status.InitContainerStatuses {
Expand All @@ -490,8 +489,11 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, *bool, string) {
continue
}
errMsg := fmt.Sprintf("failed to load artifacts")
if ctr.State.Terminated.Message != "" {
errMsg += ": " + ctr.State.Terminated.Message
for _, msg := range []string{annotatedMsg, ctr.State.Terminated.Message} {
if msg != "" {
errMsg += ": " + msg
break
}
}
// NOTE: we consider artifact load issues as Error instead of Failed
return wfv1.NodeError, &f, errMsg
Expand All @@ -507,11 +509,14 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, *bool, string) {
continue
}
if ctr.Name == common.WaitContainerName {
if ctr.State.Terminated.Message != "" {
failMessages[ctr.Name] = fmt.Sprintf("failed to save artifacts: %s", ctr.State.Terminated.Message)
} else {
failMessages[ctr.Name] = fmt.Sprintf("failed to save artifacts")
errMsg := fmt.Sprintf("failed to save artifacts")
for _, msg := range []string{annotatedMsg, ctr.State.Terminated.Message} {
if msg != "" {
errMsg += ": " + msg
break
}
}
failMessages[ctr.Name] = errMsg
} else {
if ctr.State.Terminated.Message != "" {
failMessages[ctr.Name] = ctr.State.Terminated.Message
Expand Down
63 changes: 24 additions & 39 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,9 @@ func (we *WorkflowExecutor) GetMainContainerID() (string, error) {
func archivePath(containerID string, sourcePath string, destPath string) error {
log.Infof("Archiving %s:%s to %s", containerID, sourcePath, destPath)
dockerCpCmd := fmt.Sprintf("docker cp -a %s:%s - | gzip > %s", containerID, sourcePath, destPath)
cmd := exec.Command("sh", "-c", dockerCpCmd)
log.Info(cmd.Args)
err := cmd.Run()
err := common.RunCommand("sh", "-c", dockerCpCmd)
if err != nil {
if exErr, ok := err.(*exec.ExitError); ok {
log.Errorf("`%s` stderr:\n%s", cmd.Args, string(exErr.Stderr))
}
return errors.InternalWrapError(err)
return err
}
log.Infof("Archiving completed")
return nil
Expand Down Expand Up @@ -326,25 +321,21 @@ func (we *WorkflowExecutor) AnnotateOutputs() error {
return nil
}
log.Infof("Annotating pod with output")
// NOTE: this should eventually be improved to do what kubectl/cmd/annotate.go
// is doing during a `kubectl annotate pod`, which uses Patch instead of Update.
// For now, the easiest thing to do is to Get + Update and hope the pod
// resource version does not change from underneath us.
podIf := we.ClientSet.CoreV1().Pods(we.Namespace)
pod, err := podIf.Get(we.PodName, metav1.GetOptions{})
if err != nil {
return errors.InternalWrapError(err)
}
outputBytes, err := json.Marshal(we.Template.Outputs)
if err != nil {
return errors.InternalWrapError(err)
}
pod.Annotations[common.AnnotationKeyOutputs] = string(outputBytes)
_, err = podIf.Update(pod)
if err != nil {
return errors.InternalWrapError(err)
}
return nil
return we.AddAnnotation(common.AnnotationKeyOutputs, string(outputBytes))
}

// AddAnnotation sets the annotation key=value on the workflow pod named by
// we.PodName, overwriting any existing value for that key. It returns the
// error, if any, from the underlying kubectl invocation.
func (we *WorkflowExecutor) AddAnnotation(key, value string) error {
	// TODO: switch to the k8s go-sdk to perform this logic.
	// See kubectl/cmd/annotate.go for a reference implementation.
	// For now we shell out to kubectl because it uses our desired
	// overwrite Patch strategy.
	annotation := fmt.Sprintf("%s=%s", key, value)
	return common.RunCommand("kubectl", "annotate", "--overwrite", "pods", we.PodName, annotation)
}

// isTarball returns whether or not the file is a tarball
Expand All @@ -364,11 +355,9 @@ func untar(tarPath string, destPath string) error {
if err != nil {
return errors.InternalWrapError(err)
}
cmd := exec.Command("tar", "-xf", tarPath, "-C", tmpDir)
log.Info(cmd.Args)
err = cmd.Run()
err = common.RunCommand("tar", "-xf", tarPath, "-C", tmpDir)
if err != nil {
return errors.InternalWrapError(err)
return err
}
// next, decide how we wish to rename the file/dir
// to the destination path.
Expand Down Expand Up @@ -429,14 +418,9 @@ func (we *WorkflowExecutor) Wait() error {
}
}

cmd := exec.Command("docker", "wait", mainContainerID)
log.Info(cmd.Args)
err := cmd.Run()
err := common.RunCommand("docker", "wait", mainContainerID)
if err != nil {
if exErr, ok := err.(*exec.ExitError); ok {
log.Errorf("`%s` stderr:\n%s", cmd.Args, string(exErr.Stderr))
}
return errors.InternalWrapError(err)
return err
}
log.Infof("Waiting completed")
err = we.killSidecars()
Expand All @@ -446,7 +430,9 @@ func (we *WorkflowExecutor) Wait() error {
return nil
}

const killGracePeriod = 20
// killGracePeriod is the time in seconds after sending SIGTERM before
// forcefully killing the sidecar with SIGKILL (value matches k8s)
const killGracePeriod = 30

func (we *WorkflowExecutor) killSidecars() error {
log.Infof("Killing sidecars")
Expand Down Expand Up @@ -475,14 +461,13 @@ func (we *WorkflowExecutor) killSidecars() error {
return nil
}
killArgs = append(killArgs, "--signal", "TERM")
cmd := exec.Command("docker", killArgs...)
log.Info(cmd.Args)
if err = cmd.Run(); err != nil {
return errors.InternalWrapError(err)
err = common.RunCommand("docker", killArgs...)
if err != nil {
return err
}

log.Infof("Waiting (%ds) for sidecars to terminate", killGracePeriod)
cmd = exec.Command("docker", waitArgs...)
cmd := exec.Command("docker", waitArgs...)
log.Info(cmd.Args)
if err := cmd.Start(); err != nil {
return errors.InternalWrapError(err)
Expand Down

0 comments on commit 5286728

Please sign in to comment.