Skip to content

Commit

Permalink
Support for argument passing and substitution in templates
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesuen committed Oct 30, 2017
1 parent 5e8ba87 commit f3010c1
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 25 deletions.
14 changes: 13 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion api/workflow/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type WorkflowList struct {
type WorkflowSpec struct {
Templates []Template `json:"templates"`
Entrypoint string `json:"entrypoint"`
Arguments Arguments `json:"arguments,omitempty"`
}

type Template struct {
Expand Down Expand Up @@ -105,15 +106,18 @@ type OutputParameter struct {
Path string `json:"path,omitempty"`
}

type Item interface{}

// WorkflowStep is either a template ref, an inlined container, with added flags
type WorkflowStep struct {
Template string `json:"template,omitempty"`
Arguments Arguments `json:"arguments,omitempty"`
Flags []string `json:"flags,omitempty"`
WithItems Item `json:"withItems,omitempty"`
}

// Arguments to a template
type Arguments map[string]*string
type Arguments map[string]string

type WorkflowStatus struct {
Phase string `json:"phase"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ metadata:
spec:
entrypoint: artifact-example
templates:
- name: artifact-example
-
name: artifact-example
type: workflow
steps:
- COWSAY:
Expand All @@ -14,7 +15,8 @@ spec:
template: print-message
arguments:
artifacts.MESSAGE: "{{steps.COWSAY.outputs.artifacts.MESSAGE}}"
- name: cowsay
-
name: cowsay
type: container
image: docker/whalesay:latest
command: [sh, -c]
Expand All @@ -23,7 +25,8 @@ spec:
artifacts:
MESSAGE:
path: /tmp/hello_world.txt
- name: print-message
-
name: print-message
type: container
inputs:
artifacts:
Expand Down
File renamed without changes.
File renamed without changes.
27 changes: 27 additions & 0 deletions examples/loops-maps.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
apiVersion: argoproj.io/v1
kind: Workflow
metadata:
generateName: argo-wf-
spec:
entrypoint: loop-map-example
templates:
-
name: loop-map-example
type: workflow
steps:
- COWSAY:
template: cowsay
arguments:
parameters.MESSAGE: "{{item.adjective}} {{item.noun}}"
with_items:
- { adjective: 'friendly', noun: 'dog' }
- { adjective: 'sneaky', noun: 'cat' }
-
name: cowsay
type: container
inputs:
parameters:
MESSAGE:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.MESSAGE}}"]
27 changes: 27 additions & 0 deletions examples/loops.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
apiVersion: argoproj.io/v1
kind: Workflow
metadata:
generateName: argo-wf-
spec:
entrypoint: loop-example
templates:
-
name: loop-example
type: workflow
steps:
- COWSAY:
template: cowsay
arguments:
parameters.MESSAGE: "{{item}}"
with_items:
- hello world
- goodbye world
-
name: cowsay
type: container
inputs:
parameters:
MESSAGE:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.MESSAGE}}"]
85 changes: 66 additions & 19 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package controller

import (
"encoding/json"
"fmt"
"strings"

wfv1 "github.com/argoproj/argo/api/workflow/v1"
"github.com/argoproj/argo/errors"
log "github.com/sirupsen/logrus"
"github.com/valyala/fasttemplate"
)

// wfOperationCtx is the context for evaluation and operation of a single workflow
Expand Down Expand Up @@ -58,13 +61,13 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
woc.updated = true
}

err := woc.executeTemplate(wf.Spec.Entrypoint, nil, wf.ObjectMeta.Name)
err := woc.executeTemplate(wf.Spec.Entrypoint, wf.Spec.Arguments, wf.ObjectMeta.Name)
if err != nil {
woc.log.Errorf("%s error: %+v", wf.ObjectMeta.Name, err)
}
}

func (woc *wfOperationCtx) executeTemplate(templateName string, args *wfv1.Arguments, nodeName string) error {
func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string) error {
woc.log.Infof("Evaluating node %s: %v, args: %#v", nodeName, templateName, args)
nodeID := woc.wf.NodeID(nodeName)
node, ok := woc.wf.Status.Nodes[nodeID]
Expand All @@ -75,29 +78,37 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args *wfv1.Argum
tmpl := woc.wf.GetTemplate(templateName)
if tmpl == nil {
err := errors.Errorf(errors.CodeBadRequest, "Node %s error: template '%s' undefined", nodeName, templateName)
woc.wf.Status.Nodes[nodeID] = wfv1.NodeStatus{ID: nodeID, Name: nodeName, Status: wfv1.NodeStatusError}
woc.updated = true
woc.markNodeStatus(nodeName, wfv1.NodeStatusError)
return err
}
if len(args) > 0 {
var err error
tmpl, err = substituteArgs(tmpl, args)
if err != nil {
woc.markNodeStatus(nodeName, wfv1.NodeStatusError)
return err
}
}

switch tmpl.Type {
case wfv1.TypeContainer:
if !ok {
// We have not yet created the pod
status := wfv1.NodeStatusRunning
err := woc.createWorkflowPod(nodeName, tmpl, args)
if err != nil {
// TODO: may need to query pod status if we hit already exists error
status = wfv1.NodeStatusError
return err
}
node = wfv1.NodeStatus{ID: nodeID, Name: nodeName, Status: status}
woc.wf.Status.Nodes[nodeID] = node
woc.log.Infof("Initialized container node %v", node)
woc.updated = true
if ok {
// There's already a node entry for the container. This means the container was already
// scheduled (or had a create pod error). Nothing to more to do with this node.
return nil
}
return nil
// We have not yet created the pod
status := wfv1.NodeStatusRunning
err := woc.createWorkflowPod(nodeName, tmpl, args)
if err != nil {
// TODO: may need to query pod status if we hit already exists error
status = wfv1.NodeStatusError
}
node = wfv1.NodeStatus{ID: nodeID, Name: nodeName, Status: status}
woc.wf.Status.Nodes[nodeID] = node
woc.log.Infof("Initialized container node %v", node)
woc.updated = true
return err

case wfv1.TypeWorkflow:
if !ok {
Expand Down Expand Up @@ -140,6 +151,19 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args *wfv1.Argum
}
}

// markNodeError marks a node with the given status, creating the node if necessary
func (woc *wfOperationCtx) markNodeStatus(nodeName string, status string) {
nodeID := woc.wf.NodeID(nodeName)
node, ok := woc.wf.Status.Nodes[nodeID]
if !ok {
node = wfv1.NodeStatus{ID: nodeID, Name: nodeName, Status: status}
} else {
node.Status = status
}
woc.wf.Status.Nodes[nodeID] = node
woc.updated = true
}

func (woc *wfOperationCtx) executeStepGroup(stepGroup map[string]wfv1.WorkflowStep, nodeName string) error {
nodeID := woc.wf.NodeID(nodeName)
node, ok := woc.wf.Status.Nodes[nodeID]
Expand All @@ -158,7 +182,7 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup map[string]wfv1.WorkflowSt
for stepName, step := range stepGroup {
childNodeName := fmt.Sprintf("%s.%s", nodeName, stepName)
childNodeIDs = append(childNodeIDs, woc.wf.NodeID(childNodeName))
err := woc.executeTemplate(step.Template, &step.Arguments, childNodeName)
err := woc.executeTemplate(step.Template, step.Arguments, childNodeName)
if err != nil {
node.Status = wfv1.NodeStatusError
woc.wf.Status.Nodes[nodeID] = node
Expand Down Expand Up @@ -188,3 +212,26 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup map[string]wfv1.WorkflowSt
woc.log.Infof("Step group node %s successful", nodeID)
return nil
}

// substituteArgs returns a new copy of the template with all input parameters substituted
func substituteArgs(tmpl *wfv1.Template, args wfv1.Arguments) (*wfv1.Template, error) {
tmplBytes, err := json.Marshal(tmpl)
if err != nil {
return nil, errors.InternalWrapError(err)
}
fstTmpl := fasttemplate.New(string(tmplBytes), "{{", "}}")
replaceMap := make(map[string]interface{})
for argName, argVal := range args {
if strings.HasPrefix(argName, "parameters.") {
replaceMap["inputs."+argName] = argVal
}
}
s := fstTmpl.ExecuteString(replaceMap)

var newTmpl wfv1.Template
err = json.Unmarshal([]byte(s), &newTmpl)
if err != nil {
return nil, errors.InternalWrapError(err)
}
return &newTmpl, nil
}
2 changes: 1 addition & 1 deletion workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func envFromField(envVarName, fieldPath string) corev1.EnvVar {
}
}

func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Template, args *wfv1.Arguments) error {
func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Template, args wfv1.Arguments) error {
woc.log.Infof("Creating Pod: %s", nodeName)
initCtr, err := woc.newInitContainer(tmpl)
if err != nil {
Expand Down

0 comments on commit f3010c1

Please sign in to comment.