Skip to content

Commit

Permalink
fix(executor): Kill injected sidecars. Fixes #5337 (#5345)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Mar 12, 2021
1 parent 1f7cf1e commit fcb0989
Show file tree
Hide file tree
Showing 20 changed files with 232 additions and 95 deletions.
44 changes: 44 additions & 0 deletions docs/sidecar-injection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Sidecar Injection

Automatic (i.e. mutating webhook based) sidecar injection systems, including service meshes such as Anthos and Istio
Proxy, create a unique problem for Kubernetes workloads that run to completion.

Because sidecars are injected outside of the view of the workflow controller, the controller has no awareness of them.
It has no opportunity to rewrite the container's command (when using the Emissary Executor), and because the sidecar's
process will run as PID 1, which is protected, it can be impossible for the wait container to terminate the sidecar.

You will minimize problems by not using Istio with Argo Workflows.

See [#1282](https://github.com/argoproj/argo-workflows/issues/1282).

## How We Kill Sidecars

Kubernetes does not provide a way to kill a single container. You can delete a pod, but this kills all containers, and loses all information
and logs of that pod.

Instead, we try to mimic the Kubernetes termination behaviour, which is:

1. SIGTERM PID 1
1. Wait for the pod's `terminationGracePeriodSeconds` (30s by default).
1. SIGKILL PID 1

The following are not supported:

* `preStop`
* `STOPSIGNAL`

### Support Matrix

Key:

* Any - we can kill any image
* Shell - we can only kill images with `/bin/sh` installed on them (e.g. Debian)
* None - we cannot kill these images

| Executor | Sidecar | Injected Sidecar |
|---|---|---|
| `docker` | Any | Any |
| `emissary` | Any | None |
| `k8sapi` | Shell | Shell |
| `kubelet` | Shell | Shell |
| `pns` | Any | Any |
6 changes: 6 additions & 0 deletions manifests/quick-start-minimal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,12 @@ rules:
verbs:
- get
- watch
- apiGroups:
- ""
resources:
- pods/exec
verbs:
- create
- apiGroups:
- argoproj.io
resources:
Expand Down
6 changes: 6 additions & 0 deletions manifests/quick-start-mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,12 @@ rules:
verbs:
- get
- watch
- apiGroups:
- ""
resources:
- pods/exec
verbs:
- create
- apiGroups:
- argoproj.io
resources:
Expand Down
6 changes: 6 additions & 0 deletions manifests/quick-start-postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,12 @@ rules:
verbs:
- get
- watch
- apiGroups:
- ""
resources:
- pods/exec
verbs:
- create
- apiGroups:
- argoproj.io
resources:
Expand Down
7 changes: 7 additions & 0 deletions manifests/quick-start/base/workflow-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ rules:
verbs:
- get
- watch
# Only needed if you are running the `k8sapi` executor.
- apiGroups:
- ""
resources:
- pods/exec
verbs:
- create
# This allows one workflow to create another.
# This is only needed for resource templates.
- apiGroups:
Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ nav:
- configure-artifact-repository.md
- workflow-controller-configmap.md
- workflow-executors.md
- sidecar-injection.md
- default-workflow-specs.md
- offloading-large-workflows.md
- workflow-archive.md
Expand Down
8 changes: 0 additions & 8 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,14 +636,6 @@ func (tmpl *Template) HasPodSpecPatch() bool {
return tmpl.PodSpecPatch != ""
}

// GetSidecarNames returns the Name of every entry in tmpl.Sidecars, in the
// order they are declared. The result is nil when the template has no sidecars.
func (tmpl *Template) GetSidecarNames() []string {
	var names []string
	for i := range tmpl.Sidecars {
		names = append(names, tmpl.Sidecars[i].Name)
	}
	return names
}

type Artifacts []Artifact

func (a Artifacts) GetArtifactByName(name string) *Artifact {
Expand Down
9 changes: 0 additions & 9 deletions pkg/apis/workflow/v1alpha1/workflow_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,15 +617,6 @@ func TestWorkflow_GetSemaphoreKeys(t *testing.T) {
assert.Contains(keys, "test/template1")
}

func TestTemplate_GetSidecarNames(t *testing.T) {
m := &Template{
Sidecars: []UserContainer{
{Container: corev1.Container{Name: "sidecar-0"}},
},
}
assert.ElementsMatch(t, []string{"sidecar-0"}, m.GetSidecarNames())
}

func TestTemplate_IsMainContainerNamed(t *testing.T) {
t.Run("Default", func(t *testing.T) {
x := &Template{}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ func (s *CLISuite) TestWorkflowRetry() {
WaitForWorkflow(fixtures.ToStart).
WaitForWorkflow(fixtures.Condition(func(wf *wfv1.Workflow) (bool, string) {
return wf.Status.AnyActiveSuspendNode(), "suspended node"
})).
}), time.Minute).
RunCli([]string{"terminate", "retry-test"}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err) {
assert.Contains(t, output, "workflow retry-test terminated")
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *FunctionalSuite) TestArchiveStrategies() {
Workflow(`@testdata/archive-strategies.yaml`).
When().
SubmitWorkflow().
WaitForWorkflow().
WaitForWorkflow(time.Minute).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
Expand Down
46 changes: 9 additions & 37 deletions test/e2e/signals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
)

const kill2xDuration = 70 * time.Second
const kill2xDuration = 1 * time.Minute

// Tests the use of signals to kill containers.
// argoproj/argosay:v2 does not contain sh, so you must use argoproj/argosay:v1.
Expand Down Expand Up @@ -94,7 +94,7 @@ func (s *SignalsSuite) TestDoNotCreatePodsUnderStopBehavior() {
assert.NoError(t, err)
assert.Regexp(t, "workflow stop-terminate-.* stopped", output)
}).
WaitForWorkflow(1 * time.Minute).
WaitForWorkflow(kill2xDuration).
Then().
ExpectWorkflow(func(t *testing.T, m *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
Expand All @@ -107,47 +107,19 @@ func (s *SignalsSuite) TestDoNotCreatePodsUnderStopBehavior() {
})
}

func (s *SignalsSuite) TestPropagateMaxDuration() {
s.T().Skip("too hard to get working")
func (s *SignalsSuite) TestSidecars() {
s.Given().
Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: retry-backoff-2
spec:
entrypoint: retry-backoff
templates:
- name: retry-backoff
retryStrategy:
limit: 10
backoff:
duration: "1"
factor: 1
maxDuration: "10"
container:
image: argoproj/argosay:v1
command: [sh, -c]
args: ["sleep $(( {{retries}} * 40 )); exit 1"]
`).
Workflow("@testdata/sidecar-workflow.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(kill2xDuration).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Contains(t, []wfv1.WorkflowPhase{wfv1.WorkflowFailed, wfv1.WorkflowError}, status.Phase)
assert.Len(t, status.Nodes, 3)
node := status.Nodes.FindByDisplayName("retry-backoff-2(1)")
if assert.NotNil(t, node) {
assert.Equal(t, wfv1.NodeFailed, node.Phase)
}
})
WaitForWorkflow(fixtures.ToBeSucceeded, kill2xDuration)
}

func (s *SignalsSuite) TestSidecars() {
// make sure Istio/Anthos and other sidecar injectors will work
func (s *SignalsSuite) TestInjectedSidecar() {
s.Need(fixtures.None(fixtures.Emissary))
s.Given().
Workflow("@testdata/sidecar-workflow.yaml").
Workflow("@testdata/sidecar-injected-workflow.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded, kill2xDuration)
Expand Down
22 changes: 22 additions & 0 deletions test/e2e/testdata/sidecar-injected-workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: sidecar-injected-
spec:
entrypoint: main
podSpecPatch: |
terminationGracePeriodSeconds: 3
containers:
- name: wait
- name: main
- name: sidecar
image: argoproj/argosay:v1
command:
- sh
- -c
args:
- "sleep 999"
templates:
- name: main
container:
image: argoproj/argosay:v1
12 changes: 3 additions & 9 deletions test/e2e/testdata/sidecar-workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,8 @@ spec:
- name: main
container:
image: argoproj/argosay:v1
command: [ sleep ]
args: [ "5" ]
sidecars:
- name: sidecar-0
- name: sidecar
image: argoproj/argosay:v1
command: [ sleep ]
args: [ "999" ]
- name: sidecar-1
image: argoproj/argosay:v1
command: [ sleep ]
args: [ "999" ]
command: [ sh, -c ]
args: [ "sleep 999" ]
24 changes: 20 additions & 4 deletions workflow/executor/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ func (d *DockerExecutor) GetExitCode(ctx context.Context, containerName string)
return exitCode, nil
}

func (d *DockerExecutor) Wait(ctx context.Context, containerNames, sidecarNames []string) error {
err := d.syncContainerIDs(ctx, append(containerNames, sidecarNames...))
func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) error {
err := d.syncContainerIDs(ctx, containerNames)
if err != nil {
return err
}
Expand Down Expand Up @@ -224,6 +224,9 @@ func (d *DockerExecutor) syncContainerIDs(ctx context.Context, containerNames []
}
status := strings.SplitN(parts[0], " ", 2)[0] // Created,Exited,Up,
containerName := parts[1]
if containerName == "POD" {
continue
}
containerID := parts[2]
createdAt, err := time.Parse("2006-01-02 15:04:05 -0700 MST", parts[3])
if err != nil {
Expand All @@ -244,7 +247,9 @@ func (d *DockerExecutor) syncContainerIDs(ctx context.Context, containerNames []
containerStatus[containerName] = status
log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, containerID, createdAt, status)
}
if d.haveContainers(containerNames) {
// sidecars start after the main containers, so we can't just exit once we know about all the main containers,
// we need a bit more time
if d.haveContainers(containerNames) && time.Since(started) > 3*time.Second {
return nil
}
}
Expand Down Expand Up @@ -285,6 +290,9 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerNames []string, term
// We therefore ignore any error. docker wait that follows will re-raise any other error with the container.
_, err = common.RunCommand("docker", killArgs...)
if err != nil {
if strings.Contains(err.Error(), "is not running") {
return nil
}
log.Warningf("Ignored error from 'docker kill --signal TERM': %s", err)
}
waitArgs := append([]string{"wait"}, containerIDs...)
Expand All @@ -305,7 +313,7 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerNames []string, term
case err = <-waitCh:
// waitCmd completed
case <-time.After(terminationGracePeriodDuration):
log.Infof("Timed out (%ds) for containers to terminate gracefully. Killing forcefully", terminationGracePeriodDuration)
log.Infof("Timed out (%v) for containers to terminate gracefully. Killing forcefully", terminationGracePeriodDuration)
forceKillArgs := append([]string{"kill", "--signal", "KILL"}, containerIDs...)
forceKillCmd := exec.Command("docker", forceKillArgs...)
log.Info(forceKillCmd.Args)
Expand All @@ -321,6 +329,14 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerNames []string, term
return nil
}

// ListContainerNames returns every key currently held in d.containers
// (presumably container name -> container ID; confirm against the field
// declaration). The order of the result follows the iteration order of
// d.containers. The result is nil when d.containers is empty.
//
// ctx is accepted but not used, and the returned error is always nil.
func (d *DockerExecutor) ListContainerNames(ctx context.Context) ([]string, error) {
	var names []string
	for name := range d.containers {
		names = append(names, name)
	}
	return names, nil
}

func (d *DockerExecutor) getContainerIDs(containerNames []string) ([]string, error) {
var containerIDs []string
for _, n := range containerNames {
Expand Down
18 changes: 15 additions & 3 deletions workflow/executor/emissary/emissary.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (e emissary) GetExitCode(_ context.Context, containerName string) (string,
return strconv.Itoa(exitCode), err
}

func (e emissary) Wait(ctx context.Context, containerNames, sidecarNames []string) error {
func (e emissary) Wait(ctx context.Context, containerNames []string) error {
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -152,7 +152,7 @@ func (e emissary) Kill(ctx context.Context, containerNames []string, termination
}
ctx, cancel := context.WithTimeout(ctx, terminationGracePeriodDuration)
defer cancel()
err := e.Wait(ctx, containerNames, nil)
err := e.Wait(ctx, containerNames)
if err != context.Canceled {
return err
}
Expand All @@ -161,5 +161,17 @@ func (e emissary) Kill(ctx context.Context, containerNames []string, termination
return err
}
}
return e.Wait(ctx, containerNames, nil)
return e.Wait(ctx, containerNames)
}

// ListContainerNames reads the directory /var/run/argo/ctr and returns the
// name of each entry found there (presumably one entry per container; confirm
// against the code that populates that directory).
//
// It returns a nil slice and the underlying error if the directory cannot be
// read. ctx is accepted but not used.
func (e emissary) ListContainerNames(ctx context.Context) ([]string, error) {
	entries, err := ioutil.ReadDir("/var/run/argo/ctr")
	if err != nil {
		return nil, err
	}
	var names []string
	for i := range entries {
		names = append(names, entries[i].Name())
	}
	return names, nil
}
Loading

0 comments on commit fcb0989

Please sign in to comment.