Skip to content

Commit

Permalink
Implemented Conditionally annotate outputs of script template only wh…
Browse files Browse the repository at this point in the history
…en consumed argoproj#1359 (argoproj#1462)

* Fixed argoproj#1359 Implemented Conditionally annotate outputs of script template only when consumed
  • Loading branch information
sarabala1979 committed Jul 30, 2019
1 parent b028e61 commit 781d3b8
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 21 deletions.
3 changes: 3 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (

// AnnotationKeyNodeName is the pod metadata annotation key containing the workflow node name
AnnotationKeyNodeName = workflow.FullName + "/node-name"

// AnnotationKeyNodeMessage is the pod metadata annotation key the executor will use to
// communicate errors encountered by the executor during artifact load/save, etc...
AnnotationKeyNodeMessage = workflow.FullName + "/node-message"
Expand Down Expand Up @@ -130,6 +131,8 @@ type ExecutionControl struct {
// It is used to signal the executor to terminate a daemoned container. In the future it will be
// used to support workflow or steps/dag level timeouts.
Deadline *time.Time `json:"deadline,omitempty"`
// IncludeScriptOutput is containing flag to include script output
IncludeScriptOutput bool `json:"includeScriptOutput,omitempty"`
}

type ResourceInterface interface {
Expand Down
19 changes: 10 additions & 9 deletions workflow/controller/exec_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,33 @@ func (woc *wfOperationCtx) applyExecutionControl(pod *apiv1.Pod, wfNodesLock *sy
}
}

// Now ensure the pod's current annotation matches our desired deadline
desiredExecCtl := common.ExecutionControl{
Deadline: woc.workflowDeadline,
}
var podExecCtl common.ExecutionControl
if execCtlStr, ok := pod.Annotations[common.AnnotationKeyExecutionControl]; ok && execCtlStr != "" {
err := json.Unmarshal([]byte(execCtlStr), &podExecCtl)
if err != nil {
woc.log.Warnf("Failed to unmarshal execution control from pod %s", pod.Name)
}
}
if podExecCtl.Deadline == nil && desiredExecCtl.Deadline == nil {
if podExecCtl.Deadline == nil && woc.workflowDeadline == nil {
return nil
} else if podExecCtl.Deadline != nil && desiredExecCtl.Deadline != nil {
if podExecCtl.Deadline.Equal(*desiredExecCtl.Deadline) {
} else if podExecCtl.Deadline != nil && woc.workflowDeadline != nil {
if podExecCtl.Deadline.Equal(*woc.workflowDeadline) {
return nil
}
}

if podExecCtl.Deadline != nil && podExecCtl.Deadline.IsZero() {
// If the pod has already been explicitly signaled to terminate, then do nothing.
// This can happen when daemon steps are terminated.
woc.log.Infof("Skipping sync of execution control of pod %s. pod has been signaled to terminate", pod.Name)
return nil
}
woc.log.Infof("Execution control for pod %s out-of-sync desired: %v, actual: %v", pod.Name, desiredExecCtl.Deadline, podExecCtl.Deadline)
return woc.updateExecutionControl(pod.Name, desiredExecCtl)

// Assign new deadline value to PodExeCtl
podExecCtl.Deadline = woc.workflowDeadline

woc.log.Infof("Execution control for pod %s out-of-sync desired: %v, actual: %v", pod.Name, woc.workflowDeadline, podExecCtl.Deadline)
return woc.updateExecutionControl(pod.Name, podExecCtl)
}

// killDaemonedChildren kill any daemoned pods of a steps or DAG template node.
Expand Down
49 changes: 46 additions & 3 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,7 @@ func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template
return node
}
woc.log.Debugf("Executing node %s with container template: %v\n", nodeName, tmpl)
_, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl)
_, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl, false)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
Expand Down Expand Up @@ -1365,14 +1365,57 @@ func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Out
return &outputs, nil
}

// hasOutputResultRef will check given template output has any reference
func hasOutputResultRef(name string, parentTmpl *wfv1.Template) bool {

var variableRefName string
if parentTmpl.DAG != nil {
variableRefName = "{{tasks." + name + ".outputs.result}}"
} else if parentTmpl.Steps != nil {
variableRefName = "{{steps." + name + ".outputs.result}}"
}

jsonValue, err := json.Marshal(parentTmpl)
if err != nil {
log.Warnf("Unable to marshal the template. %v, %v", parentTmpl, err)
}

return strings.Contains(string(jsonValue), variableRefName)
}

// getStepOrDAGTaskName will extract the node from NodeStatus Name
func getStepOrDAGTaskName(nodeName string, hasRetryStrategy bool) string {
if strings.Contains(nodeName, ".") {
name := nodeName[strings.LastIndex(nodeName, ".")+1:]
// Check retry scenario
if hasRetryStrategy {
if indx := strings.LastIndex(name, "("); indx > 0 {
return name[0:indx]
}
}
return name
}
return nodeName
}

func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, boundaryID string) *wfv1.NodeStatus {

boundaryNode := woc.wf.Status.Nodes[boundaryID]
parentTemplate := woc.wf.GetTemplate(boundaryNode.TemplateName)

includeScriptOutput := false
if parentTemplate != nil {
name := getStepOrDAGTaskName(nodeName, tmpl.RetryStrategy != nil)
includeScriptOutput = hasOutputResultRef(name, parentTemplate)
}
node := woc.getNodeByName(nodeName)

if node != nil {
return node
}
mainCtr := tmpl.Script.Container
mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath)
_, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl)
_, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl, includeScriptOutput)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
Expand Down Expand Up @@ -1607,7 +1650,7 @@ func (woc *wfOperationCtx) executeResource(nodeName string, tmpl *wfv1.Template,
mainCtr.VolumeMounts = []apiv1.VolumeMount{
volumeMountPodMetadata,
}
_, err = woc.createWorkflowPod(nodeName, *mainCtr, tmpl)
_, err = woc.createWorkflowPod(nodeName, *mainCtr, tmpl, false)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
Expand Down
128 changes: 125 additions & 3 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"fmt"
"github.com/argoproj/argo/workflow/config"
"strings"
"testing"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
Expand Down Expand Up @@ -1160,6 +1161,127 @@ func TestResourceWithOwnerReferenceTemplate(t *testing.T) {
}
}

var stepScriptTmpl = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: scripts-bash-
spec:
entrypoint: bash-script-example
templates:
- name: bash-script-example
steps:
- - name: generate
template: gen-random-int
- - name: print
template: print-message
arguments:
parameters:
- name: message
value: "{{steps.generate.outputs.result}}"
- name: gen-random-int
script:
image: debian:9.4
command: [bash]
source: |
cat /dev/urandom | od -N2 -An -i | awk -v f=1 -v r=100 '{printf "%i\n", f + r * $1 / 65536}'
- name: print-message
inputs:
parameters:
- name: message
container:
image: alpine:latest
command: [sh, -c]
args: ["echo result was: {{inputs.parameters.message}}"]
`

var dagScriptTmpl = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-target-
spec:
entrypoint: dag-target
arguments:
parameters:
- name: target
value: E
templates:
- name: dag-target
dag:
tasks:
- name: A
template: echo
arguments:
parameters: [{name: message, value: A}]
- name: B
template: echo
arguments:
parameters: [{name: message, value: B}]
- name: C
dependencies: [A]
template: echo
arguments:
parameters: [{name: message, value: "{{tasks.A.outputs.result}}"}]
- name: echo
script:
image: debian:9.4
command: [bash]
source: |
cat /dev/urandom | od -N2 -An -i | awk -v f=1 -v r=100 '{printf "%i\n", f + r * $1 / 65536}'`

func TestStepWFGetNodeName(t *testing.T) {

controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

// operate the workflow. it should create a pod.
wf := unmarshalWF(stepScriptTmpl)
wf, err := wfcset.Create(wf)
assert.Nil(t, err)
assert.True(t, hasOutputResultRef("generate", &wf.Spec.Templates[0]))
assert.False(t, hasOutputResultRef("print-message", &wf.Spec.Templates[0]))
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.Nil(t, err)
for _, node := range wf.Status.Nodes {
if strings.Contains(node.Name, "generate") {
assert.True(t, getStepOrDAGTaskName(node.Name, &wf.Spec.Templates[0].RetryStrategy != nil) == "generate")
} else if strings.Contains(node.Name, "print-message") {
assert.True(t, getStepOrDAGTaskName(node.Name, &wf.Spec.Templates[0].RetryStrategy != nil) == "print-message")
}
}
}

func TestDAGWFGetNodeName(t *testing.T) {

controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

// operate the workflow. it should create a pod.
wf := unmarshalWF(dagScriptTmpl)
wf, err := wfcset.Create(wf)
assert.Nil(t, err)
assert.True(t, hasOutputResultRef("A", &wf.Spec.Templates[0]))
assert.False(t, hasOutputResultRef("B", &wf.Spec.Templates[0]))
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.Nil(t, err)
for _, node := range wf.Status.Nodes {
if strings.Contains(node.Name, ".A") {
assert.True(t, getStepOrDAGTaskName(node.Name, wf.Spec.Templates[0].RetryStrategy != nil) == "A")
}
if strings.Contains(node.Name, ".B") {
assert.True(t, getStepOrDAGTaskName(node.Name, wf.Spec.Templates[0].RetryStrategy != nil) == "B")
}
}
}

var withParamAsJsonList = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand All @@ -1181,14 +1303,14 @@ spec:
- name: message
value: "{{item}}"
withParam: "{{workflow.parameters.input}}"
- name: whalesay
- name: whalesay
inputs:
parameters:
- name: message
container:
script:
image: alpine:latest
command: [sh, -c]
args: ["echo "]
args: ["echo result was: {{inputs.parameters.message}}"]
`

func TestWithParamAsJsonList(t *testing.T) {
Expand Down
19 changes: 13 additions & 6 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (woc *wfOperationCtx) getVolumeDockerSock() apiv1.Volume {
}
}

func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template) (*apiv1.Pod, error) {
func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template, includeScriptOutput bool) (*apiv1.Pod, error) {
nodeID := woc.wf.NodeID(nodeName)
woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID)
tmpl = tmpl.DeepCopy()
Expand Down Expand Up @@ -168,7 +168,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
}

addSchedulingConstraints(pod, wfSpec, tmpl)
woc.addMetadata(pod, tmpl)
woc.addMetadata(pod, tmpl, includeScriptOutput)

err = addVolumeReferences(pod, woc.volumes, tmpl, woc.wf.Status.PersistentVolumeClaims)
if err != nil {
Expand Down Expand Up @@ -446,21 +446,28 @@ func isResourcesSpecified(ctr *apiv1.Container) bool {
}

// addMetadata applies metadata specified in the template
func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template) {
func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template, includeScriptOutput bool) {
for k, v := range tmpl.Metadata.Annotations {
pod.ObjectMeta.Annotations[k] = v
}
for k, v := range tmpl.Metadata.Labels {
pod.ObjectMeta.Labels[k] = v
}

execCtl := common.ExecutionControl{
IncludeScriptOutput: includeScriptOutput,
}

if woc.workflowDeadline != nil {
execCtl := common.ExecutionControl{
Deadline: woc.workflowDeadline,
}
execCtl.Deadline = woc.workflowDeadline

}
if woc.workflowDeadline != nil || includeScriptOutput {
execCtlBytes, err := json.Marshal(execCtl)
if err != nil {
panic(err)
}

pod.ObjectMeta.Annotations[common.AnnotationKeyExecutionControl] = string(execCtlBytes)
}
}
Expand Down
5 changes: 5 additions & 0 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,11 @@ func (we *WorkflowExecutor) GetMainContainerID() (string, error) {

// CaptureScriptResult will add the stdout of a script template as output result
func (we *WorkflowExecutor) CaptureScriptResult() error {

if we.ExecutionControl == nil || !we.ExecutionControl.IncludeScriptOutput {
log.Infof("No Script output reference in workflow. Capturing script output ignored")
return nil
}
if we.Template.Script == nil {
return nil
}
Expand Down

0 comments on commit 781d3b8

Please sign in to comment.