Skip to content

Commit

Permalink
Support parameter and artifact passing between DAG tasks. Improved te…
Browse files Browse the repository at this point in the history
…mplate validation
  • Loading branch information
jessesuen committed Feb 9, 2018
1 parent 03d409a commit 989e8ed
Show file tree
Hide file tree
Showing 15 changed files with 503 additions and 59 deletions.
2 changes: 1 addition & 1 deletion cmd/argo/commands/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func printNode(w *tabwriter.Writer, wf *wfv1.Workflow, node wfv1.NodeStatus, dep
if node.Type == wfv1.NodeTypePod {
args = []interface{}{nodePrefix, nodeName, node.ID, duration, node.Message}
} else {
args = []interface{}{nodePrefix, nodeName, "", "", ""}
args = []interface{}{nodePrefix, nodeName, "", "", node.Message}
}
if getArgs.output == "wide" {
msg := args[len(args)-1]
Expand Down
34 changes: 30 additions & 4 deletions test/e2e/functional/dag-argument-passing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,52 @@ spec:
inputs:
parameters:
- name: message
artifacts:
- name: passthrough
path: /tmp/passthrough
container:
image: alpine:3.7
command: [echo, "{{inputs.parameters.message}}"]
command: [sh, -c, -x]
args: ['echo "{{inputs.parameters.message}}"; cat /tmp/passthrough']
outputs:
parameters:
- name: hosts
path: /etc/hosts
valueFrom:
path: /etc/hosts
artifacts:
- name: someoutput
path: /tmp/passthrough

- name: dag-arg-passing
dag:
tasks:
- name: A
template: echo
arguments:
parameters:
parameters:
- name: message
value: val
artifacts:
- name: passthrough
raw:
data: hello
- name: B
dependencies: [A]
template: echo
arguments:
parameters:
- name: message
value: "{{dependencies.A.outputs.parameters.hosts}}"
value: "{{tasks.A.outputs.parameters.hosts}}"
artifacts:
- name: passthrough
from: "{{tasks.A.outputs.artifacts.someoutput}}"
- name: C
dependencies: [B]
template: echo
arguments:
parameters:
- name: message
value: "{{tasks.A.outputs.parameters.hosts}}"
artifacts:
- name: passthrough
from: "{{tasks.A.outputs.artifacts.someoutput}}"
1 change: 1 addition & 0 deletions test/e2e/functional/dag-coinflip.yaml
1 change: 1 addition & 0 deletions test/e2e/functional/dag-diamond-steps.yaml
1 change: 1 addition & 0 deletions test/e2e/functional/dag-diamond.yaml
1 change: 1 addition & 0 deletions test/e2e/functional/dag-multiroot.yaml
1 change: 1 addition & 0 deletions test/e2e/functional/dag-nested.yaml
Empty file.
27 changes: 27 additions & 0 deletions workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,3 +316,30 @@ func IsPodTemplate(tmpl *wfv1.Template) bool {
}
return false
}

// GetTaskAncestry returns a list of taskNames which are ancestors of this task
func GetTaskAncestry(taskName string, tasks []wfv1.DAGTask) []string {
taskByName := make(map[string]wfv1.DAGTask)
for _, task := range tasks {
taskByName[task.Name] = task
}

visited := make(map[string]bool)
var getAncestry func(s string)
getAncestry = func(currTask string) {
task := taskByName[currTask]
for _, depTask := range task.Dependencies {
getAncestry(depTask)
}
if currTask != taskName {
visited[currTask] = true
}
}

getAncestry(taskName)
ancestry := make([]string, 0)
for ancestor := range visited {
ancestry = append(ancestry, ancestor)
}
return ancestry
}
123 changes: 89 additions & 34 deletions workflow/common/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ func (ctx *wfValidationCtx) validateTemplate(tmpl *wfv1.Template, args wfv1.Argu
return nil
}
ctx.results[tmpl.Name] = true
if err := validateTemplateType(tmpl); err != nil {
return err
}
_, err := ProcessArgs(tmpl, args, ctx.globalParams, true)
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s %s", tmpl.Name, err)
Expand All @@ -97,30 +100,6 @@ func (ctx *wfValidationCtx) validateTemplate(tmpl *wfv1.Template, args wfv1.Argu
for globalVar, val := range ctx.globalParams {
scope[globalVar] = val
}
// the following validates that only one template type is defined
tmplTypes := 0
if tmpl.Container != nil {
tmplTypes++
}
if tmpl.Steps != nil {
tmplTypes++
}
if tmpl.Script != nil {
tmplTypes++
}
if tmpl.Resource != nil {
tmplTypes++
}
if tmpl.DAG != nil {
tmplTypes++
}
switch tmplTypes {
case 0:
return errors.New(errors.CodeBadRequest, "template type unspecified. choose one of: container, steps, script, resource, dag")
case 1:
default:
return errors.New(errors.CodeBadRequest, "multiple template types specified. choose one of: container, steps, script, resource, dag")
}
switch tmpl.GetType() {
case wfv1.TemplateTypeSteps:
err = ctx.validateSteps(scope, tmpl)
Expand All @@ -139,6 +118,24 @@ func (ctx *wfValidationCtx) validateTemplate(tmpl *wfv1.Template, args wfv1.Argu
return nil
}

// validateTemplateType validates that only one template type is defined
func validateTemplateType(tmpl *wfv1.Template) error {
numTypes := 0
for _, tmplType := range []interface{}{tmpl.Container, tmpl.Steps, tmpl.Script, tmpl.Resource, tmpl.DAG} {
if !reflect.ValueOf(tmplType).IsNil() {
numTypes++
}
}
switch numTypes {
case 0:
return errors.New(errors.CodeBadRequest, "template type unspecified. choose one of: container, steps, script, resource, dag")
case 1:
default:
return errors.New(errors.CodeBadRequest, "multiple template types specified. choose one of: container, steps, script, resource, dag")
}
return nil
}

func validateInputs(tmpl *wfv1.Template) (map[string]interface{}, error) {
err := validateWorkflowFieldNames(tmpl.Inputs.Parameters)
if err != nil {
Expand Down Expand Up @@ -213,7 +210,7 @@ func validateNonLeaf(tmpl *wfv1.Template) error {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.activeDeadlineSeconds is only valid for leaf templates", tmpl.Name)
}
if tmpl.RetryStrategy != nil {
return errors.Errorf(errors.CodeBadRequest, "template.%s.retryStrategy is only valid for container templates", tmpl.Name)
return errors.Errorf(errors.CodeBadRequest, "templates.%s.retryStrategy is only valid for container templates", tmpl.Name)
}
return nil
}
Expand Down Expand Up @@ -265,6 +262,17 @@ func validateArguments(prefix string, arguments wfv1.Arguments) error {
return errors.Errorf(errors.CodeBadRequest, "%s%s%s", prefix, fieldName, err.Error())
}
}
for _, param := range arguments.Parameters {
if param.Value == nil {
return errors.Errorf(errors.CodeBadRequest, "%svalue is required", prefix)
}
}

for _, art := range arguments.Artifacts {
if art.From == "" && !art.HasLocation() {
return errors.Errorf(errors.CodeBadRequest, "%sfrom or artifact location is required", prefix)
}
}
return nil
}

Expand All @@ -277,27 +285,27 @@ func (ctx *wfValidationCtx) validateSteps(scope map[string]interface{}, tmpl *wf
for i, stepGroup := range tmpl.Steps {
for _, step := range stepGroup {
if step.Name == "" {
return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].name is required", tmpl.Name, i)
return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].name is required", tmpl.Name, i)
}
_, ok := stepNames[step.Name]
if ok {
return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].name '%s' is not unique", tmpl.Name, i, step.Name)
return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].name '%s' is not unique", tmpl.Name, i, step.Name)
}
if errs := IsValidWorkflowFieldName(step.Name); len(errs) != 0 {
return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].name '%s' is invalid: %s", tmpl.Name, i, step.Name, strings.Join(errs, ";"))
return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].name '%s' is invalid: %s", tmpl.Name, i, step.Name, strings.Join(errs, ";"))
}
stepNames[step.Name] = true
err := addItemsToScope(&step, scope)
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
}
stepBytes, err := json.Marshal(stepGroup)
if err != nil {
return errors.InternalWrapError(err)
}
err = resolveAllVariables(scope, string(stepBytes))
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
}
childTmpl := ctx.wf.GetTemplate(step.Template)
if childTmpl == nil {
Expand Down Expand Up @@ -388,6 +396,49 @@ func validateOutputs(scope map[string]interface{}, tmpl *wfv1.Template) error {
}
}
}
for _, param := range tmpl.Outputs.Parameters {
paramRef := fmt.Sprintf("templates.%s.outputs.parameters.%s", tmpl.Name, param.Name)
err = validateOutputParameter(paramRef, &param)
if err != nil {
return err
}
tmplType := tmpl.GetType()
switch tmplType {
case wfv1.TemplateTypeContainer, wfv1.TemplateTypeScript:
if param.ValueFrom.Path == "" {
return errors.Errorf(errors.CodeBadRequest, "%s.path must be specified for %s templates", paramRef, tmplType)
}
case wfv1.TemplateTypeResource:
if param.ValueFrom.JQFilter == "" && param.ValueFrom.JSONPath == "" {
return errors.Errorf(errors.CodeBadRequest, "%s .jqFilter or jsonPath must be specified for %s templates", paramRef, tmplType)
}
case wfv1.TemplateTypeDAG, wfv1.TemplateTypeSteps:
if param.ValueFrom.Parameter == "" {
return errors.Errorf(errors.CodeBadRequest, "%s.parameter must be specified for %s templates", paramRef, tmplType)
}
}
}
return nil
}

// validateOutputParameter verifies that only one of valueFrom is defined in an output
func validateOutputParameter(paramRef string, param *wfv1.Parameter) error {
if param.ValueFrom == nil {
return errors.Errorf(errors.CodeBadRequest, "%s.valueFrom not specified", paramRef)
}
paramTypes := 0
for _, value := range []string{param.ValueFrom.Path, param.ValueFrom.JQFilter, param.ValueFrom.JSONPath, param.ValueFrom.Parameter} {
if value != "" {
paramTypes++
}
}
switch paramTypes {
case 0:
return errors.New(errors.CodeBadRequest, "valueFrom type unspecified. choose one of: path, jqFilter, jsonPath, parameter")
case 1:
default:
return errors.New(errors.CodeBadRequest, "multiple valueFrom types specified. choose one of: path, jqFilter, jsonPath, parameter")
}
return nil
}

Expand Down Expand Up @@ -486,17 +537,21 @@ func (ctx *wfValidationCtx) validateDAG(scope map[string]interface{}, tmpl *wfv1
if err != nil {
return errors.InternalWrapError(err)
}
// add outputs of all our dependencies to scope
taskScope := make(map[string]interface{})
for k, v := range scope {
taskScope[k] = v
}
for _, depName := range task.Dependencies {
ctx.addOutputsToScope(nameToTask[depName].Template, fmt.Sprintf("dependencies.%s", depName), taskScope)
ancestry := GetTaskAncestry(task.Name, tmpl.DAG.Tasks)
for _, ancestor := range ancestry {
ctx.addOutputsToScope(nameToTask[ancestor].Template, fmt.Sprintf("tasks.%s", ancestor), taskScope)
}
err = resolveAllVariables(taskScope, string(taskBytes))
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "template.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error())
return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error())
}
err = validateArguments(fmt.Sprintf("templates.%s.tasks.%s.arguments.", tmpl.Name, task.Name), task.Arguments)
if err != nil {
return err
}
taskTmpl := ctx.wf.GetTemplate(task.Template)
err = ctx.validateTemplate(taskTmpl, task.Arguments)
Expand Down
Loading

0 comments on commit 989e8ed

Please sign in to comment.