Commit 6a54b31

Support for automatic termination for daemoned workflow steps
jessesuen committed Nov 9, 2017
1 parent 2435e6f commit 6a54b31
Showing 4 changed files with 176 additions and 29 deletions.
12 changes: 9 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default.

71 changes: 71 additions & 0 deletions workflow/common/util.go
@@ -1,10 +1,17 @@
package common

import (
"bytes"
"fmt"
"strings"

wfv1 "github.com/argoproj/argo/api/workflow/v1"
"github.com/argoproj/argo/errors"
log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)

// FindOverlappingVolume looks an artifact path, checks if it overlaps with any
@@ -24,3 +31,67 @@ func FindOverlappingVolume(tmpl *wfv1.Template, path string) *apiv1.VolumeMount
}
return volMnt
}

// KillPodContainer is a convenience function to issue a kill signal to a container in a pod
// It gives a 15 second grace period before issuing SIGKILL
// NOTE: this only works with containers that have sh
func KillPodContainer(restConfig *rest.Config, namespace string, pod string, container string) error {
exec, err := ExecPodContainer(restConfig, namespace, pod, container, true, true, "sh", "-c", "kill 1; sleep 15; kill -9 1")
if err != nil {
return err
}
// Stream will initiate the command. We do not want to wait for the result, so we launch it as a goroutine
go func() {
_, _, err := GetExecutorOutput(exec)
if err != nil {
log.Warnf("Kill command failed (expected to fail with 137): %v", err)
return
}
log.Infof("Kill of %s (%s) successfully issued", pod, container)
}()
return nil
}

// ExecPodContainer runs a command in a container in a pod and returns the remotecommand.Executor
func ExecPodContainer(restConfig *rest.Config, namespace string, pod string, container string, stdout bool, stderr bool, command ...string) (remotecommand.Executor, error) {
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, errors.InternalWrapError(err)
}

execRequest := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(pod).
Namespace(namespace).
SubResource("exec").
Param("container", container).
Param("stdout", fmt.Sprintf("%v", stdout)).
Param("stderr", fmt.Sprintf("%v", stderr)).
Param("tty", "false")

for _, cmd := range command {
execRequest = execRequest.Param("command", cmd)
}

log.Info(execRequest.URL())
exec, err := remotecommand.NewSPDYExecutor(restConfig, "POST", execRequest.URL())
if err != nil {
return nil, errors.InternalWrapError(err)
}
return exec, nil
}

// GetExecutorOutput returns the output of a remotecommand.Executor
func GetExecutorOutput(exec remotecommand.Executor) (string, string, error) {
var stdOut bytes.Buffer
var stdErr bytes.Buffer
err := exec.Stream(remotecommand.StreamOptions{
Stdout: &stdOut,
Stderr: &stdErr,
Tty: false,
})
if err != nil {
return "", "", errors.InternalWrapError(err)
}
return stdOut.String(), stdErr.String(), nil
}
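
For orientation, here is a minimal caller sketch (not part of this commit) showing how the three helpers above compose; the namespace, pod, and container names are placeholders:

package main

import (
	"log"

	"github.com/argoproj/argo/workflow/common"
	"k8s.io/client-go/rest"
)

func main() {
	// Assumes the caller runs inside a cluster; any *rest.Config would work.
	restConfig, err := rest.InClusterConfig()
	if err != nil {
		log.Fatalf("failed to load in-cluster config: %v", err)
	}

	// Run a one-off command in the pod's container and capture its output.
	exec, err := common.ExecPodContainer(restConfig, "default", "my-pod", "main", true, true, "hostname")
	if err != nil {
		log.Fatalf("failed to set up exec: %v", err)
	}
	stdout, stderr, err := common.GetExecutorOutput(exec)
	if err != nil {
		log.Fatalf("exec failed: %v", err)
	}
	log.Printf("stdout: %q stderr: %q", stdout, stderr)

	// Ask the same container to shut down (SIGTERM, then SIGKILL after 15 seconds).
	if err := common.KillPodContainer(restConfig, "default", "my-pod", "main"); err != nil {
		log.Fatalf("failed to issue kill: %v", err)
	}
}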
78 changes: 58 additions & 20 deletions workflow/controller/controller.go
@@ -28,6 +28,7 @@ type WorkflowController struct {
WorkflowScheme *runtime.Scheme
Config WorkflowControllerConfig

restConfig *rest.Config
clientset *kubernetes.Clientset
podCl corev1.PodInterface
wfUpdates chan *wfv1.Workflow
@@ -66,6 +67,7 @@ func NewWorkflowController(config *rest.Config, configMap string) *WorkflowContr
}

wfc := WorkflowController{
restConfig: config,
clientset: clientset,
WorkflowClient: wfClient,
WorkflowScheme: wfScheme,
@@ -235,21 +237,26 @@ func (wfc *WorkflowController) watchWorkflowPods(ctx context.Context) (cache.Con
return controller, nil
}

// handlePodUpdate receives an update from a pod, and updates the status of the node in the workflow object accordingly
func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
workflowName, ok := pod.Labels[common.LabelKeyWorkflow]
if !ok {
// Ignore pods unrelated to workflow (this shouldn't happen unless the watch is setup incorrectly)
return
}
var newStatus string
var deamoned *bool
var newDaemonStatus *bool
switch pod.Status.Phase {
case apiv1.PodPending:
return
case apiv1.PodSucceeded:
newStatus = wfv1.NodeStatusSucceeded
f := false
newDaemonStatus = &f
case apiv1.PodFailed:
newStatus = wfv1.NodeStatusFailed
f := false
newDaemonStatus = &f
case apiv1.PodRunning:
tmplStr, ok := pod.Annotations[common.AnnotationKeyTemplate]
if !ok {
@@ -263,6 +270,7 @@ func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
return
}
if tmpl.Daemon == nil || !*tmpl.Daemon {
// incidental state change of a running pod. No need to inspect further
return
}
// pod is running and template is marked daemon. check if everything is ready
@@ -271,10 +279,10 @@ func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
return
}
}
// proceed to mark node status as completed (and daemoned)
// proceed to mark node status as succeeded (and daemoned)
newStatus = wfv1.NodeStatusSucceeded
t := true
deamoned = &t
newDaemonStatus = &t
log.Infof("Processing ready daemon pod: %v", pod.ObjectMeta.SelfLink)
default:
log.Infof("Unexpected pod phase for %s: %s", pod.ObjectMeta.Name, pod.Status.Phase)
@@ -291,31 +299,61 @@ func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
log.Warnf("pod %s unassociated with workflow %s", pod.Name, workflowName)
return
}
if node.Completed() {
log.Infof("node %v already marked completed (%s)", node, node.Status)
updateNeeded := applyUpdates(pod, &node, newStatus, newDaemonStatus)
if !updateNeeded {
log.Infof("No workflow updated needed for node %s", node)
return
}
//addOutputs(pod, &node)
wf.Status.Nodes[pod.Name] = node
_, err = wfc.WorkflowClient.UpdateWorkflow(wf)
if err != nil {
log.Errorf("Failed to update %s status: %+v", pod.Name, err)
// if we fail to update the CRD state, we will need to rely on resync to catch up
return
}
log.Infof("Updated %s", node)
}

// applyUpdates applies any new state information about a pod to the current status of the workflow node.
// It returns whether or not any updates were necessary (resulting in an update to the workflow)
func applyUpdates(pod *apiv1.Pod, node *wfv1.NodeStatus, newStatus string, newDaemonStatus *bool) bool {
// Check various fields of the pods to see if we need to update the workflow
updateNeeded := false
if node.Status != newStatus {
log.Infof("Updating node %s status %s -> %s", node, node.Status, newStatus)
updateNeeded = true
node.Status = newStatus
}
if pod.Status.PodIP != node.PodIP {
log.Infof("Updating node %s IP %s -> %s", node, node.PodIP, pod.Status.PodIP)
updateNeeded = true
node.PodIP = pod.Status.PodIP
}
if newDaemonStatus != nil {
if *newDaemonStatus == false {
// if the daemon status switched to false, we prefer to just unset the daemoned status field
// (as opposed to setting it to false)
newDaemonStatus = nil
}
if newDaemonStatus != nil && node.Daemoned == nil || newDaemonStatus == nil && node.Daemoned != nil {
log.Infof("Setting node %v daemoned: %v -> %v", node, node.Daemoned, newDaemonStatus)
node.Daemoned = newDaemonStatus
updateNeeded = true
}
}
outputStr, ok := pod.Annotations[common.AnnotationKeyOutputs]
if ok {
if ok && node.Outputs == nil {
log.Infof("Setting node %v outputs", node)
updateNeeded = true
var outputs wfv1.Outputs
err = json.Unmarshal([]byte(outputStr), &outputs)
err := json.Unmarshal([]byte(outputStr), &outputs)
if err != nil {
log.Errorf("Failed to unmarshal %s outputs from pod annotation: %v", pod.Name, err)
newStatus = wfv1.NodeStatusError
node.Status = wfv1.NodeStatusError
} else {
node.Outputs = &outputs
}
}
log.Infof("Updating node %s status %s -> %s (IP: %s)", node, node.Status, newStatus, pod.Status.PodIP)
node.Status = newStatus
node.PodIP = pod.Status.PodIP
node.Daemoned = deamoned
wf.Status.Nodes[pod.Name] = node
_, err = wfc.WorkflowClient.UpdateWorkflow(wf)
if err != nil {
log.Errorf("Failed to update %s status: %+v", pod.Name, err)
// if we fail to update the CRD state, we will need to rely on resync to catch up
return
}
log.Infof("Updated %v", node)
return updateNeeded
}
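
As a side note on the Daemoned handling above, here is a tiny self-contained sketch (plain Go, not the actual workflow types) of the three-state *bool convention applyUpdates relies on: nil means unset, a pointer to true marks a ready daemon, and a reported false is normalized back to nil rather than stored:

package main

import "fmt"

// normalizeDaemonStatus mirrors the preference in applyUpdates: an explicit
// false is turned back into nil so the field is simply left unset.
func normalizeDaemonStatus(newDaemonStatus *bool) *bool {
	if newDaemonStatus != nil && !*newDaemonStatus {
		return nil
	}
	return newDaemonStatus
}

func main() {
	t, f := true, false
	fmt.Println(normalizeDaemonStatus(&t) != nil)  // true: a ready daemon is recorded
	fmt.Println(normalizeDaemonStatus(&f) != nil)  // false: an explicit false is dropped, field stays unset
	fmt.Println(normalizeDaemonStatus(nil) != nil) // false: nothing to record
}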
44 changes: 38 additions & 6 deletions workflow/controller/operator.go
@@ -11,6 +11,7 @@ import (

wfv1 "github.com/argoproj/argo/api/workflow/v1"
"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/workflow/common"
log "github.com/sirupsen/logrus"
"github.com/valyala/fasttemplate"
apiv1 "k8s.io/api/core/v1"
@@ -206,7 +207,11 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Argume
node = *woc.markNodeStatus(nodeName, wfv1.NodeStatusRunning)
woc.log.Infof("Initialized workflow node %v", node)
}
return woc.executeSteps(nodeName, tmpl)
err = woc.executeSteps(nodeName, tmpl)
if woc.wf.Status.Nodes[nodeID].Completed() {
woc.killDeamonedChildren(nodeID)
}
return err

} else if tmpl.Script != nil {
return woc.executeScript(nodeName, tmpl)
@@ -385,38 +390,40 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
return err
}

childNodeIDs := make([]string, 0)
// Kick off all parallel steps in the group
for _, step := range stepGroup {
childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name)
woc.addChildNode(sgNodeName, childNodeName)
childNodeIDs = append(childNodeIDs, woc.wf.NodeID(childNodeName))

// Check the step's when clause to decide if it should execute
proceed, err := shouldExecute(step.When)
if err != nil {
woc.markNodeStatus(childNodeName, wfv1.NodeStatusError)
woc.markNodeStatus(sgNodeName, wfv1.NodeStatusError)
return err
}
if !proceed {
woc.log.Infof("Skipping %s: when '%s' false", childNodeName, step.When)
woc.log.Infof("Skipping %s: when '%s' evaluated false", childNodeName, step.When)
woc.markNodeStatus(childNodeName, wfv1.NodeStatusSkipped)
continue
}
err = woc.executeTemplate(step.Template, step.Arguments, childNodeName)
if err != nil {
woc.markNodeStatus(childNodeName, wfv1.NodeStatusError)
woc.markNodeStatus(sgNodeName, wfv1.NodeStatusError)
return err
}
}

node = woc.wf.Status.Nodes[nodeID]
// Return if not all children completed
for _, childNodeID := range childNodeIDs {
for _, childNodeID := range node.Children {
if !woc.wf.Status.Nodes[childNodeID].Completed() {
return nil
}
}
// All children completed. Determine step group status as a whole
for _, childNodeID := range childNodeIDs {
for _, childNodeID := range node.Children {
childNode := woc.wf.Status.Nodes[childNodeID]
if !childNode.Successful() {
woc.markNodeStatus(sgNodeName, wfv1.NodeStatusFailed)
@@ -711,3 +718,28 @@ func (woc *wfOperationCtx) addChildNode(parent string, child string) {
woc.wf.Status.Nodes[parentID] = node
woc.updated = true
}

// killDeamonedChildren kills any grandchildren of a step template node which have been daemoned.
// We only need to check grandchildren instead of children because the direct children of a step
// template are actually stepGroups, which are nodes that cannot represent actual containers.
// Returns the first error that occurs (if any)
func (woc *wfOperationCtx) killDeamonedChildren(nodeID string) error {
log.Infof("Checking daemoned children of %s", nodeID)
var firstErr error
for _, childNodeID := range woc.wf.Status.Nodes[nodeID].Children {
for _, grandChildID := range woc.wf.Status.Nodes[childNodeID].Children {
gcNode := woc.wf.Status.Nodes[grandChildID]
if gcNode.Daemoned == nil || !*gcNode.Daemoned {
continue
}
err := common.KillPodContainer(woc.controller.restConfig, apiv1.NamespaceDefault, gcNode.ID, common.MainContainerName)
if err != nil {
log.Errorf("Failed to kill %s: %+v", gcNode, err)
if firstErr == nil {
firstErr = err
}
}
}
}
return firstErr
}
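
To illustrate the traversal, here is a standalone sketch (simplified node type, not the argo API) of why only grandchildren are inspected: the direct children of a step-template node are step groups, so only their children map to real pods, and only those with Daemoned set are candidates for the kill:

package main

import "fmt"

type node struct {
	ID       string
	Daemoned *bool
	Children []string
}

// daemonedGrandchildren mirrors the walk in killDeamonedChildren: skip the
// step group layer and collect the daemoned pod nodes beneath it.
func daemonedGrandchildren(nodes map[string]node, root string) []string {
	var out []string
	for _, stepGroupID := range nodes[root].Children {
		for _, podID := range nodes[stepGroupID].Children {
			gc := nodes[podID]
			if gc.Daemoned != nil && *gc.Daemoned {
				out = append(out, gc.ID)
			}
		}
	}
	return out
}

func main() {
	t := true
	nodes := map[string]node{
		"steps":    {ID: "steps", Children: []string{"steps[0]"}},
		"steps[0]": {ID: "steps[0]", Children: []string{"redis", "init-db"}},
		"redis":    {ID: "redis", Daemoned: &t},
		"init-db":  {ID: "init-db"},
	}
	fmt.Println(daemonedGrandchildren(nodes, "steps")) // [redis]
}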
