Skip to content

Commit

Permalink
Fix issue where daemoned steps were not terminated properly in DAG te…
Browse files Browse the repository at this point in the history
…mplates (resolves argoproj#832)
  • Loading branch information
jessesuen committed Apr 24, 2018
1 parent 2e9e113 commit e34728c
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 79 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ cli-linux: builder
LDFLAGS='-extldflags "-static"' \
ARGO_CLI_NAME=argo-linux-amd64

.PHONY: cli
.PHONY: cli-darwin
cli-darwin: builder
${BUILDER_CMD} make cli \
GOOS=darwin \
Expand Down
74 changes: 74 additions & 0 deletions examples/dag-daemon-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# This example demonstrates daemoned steps when used in DAG templates. It is equivalent to the
# daemon-step.yaml example, but written in DAG format. The IP address of the daemoned step can be
# referenced using the '{{tasks.taskname.ip}}' variable.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-daemon-task-
spec:
entrypoint: daemon-example
templates:
- name: daemon-example
dag:
tasks:
- name: influx
template: influxdb

- name: init-database
template: influxdb-client
dependencies: [influx]
arguments:
parameters:
- name: cmd
value: curl -XPOST 'http://{{tasks.influx.ip}}:8086/query' --data-urlencode "q=CREATE DATABASE mydb"

- name: producer-1
template: influxdb-client
dependencies: [init-database]
arguments:
parameters:
- name: cmd
value: for i in $(seq 1 20); do curl -XPOST 'http://{{tasks.influx.ip}}:8086/write?db=mydb' -d "cpu,host=server01,region=uswest load=$i" ; sleep .5 ; done
- name: producer-2
template: influxdb-client
dependencies: [init-database]
arguments:
parameters:
- name: cmd
value: for i in $(seq 1 20); do curl -XPOST 'http://{{tasks.influx.ip}}:8086/write?db=mydb' -d "cpu,host=server02,region=uswest load=$((RANDOM % 100))" ; sleep .5 ; done
- name: producer-3
template: influxdb-client
dependencies: [init-database]
arguments:
parameters:
- name: cmd
value: curl -XPOST 'http://{{tasks.influx.ip}}:8086/write?db=mydb' -d 'cpu,host=server03,region=useast load=15.4'

- name: consumer
template: influxdb-client
dependencies: [producer-1, producer-2, producer-3]
arguments:
parameters:
- name: cmd
value: curl --silent -G http://{{tasks.influx.ip}}:8086/query?pretty=true --data-urlencode "db=mydb" --data-urlencode "q=SELECT * FROM cpu"

- name: influxdb
daemon: true
container:
image: influxdb:1.2
restartPolicy: Always
readinessProbe:
httpGet:
path: /ping
port: 8086
initialDelaySeconds: 5
timeoutSeconds: 1

- name: influxdb-client
inputs:
parameters:
- name: cmd
container:
image: appropriate/curl:latest
command: ["sh", "-c"]
args: ["{{inputs.parameters.cmd}}"]
1 change: 1 addition & 0 deletions test/e2e/functional/dag-daemon-task.yaml
81 changes: 81 additions & 0 deletions workflow/controller/daemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package controller

import (
"encoding/json"
"time"

"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/workflow/common"
log "github.com/sirupsen/logrus"
)

// killDeamonedChildren asks every daemoned node that belongs to the given steps or DAG
// template node to stop. A node belongs to nodeID when its BoundaryID equals nodeID.
//
// Each matching node has its execution control updated with a zero-valued deadline
// (presumably interpreted by the executor as "already expired" -- confirm against the
// executor). Failures are logged and do not stop the scan; the first error encountered,
// if any, is returned.
func (woc *wfOperationCtx) killDeamonedChildren(nodeID string) error {
	woc.log.Infof("Checking deamoned children of %s", nodeID)
	control := common.ExecutionControl{Deadline: &time.Time{}}
	var result error
	for _, node := range woc.wf.Status.Nodes {
		isDaemon := node.Daemoned != nil && *node.Daemoned
		if node.BoundaryID != nodeID || !isDaemon {
			continue
		}
		if err := woc.updateExecutionControl(node.ID, control); err != nil {
			woc.log.Errorf("Failed to update execution control of %s: %+v", node, err)
			if result == nil {
				// Remember only the first failure; later ones are just logged.
				result = err
			}
		}
	}
	return result
}

// updateExecutionControl updates the execution control parameters of the pod named podName.
//
// It JSON-encodes execCtl and stores it on the pod under the
// common.AnnotationKeyExecutionControl annotation, then execs `kill -s USR2 1` in the
// container named common.WaitContainerName so that the change is noticed promptly.
//
// An error is returned when encoding, annotating, or preparing the exec fails. The outcome
// of the signal command itself is best effort: it is only logged, never returned.
func (woc *wfOperationCtx) updateExecutionControl(podName string, execCtl common.ExecutionControl) error {
	execCtlBytes, err := json.Marshal(execCtl)
	if err != nil {
		return errors.InternalWrapError(err)
	}

	woc.log.Infof("Updating execution control of %s: %s", podName, execCtlBytes)
	err = common.AddPodAnnotation(
		woc.controller.kubeclientset,
		podName,
		woc.wf.ObjectMeta.Namespace,
		common.AnnotationKeyExecutionControl,
		string(execCtlBytes),
	)
	if err != nil {
		return err
	}

	// Ideally we would simply annotate the pod with the updates and be done with it, allowing
	// the executor to notice the updates naturally via the Downward API annotations volume
	// mounted file. However, updates to the Downward API volumes take a very long time to
	// propagate (minutes). The following code fast-tracks this by signaling the executor
	// using SIGUSR2 that something changed.
	woc.log.Infof("Signalling %s of updates", podName)
	exec, err := common.ExecPodContainer(
		woc.controller.restConfig, woc.wf.ObjectMeta.Namespace, podName,
		common.WaitContainerName, true, true, "sh", "-c", "kill -s USR2 1",
	)
	if err != nil {
		return err
	}
	go func() {
		// This call is necessary to actually send the exec. Since signalling is best effort,
		// it is launched as a goroutine and a failure is only logged. The error is declared
		// locally so the goroutine never writes to the enclosing function's err variable
		// after that function has already returned.
		if _, _, err := common.GetExecutorOutput(exec); err != nil {
			log.Warnf("Signal command failed: %v", err)
			return
		}
		log.Infof("Signal of %s (%s) successfully issued", podName, common.WaitContainerName)
	}()

	return nil
}
6 changes: 6 additions & 0 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmpl *wfv1.Template, boun
if node != nil && node.Completed() {
return node
}
defer func() {
if node != nil && woc.wf.Status.Nodes[node.ID].Completed() {
_ = woc.killDeamonedChildren(node.ID)
}
}()

dagCtx := &dagContext{
boundaryName: nodeName,
boundaryID: woc.wf.NodeID(nodeName),
Expand Down
90 changes: 12 additions & 78 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import (
"regexp"
"sort"
"strings"
"time"

"github.com/argoproj/argo/errors"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
log "github.com/sirupsen/logrus"
"github.com/valyala/fasttemplate"
)

Expand Down Expand Up @@ -110,7 +108,18 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template, bo
// updateOutboundNodes set the outbound nodes from the last step group
func (woc *wfOperationCtx) updateOutboundNodes(nodeName string, tmpl *wfv1.Template) {
outbound := make([]string, 0)
lastSGNode := woc.getNodeByName(fmt.Sprintf("%s[%d]", nodeName, len(tmpl.Steps)-1))
// Find the last initialized stepgroup node: walk the step groups from the final one
// backwards and stop at the first that exists. Without the break the scan would keep
// overwriting lastSGNode and end up with the lowest-index (earliest) initialized group.
var lastSGNode *wfv1.NodeStatus
for i := len(tmpl.Steps) - 1; i >= 0; i-- {
	sgNode := woc.getNodeByName(fmt.Sprintf("%s[%d]", nodeName, i))
	if sgNode != nil {
		lastSGNode = sgNode
		break
	}
}
if lastSGNode == nil {
woc.log.Warnf("node '%s' had no initialized StepGroup nodes", nodeName)
return
}
for _, childID := range lastSGNode.Children {
outboundNodeIDs := woc.getOutboundNodes(childID)
woc.log.Infof("Outbound nodes of %s is %s", childID, outboundNodeIDs)
Expand Down Expand Up @@ -360,78 +369,3 @@ func (woc *wfOperationCtx) expandStep(step wfv1.WorkflowStep) ([]wfv1.WorkflowSt
}
return expandedStep, nil
}

// killDeamonedChildren kills any grandchildren of a step template node which have been
// daemoned. Only grandchildren are inspected because the direct children of a step template
// are stepGroups, which are nodes that cannot represent actual containers.
//
// Each daemoned grandchild has its execution control updated with a zero-valued deadline.
// Failures are logged and the walk continues; the first error that occurs (if any) is
// returned.
// TODO(jessesuen): this logic will need to change with DAGs
func (woc *wfOperationCtx) killDeamonedChildren(nodeID string) error {
	woc.log.Infof("Checking deamon children of %s", nodeID)
	control := common.ExecutionControl{Deadline: &time.Time{}}
	nodes := woc.wf.Status.Nodes
	var result error
	for _, stepGroupID := range nodes[nodeID].Children {
		for _, grandChildID := range nodes[stepGroupID].Children {
			candidate := nodes[grandChildID]
			if daemoned := candidate.Daemoned; daemoned == nil || !*daemoned {
				continue
			}
			if err := woc.updateExecutionControl(candidate.ID, control); err != nil {
				woc.log.Errorf("Failed to update execution control of %s: %+v", candidate, err)
				if result == nil {
					// Keep only the first failure for the caller.
					result = err
				}
			}
		}
	}
	return result
}

// updateExecutionControl updates the execution control parameters of the pod named podName.
//
// It JSON-encodes execCtl and stores it on the pod under the
// common.AnnotationKeyExecutionControl annotation, then execs `kill -s USR2 1` in the
// container named common.WaitContainerName so that the change is noticed promptly.
//
// An error is returned when encoding, annotating, or preparing the exec fails. The outcome
// of the signal command itself is best effort: it is only logged, never returned.
func (woc *wfOperationCtx) updateExecutionControl(podName string, execCtl common.ExecutionControl) error {
	execCtlBytes, err := json.Marshal(execCtl)
	if err != nil {
		return errors.InternalWrapError(err)
	}

	woc.log.Infof("Updating execution control of %s: %s", podName, execCtlBytes)
	err = common.AddPodAnnotation(
		woc.controller.kubeclientset,
		podName,
		woc.wf.ObjectMeta.Namespace,
		common.AnnotationKeyExecutionControl,
		string(execCtlBytes),
	)
	if err != nil {
		return err
	}

	// Ideally we would simply annotate the pod with the updates and be done with it, allowing
	// the executor to notice the updates naturally via the Downward API annotations volume
	// mounted file. However, updates to the Downward API volumes take a very long time to
	// propagate (minutes). The following code fast-tracks this by signaling the executor
	// using SIGUSR2 that something changed.
	woc.log.Infof("Signalling %s of updates", podName)
	exec, err := common.ExecPodContainer(
		woc.controller.restConfig, woc.wf.ObjectMeta.Namespace, podName,
		common.WaitContainerName, true, true, "sh", "-c", "kill -s USR2 1",
	)
	if err != nil {
		return err
	}
	go func() {
		// This call is necessary to actually send the exec. Since signalling is best effort,
		// it is launched as a goroutine and a failure is only logged. The error is declared
		// locally so the goroutine never writes to the enclosing function's err variable
		// after that function has already returned.
		if _, _, err := common.GetExecutorOutput(exec); err != nil {
			log.Warnf("Signal command failed: %v", err)
			return
		}
		log.Infof("Signal of %s (%s) successfully issued", podName, common.WaitContainerName)
	}()

	return nil
}

0 comments on commit e34728c

Please sign in to comment.