Skip to content

Commit

Permalink
feat(controller) Emissary executor. (argoproj#4925)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec committed Feb 24, 2021
1 parent 8af3595 commit ab36166
Show file tree
Hide file tree
Showing 26 changed files with 873 additions and 50 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ endif

# for local we have a faster target that prints to stdout, does not use json, and can cache because it has no coverage
.PHONY: test
test: server/static/files.go
test: server/static/files.go dist/argosay
env KUBECONFIG=/dev/null $(GOTEST) ./...

.PHONY: install
Expand Down Expand Up @@ -411,6 +411,9 @@ endif
test/e2e/images/argosay/v2/argosay: test/e2e/images/argosay/v2/main/argosay.go
cd test/e2e/images/argosay/v2 && GOOS=linux CGO_ENABLED=0 go build -ldflags '-w -s' -o argosay ./main

dist/argosay: test/e2e/images/argosay/v2/main/argosay.go
go build -ldflags '-w -s' -o dist/argosay ./test/e2e/images/argosay/v2/main

.PHONY: test-images
test-images:
$(call docker_pull,argoproj/argosay:v1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/argo/commands/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func waitWatchOrLog(ctx context.Context, serviceClient workflowpkg.WorkflowServi
if cliSubmitOpts.log {
for _, workflow := range workflowNames {
logWorkflow(ctx, serviceClient, namespace, workflow, "", &corev1.PodLogOptions{
Container: "main",
Container: common.MainContainerName,
Follow: true,
Previous: false,
})
Expand Down
218 changes: 218 additions & 0 deletions cmd/argoexec/commands/emissary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package commands

import (
"compress/gzip"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"os/signal"
"path/filepath"
"strconv"
"syscall"
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/archive"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/util/path"
)

var (
varRunArgo = "/var/run/argo"
containerName = os.Getenv(common.EnvVarContainerName)
includeScriptOutput = os.Getenv(common.EnvVarIncludeScriptOutput) == "true" // capture stdout/stderr
template = &wfv1.Template{}
logger = log.WithField("argo", true)
)

func NewEmissaryCommand() *cobra.Command {
return &cobra.Command{
Use: "emissary",
SilenceUsage: true, // this prevents confusing usage message being printed when we SIGTERM
RunE: func(cmd *cobra.Command, args []string) error {
exitCode := 64

defer func() {
err := ioutil.WriteFile(varRunArgo+"/ctr/"+containerName+"/exitcode", []byte(strconv.Itoa(exitCode)), 0600)
if err != nil {
logger.Error(fmt.Errorf("failed to write exit code: %w", err))
}
}()

// this also indicates we've started
if err := os.MkdirAll(varRunArgo+"/ctr/"+containerName, 0700); err != nil {
return fmt.Errorf("failed to create ctr directory: %w", err)
}

name, args := args[0], args[1:]

signals := make(chan os.Signal, 1)
defer close(signals)
signal.Notify(signals)
defer signal.Reset()
go func() {
for s := range signals {
if s != syscall.SIGCHLD {
_ = syscall.Kill(-os.Getpid(), s.(syscall.Signal))
}
}
}()

data, err := ioutil.ReadFile(varRunArgo + "/template")
if err != nil {
return fmt.Errorf("failed to read template: %w", err)
}

if err := json.Unmarshal(data, template); err != nil {
return fmt.Errorf("failed to unmarshal template: %w", err)
}

name, err = path.Search(name)
if err != nil {
return fmt.Errorf("failed to find name in PATH: %w", err)
}

command := exec.Command(name, args...)
command.Env = os.Environ()
command.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
command.Stdout = os.Stdout
command.Stderr = os.Stderr

// this may not be that important an optimisation, except for very long logs we don't want to capture
if includeScriptOutput {
logger.Info("capturing script output")
stdout, err := os.Create(varRunArgo + "/ctr/" + containerName + "/stdout")
if err != nil {
return fmt.Errorf("failed to open stdout: %w", err)
}
defer func() { _ = stdout.Close() }()
command.Stdout = io.MultiWriter(os.Stdout, stdout)

stderr, err := os.Create(varRunArgo + "/ctr/" + containerName + "/stderr")
if err != nil {
return fmt.Errorf("failed to open stderr: %w", err)
}
defer func() { _ = stderr.Close() }()
command.Stderr = io.MultiWriter(os.Stderr, stderr)
}

if err := command.Start(); err != nil {
return err
}

go func() {
for {
data, _ := ioutil.ReadFile(varRunArgo + "/ctr/" + containerName + "/signal")
_ = os.Remove(varRunArgo + "/ctr/" + containerName + "/signal")
s, _ := strconv.Atoi(string(data))
if s > 0 {
_ = syscall.Kill(command.Process.Pid, syscall.Signal(s))
}
time.Sleep(2 * time.Second)
}
}()

cmdErr := command.Wait()

if cmdErr == nil {
exitCode = 0
} else if exitError, ok := cmdErr.(*exec.ExitError); ok {
if exitError.ExitCode() >= 0 {
exitCode = exitError.ExitCode()
} else {
exitCode = 137 // SIGTERM
}
}

if containerName == common.MainContainerName {
for _, x := range template.Outputs.Parameters {
if x.ValueFrom != nil && x.ValueFrom.Path != "" {
if err := saveParameter(x.ValueFrom.Path); err != nil {
return err
}
}
}
for _, x := range template.Outputs.Artifacts {
if x.Path != "" {
if err := saveArtifact(x.Path); err != nil {
return err
}
}
}
} else {
logger.Info("not saving outputs - not main container")
}

return cmdErr // this is the error returned from cmd.Wait(), which maybe an exitError
},
}
}

func saveArtifact(srcPath string) error {
if common.FindOverlappingVolume(template, srcPath) != nil {
logger.Infof("no need to save artifact - on overlapping volume: %s", srcPath)
return nil
}
if _, err := os.Stat(srcPath); os.IsNotExist(err) { // might be optional, so we ignore
logger.WithError(err).Errorf("cannot save artifact %s", srcPath)
return nil
}
dstPath := varRunArgo + "/outputs/artifacts/" + srcPath + ".tgz"
logger.Infof("%s -> %s", srcPath, dstPath)
z := filepath.Dir(dstPath)
if err := os.MkdirAll(z, 0700); err != nil { // chmod rwx------
return fmt.Errorf("failed to create directory %s: %w", z, err)
}
dst, err := os.Create(dstPath)
if err != nil {
return fmt.Errorf("failed to create destination %s: %w", dstPath, err)
}
defer func() { _ = dst.Close() }()
if err = archive.TarGzToWriter(srcPath, gzip.DefaultCompression, dst); err != nil {
return fmt.Errorf("failed to tarball the output %s to %s: %w", srcPath, dstPath, err)
}
if err = dst.Close(); err != nil {
return fmt.Errorf("failed to close %s: %w", dstPath, err)
}
return nil
}

func saveParameter(srcPath string) error {
if common.FindOverlappingVolume(template, srcPath) != nil {
logger.Infof("no need to save parameter - on overlapping volume: %s", srcPath)
return nil
}
src, err := os.Open(srcPath)
if os.IsNotExist(err) { // might be optional, so we ignore
logger.WithError(err).Errorf("cannot save parameter %s", srcPath)
return nil
}
if err != nil {
return fmt.Errorf("failed to open %s: %w", srcPath, err)
}
defer func() { _ = src.Close() }()
dstPath := varRunArgo + "/outputs/parameters/" + srcPath
logger.Infof("%s -> %s", srcPath, dstPath)
z := filepath.Dir(dstPath)
if err := os.MkdirAll(z, 0700); err != nil { // chmod rwx------
return fmt.Errorf("failed to create directory %s: %w", z, err)
}
dst, err := os.Create(dstPath)
if err != nil {
return fmt.Errorf("failed to create %s: %w", srcPath, err)
}
defer func() { _ = dst.Close() }()
if _, err = io.Copy(dst, src); err != nil {
return fmt.Errorf("failed to copy %s to %s: %w", srcPath, dstPath, err)
}
if err = dst.Close(); err != nil {
return fmt.Errorf("failed to close %s: %w", dstPath, err)
}
return nil
}
119 changes: 119 additions & 0 deletions cmd/argoexec/commands/emissary_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package commands

import (
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestEmissary(t *testing.T) {
tmp, err := ioutil.TempDir("", "")
assert.NoError(t, err)

varRunArgo = tmp
includeScriptOutput = true

wd, err := os.Getwd()
assert.NoError(t, err)

x := filepath.Join(wd, "../../../dist/argosay")

err = ioutil.WriteFile(varRunArgo+"/template", []byte(`{}`), 0600)
assert.NoError(t, err)

t.Run("Exit0", func(t *testing.T) {
err := run(x, []string{"exit"})
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/exitcode")
assert.NoError(t, err)
assert.Equal(t, "0", string(data))
})
t.Run("Exit1", func(t *testing.T) {
err := run(x, []string{"exit", "1"})
assert.Equal(t, 1, err.(*exec.ExitError).ExitCode())
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/exitcode")
assert.NoError(t, err)
assert.Equal(t, "1", string(data))
})
t.Run("Stdout", func(t *testing.T) {
err := run(x, []string{"echo", "hello", "/dev/stdout"})
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/stdout")
assert.NoError(t, err)
assert.Equal(t, "hello", string(data))
})
t.Run("Stderr", func(t *testing.T) {
err := run(x, []string{"echo", "hello", "/dev/stderr"})
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/stderr")
assert.NoError(t, err)
assert.Equal(t, "hello", string(data))
})
t.Run("Signal", func(t *testing.T) {
for signal, message := range map[syscall.Signal]string{
syscall.SIGTERM: "terminated",
syscall.SIGKILL: "killed",
} {
err := ioutil.WriteFile(varRunArgo+"/ctr/main/signal", []byte(strconv.Itoa(int(signal))), 0600)
assert.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := run(x, []string{"sleep", "5s"})
assert.EqualError(t, err, "signal: "+message)
}()
time.Sleep(time.Second)
}
})
t.Run("Artifact", func(t *testing.T) {
err = ioutil.WriteFile(varRunArgo+"/template", []byte(`
{
"outputs": {
"artifacts": [
{"path": "/tmp/artifact"}
]
}
}
`), 0600)
assert.NoError(t, err)
err := run(x, []string{"echo", "hello", "/tmp/artifact"})
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/outputs/artifacts/tmp/artifact.tgz")
assert.NoError(t, err)
assert.NotEmpty(t, string(data)) // data is tgz format
})
t.Run("Parameter", func(t *testing.T) {
err = ioutil.WriteFile(varRunArgo+"/template", []byte(`
{
"outputs": {
"parameters": [
{
"valueFrom": {"path": "/tmp/parameter"}
}
]
}
}
`), 0600)
assert.NoError(t, err)
err := run(x, []string{"echo", "hello", "/tmp/parameter"})
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/outputs/parameters/tmp/parameter")
assert.NoError(t, err)
assert.Equal(t, "hello", string(data))
})
}

func run(name string, args []string) error {
cmd := NewEmissaryCommand()
containerName = "main"
return cmd.RunE(cmd, append([]string{name}, args...))
}
4 changes: 4 additions & 0 deletions cmd/argoexec/commands/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func loadArtifacts(ctx context.Context) error {
defer wfExecutor.HandleError(ctx)
defer stats.LogStats()

if err := wfExecutor.Init(); err != nil {
wfExecutor.AddError(err)
return err
}
// Download input artifacts
err := wfExecutor.StageFiles()
if err != nil {
Expand Down
Loading

0 comments on commit ab36166

Please sign in to comment.