Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fork sub-process. Fixes #8680 #8906

Merged
merged 2 commits into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,11 @@ lint: server/static/files.go $(GOPATH)/bin/golangci-lint

# for local we have a faster target that prints to stdout, does not use json, and can cache because it has no coverage
.PHONY: test
test: server/static/files.go dist/argosay
test: server/static/files.go
go build ./...
env KUBECONFIG=/dev/null $(GOTEST) ./...
# marker file: based on its modification time, we know how long ago this target was run
@mkdir -p dist
touch dist/test

.PHONY: install
Expand Down
51 changes: 22 additions & 29 deletions cmd/argoexec/commands/emissary.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"syscall"
"time"

"github.com/argoproj/argo-workflows/v3/util/errors"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/client-go/util/retry"
Expand Down Expand Up @@ -60,18 +62,6 @@ func NewEmissaryCommand() *cobra.Command {

name, args := args[0], args[1:]

signals := make(chan os.Signal, 1)
defer close(signals)
signal.Notify(signals)
defer signal.Reset()
go func() {
for s := range signals {
if !osspecific.IsSIGCHLD(s) {
_ = osspecific.Kill(-os.Getpid(), s.(syscall.Signal))
}
}
}()

data, err := ioutil.ReadFile(varRunArgo + "/template")
if err != nil {
return fmt.Errorf("failed to read template: %w", err)
Expand Down Expand Up @@ -127,25 +117,28 @@ func NewEmissaryCommand() *cobra.Command {
return fmt.Errorf("failed to get retry strategy: %w", err)
}

var command *exec.Cmd
var stdout *os.File
var combined *os.File
cmdErr := retry.OnError(backoff, func(error) bool { return true }, func() error {
if stdout != nil {
stdout.Close()
}
if combined != nil {
combined.Close()
}
command, stdout, combined, err = createCommand(name, args, template)
command, stdout, combined, err := createCommand(name, args, template)
if err != nil {
return fmt.Errorf("failed to create command: %w", err)
}

defer stdout.Close()
defer combined.Close()
signals := make(chan os.Signal, 1)
defer close(signals)
signal.Notify(signals)
defer signal.Reset()
if err := command.Start(); err != nil {
return err
}

go func() {
for s := range signals {
if !osspecific.IsSIGCHLD(s) {
_ = osspecific.Kill(command.Process.Pid, s.(syscall.Signal))
}
}
}()
pid := command.Process.Pid
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
Expand All @@ -158,16 +151,16 @@ func NewEmissaryCommand() *cobra.Command {
_ = os.Remove(varRunArgo + "/ctr/" + containerName + "/signal")
s, _ := strconv.Atoi(string(data))
if s > 0 {
_ = osspecific.Kill(command.Process.Pid, syscall.Signal(s))
_ = osspecific.Kill(pid, syscall.Signal(s))
}
time.Sleep(2 * time.Second)
}
}
}()
return command.Wait()
return osspecific.Wait(command.Process)

})
defer stdout.Close()
defer combined.Close()
logger.WithError(err).Info("sub-process exited")

if _, ok := os.LookupEnv("ARGO_DEBUG_PAUSE_AFTER"); ok {
for {
Expand All @@ -184,7 +177,7 @@ func NewEmissaryCommand() *cobra.Command {

if cmdErr == nil {
exitCode = 0
} else if exitError, ok := cmdErr.(*exec.ExitError); ok {
} else if exitError, ok := cmdErr.(errors.Exited); ok {
if exitError.ExitCode() >= 0 {
exitCode = exitError.ExitCode()
} else {
Expand Down
44 changes: 19 additions & 25 deletions cmd/argoexec/commands/emissary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package commands
import (
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"syscall"
"testing"
"time"

"github.com/argoproj/argo-workflows/v3/util/errors"

"github.com/stretchr/testify/assert"
)
Expand All @@ -20,45 +19,40 @@ func TestEmissary(t *testing.T) {
varRunArgo = tmp
includeScriptOutput = true

wd, err := os.Getwd()
assert.NoError(t, err)

x := filepath.Join(wd, "../../../dist/argosay")

err = ioutil.WriteFile(varRunArgo+"/template", []byte(`{}`), 0o600)
err := ioutil.WriteFile(varRunArgo+"/template", []byte(`{}`), 0o600)
assert.NoError(t, err)

t.Run("Exit0", func(t *testing.T) {
err := run(x, []string{"exit"})
err := run("exit")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/exitcode")
assert.NoError(t, err)
assert.Equal(t, "0", string(data))
})

t.Run("Exit1", func(t *testing.T) {
err := run(x, []string{"exit", "1"})
assert.Equal(t, 1, err.(*exec.ExitError).ExitCode())
err := run("exit 1")
assert.Equal(t, 1, err.(errors.Exited).ExitCode())
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/exitcode")
assert.NoError(t, err)
assert.Equal(t, "1", string(data))
})
t.Run("Stdout", func(t *testing.T) {
err := run(x, []string{"echo", "hello", "/dev/stdout"})
err := run("echo hello")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/stdout")
assert.NoError(t, err)
assert.Contains(t, string(data), "hello")
})
t.Run("Comined", func(t *testing.T) {
err := run(x, []string{"echo", "hello", "/dev/stderr"})
err := run("echo hello > /dev/stderr")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/combined")
assert.NoError(t, err)
assert.Contains(t, string(data), "hello")
})
t.Run("Signal", func(t *testing.T) {
for signal, message := range map[syscall.Signal]string{
for signal := range map[syscall.Signal]string{
syscall.SIGTERM: "terminated",
syscall.SIGKILL: "killed",
} {
Expand All @@ -68,10 +62,10 @@ func TestEmissary(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := run(x, []string{"sleep", "5s"})
assert.EqualError(t, err, "signal: "+message)
err := run("sleep 3")
assert.NoError(t, err)
}()
time.Sleep(time.Second)
wg.Wait()
}
})
t.Run("Artifact", func(t *testing.T) {
Expand All @@ -85,7 +79,7 @@ func TestEmissary(t *testing.T) {
}
`), 0o600)
assert.NoError(t, err)
err := run(x, []string{"echo", "hello", "/tmp/artifact"})
err := run("echo hello > /tmp/artifact")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/outputs/artifacts/tmp/artifact.tgz")
assert.NoError(t, err)
Expand All @@ -102,7 +96,7 @@ func TestEmissary(t *testing.T) {
}
`), 0o600)
assert.NoError(t, err)
err := run(x, []string{"echo", "hello", "/tmp/artifact"})
err := run("echo hello > /tmp/artifact")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/outputs/artifacts/tmp/artifact.tgz")
assert.NoError(t, err)
Expand All @@ -121,7 +115,7 @@ func TestEmissary(t *testing.T) {
}
`), 0o600)
assert.NoError(t, err)
err := run(x, []string{"echo", "hello", "/tmp/parameter"})
err := run("echo hello > /tmp/parameter")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/outputs/parameters/tmp/parameter")
assert.NoError(t, err)
Expand Down Expand Up @@ -151,7 +145,7 @@ func TestEmissary(t *testing.T) {
`), 0o600)
assert.NoError(t, err)
_ = os.Remove("test.txt")
err = run(x, []string{"sh", "./test/containerSetRetryTest.sh", "/tmp/artifact"})
err = run("sh ./test/containerSetRetryTest.sh /tmp/artifact")
assert.Error(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/outputs/artifacts/tmp/artifact.tgz")
assert.NoError(t, err)
Expand Down Expand Up @@ -181,16 +175,16 @@ func TestEmissary(t *testing.T) {
`), 0o600)
assert.NoError(t, err)
_ = os.Remove("test.txt")
err = run(x, []string{"sh", "./test/containerSetRetryTest.sh", "/tmp/artifact"})
err = run("sh ./test/containerSetRetryTest.sh /tmp/artifact")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/outputs/artifacts/tmp/artifact.tgz")
assert.NoError(t, err)
assert.NotEmpty(t, string(data)) // data is tgz format
})
}

func run(name string, args []string) error {
func run(script string) error {
cmd := NewEmissaryCommand()
containerName = "main"
return cmd.RunE(cmd, append([]string{name}, args...))
return cmd.RunE(cmd, append([]string{"sh", "-c"}, script))
}
5 changes: 3 additions & 2 deletions cmd/argoexec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package main

import (
"os"
"os/exec"

"github.com/argoproj/argo-workflows/v3/util/errors"

// load authentication plugin for obtaining credentials from cloud providers.
_ "k8s.io/client-go/plugin/pkg/client/auth"
Expand All @@ -14,7 +15,7 @@ import (
func main() {
err := commands.NewRootCommand().Execute()
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
if exitError, ok := err.(errors.Exited); ok {
if exitError.ExitCode() >= 0 {
os.Exit(exitError.ExitCode())
} else {
Expand Down
8 changes: 8 additions & 0 deletions test/e2e/signals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ func (s *SignalsSuite) TestInjectedSidecarKillAnnotation() {
WaitForWorkflow(fixtures.ToBeSucceeded, kill2xDuration)
}

// TestSubProcess submits the sub-process workflow fixture and then waits on
// it with WaitForWorkflow's default arguments. No further assertions are made
// on the result.
func (s *SignalsSuite) TestSubProcess() {
	// Arrange: load the workflow definition from the testdata directory.
	given := s.Given().Workflow("@testdata/subprocess-workflow.yaml")

	// Act: submit it to the cluster.
	submitted := given.When().SubmitWorkflow()

	// Wait using the fixture's default condition and timeout.
	submitted.WaitForWorkflow()
}

func TestSignalsSuite(t *testing.T) {
suite.Run(t, new(SignalsSuite))
}
15 changes: 15 additions & 0 deletions test/e2e/testdata/subprocess-workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: subprocess-
spec:
entrypoint: main
templates:
- name: main
container:
image: argoproj/argosay:v1
command: [ sh, -c ]
args:
- |
sleep 60 &
ps -aef
24 changes: 24 additions & 0 deletions util/errors/exec_err.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package errors

import "fmt"

// Exited is implemented by errors that carry the exit code of a finished
// process. Callers recover the code with a type assertion on the error.
type Exited interface {
	// ExitCode returns the exit code carried by the error.
	ExitCode() int
}

// NewExitErr converts a process exit code into an error.
//
// It returns nil only for exit code 0. Every other code, including negative
// ones, yields an error that implements Exited. Negative codes must not be
// dropped: the standard library uses -1 to mean "did not exit normally"
// (for example, killed by a signal), and callers distinguish that case by
// checking ExitCode() >= 0. Treating it as nil would report a killed process
// as a success.
func NewExitErr(exitCode int) error {
	if exitCode == 0 {
		return nil
	}
	return execErr(exitCode)
}

// execErr is a non-zero exit code represented as an error value.
type execErr int

// ExitCode returns the exit code as a plain int.
func (e execErr) ExitCode() int {
	return int(e)
}

// Error formats the exit code as "exit status N".
func (e execErr) Error() string {
	return fmt.Sprintf("exit status %d", int(e))
}
31 changes: 31 additions & 0 deletions workflow/executor/os-specific/signal_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package os_specific
import (
"os"
"syscall"
"time"

"github.com/argoproj/argo-workflows/v3/util/errors"
)

func IsSIGCHLD(s os.Signal) bool { return s == syscall.SIGCHLD }
Expand All @@ -18,3 +21,31 @@ func Kill(pid int, s syscall.Signal) error {
func Setpgid(a *syscall.SysProcAttr) {
a.Setpgid = true
}

// Wait releases process and then polls, reaping exited children, until the
// child with the same PID has been reaped.
//
// It returns:
//   - the error from Process.Release or wait4 if either fails;
//   - nil if the process exited with status 0;
//   - otherwise an error built by errors.NewExitErr from the exit status, or
//     from 128+signal if the process was terminated by a signal.
func Wait(process *os.Process) error {
	// We must copy the behaviour of Kubernetes in how we handle sub-processes.
	// Kubernetes only waits on PID 1, not on any sub-process that process might fork.
	// The only way for those forked processes to run in the background is to background the
	// sub-process by calling Process.Release.
	// Background processes always become zombies when they exit, so we must reap them.
	// Because the process has been released, we cannot call Process.Wait to get its exit code.
	// Instead, we obtain the exit code when we reap it.
	pid := process.Pid
	if err := process.Release(); err != nil {
		return err
	}

	for {
		var s syscall.WaitStatus
		// pid -1 asks for any child; WNOHANG makes the call return
		// immediately (with wpid == 0) when no child has changed state.
		wpid, err := syscall.Wait4(-1, &s, syscall.WNOHANG, nil)
		switch {
		case err == syscall.EINTR:
			// Interrupted by a signal: nothing was reaped, so try again.
			continue
		case err != nil:
			return err
		case wpid == pid:
			if s.Signaled() {
				// ExitStatus is -1 for a signal-terminated process, which
				// would otherwise be reported as a success. Use the
				// conventional 128+signal code (e.g. 137 for SIGKILL).
				return errors.NewExitErr(128 + int(s.Signal()))
			}
			return errors.NewExitErr(s.ExitStatus())
		case wpid > 0:
			// Reaped some other child; more zombies may be pending, so
			// poll again straight away rather than sleeping.
			continue
		}
		time.Sleep(time.Second)
	}
}
20 changes: 0 additions & 20 deletions workflow/executor/os-specific/signal_linux.go

This file was deleted.

1 change: 1 addition & 0 deletions workflow/executor/os-specific/signal_linux.go
Loading