Skip to content

Commit

Permalink
Support withItems/withParam and parameter aggregation with DAG templa…
Browse files Browse the repository at this point in the history
…tes (issue argoproj#801)
  • Loading branch information
jessesuen committed Aug 6, 2018
1 parent 94c195c commit d07c1d2
Show file tree
Hide file tree
Showing 16 changed files with 410 additions and 122 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
## 2.2.0 (TBD)

### Changelog since v2.1
+ Support withItems/withParam and parameter aggregation with DAG templates (issue #801)
+ Add ability to aggregate and reference output parameters expanded by loops (issue #861)
+ Github login using go-git, with support for ssh keys (@andreimc)
+ Add `argo delete --older` flag to delete completed workflows older than a duration
+ Support submission of workflows as json, and from stdin (issue #926)
+ Support submission of workflows from json files (issue #926)
+ Support submission of workflows from stdin (issue #926)
* Update golang compiler to v1.10.3
* Update k8s dependencies to v1.10 and client-go to v7.0
* Update argo-cluster-role to work with OpenShift
Expand Down
13 changes: 12 additions & 1 deletion api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,17 @@
"template": {
"description": "Name of template to execute",
"type": "string"
},
"withItems": {
"description": "WithItems expands a task into multiple parallel tasks from the items in the list",
"type": "array",
"items": {
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.Item"
}
},
"withParam": {
"description": "WithParam expands a task into multiple parallel tasks from the value in the parameter, which is expected to be a JSON list.",
"type": "string"
}
}
},
Expand Down Expand Up @@ -969,7 +980,7 @@
}
},
"withParam": {
"description": "WithParam expands a step into from the value in the parameter",
"description": "WithParam expands a step into multiple parallel steps from the value in the parameter, which is expected to be a JSON list.",
"type": "string"
}
}
Expand Down
41 changes: 41 additions & 0 deletions examples/loops-dag.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Example of loops using DAGs
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: loops-dag-
spec:
entrypoint: loops-dag
templates:
- name: loops-dag
dag:
tasks:
- name: A
template: whalesay
arguments:
parameters:
- {name: message, value: A}
- name: B
dependencies: [A]
template: whalesay
arguments:
parameters:
- {name: message, value: "{{item}}"}
withItems:
- foo
- bar
- baz
- name: C
dependencies: [B]
template: whalesay
arguments:
parameters:
- {name: message, value: C}

- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
61 changes: 61 additions & 0 deletions examples/parameter-aggregation-dag.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: parameter-aggregation-dag-
spec:
entrypoint: parameter-aggregation
templates:
- name: parameter-aggregation
dag:
tasks:
- name: odd-or-even
template: odd-or-even
arguments:
parameters:
- name: num
value: "{{item}}"
withItems: [1, 2, 3, 4]
- name: pick-evens
template: pick-evens
dependencies: [odd-or-even]
arguments:
parameters:
- name: num
value: "{{item.num}}"
withParam: "{{tasks.odd-or-even.outputs.parameters}}"
#when: "{{item.evenness}} == even"

# odd-or-even accepts a number and returns whether or not that number is odd or even
- name: odd-or-even
inputs:
parameters:
- name: num
container:
image: alpine:latest
command: [sh, -xc]
args:
- |
echo {{inputs.parameters.num}} > /tmp/num &&
if [ $(({{inputs.parameters.num}}%2)) -eq 0 ]; then
echo "even" > /tmp/even;
else
echo "odd" > /tmp/even;
fi
outputs:
parameters:
- name: num
valueFrom:
path: /tmp/num
- name: evenness
valueFrom:
path: /tmp/even

- name: pick-evens
inputs:
parameters:
- name: num
container:
image: alpine:latest
command: [sh, -xc]
args:
- echo {{inputs.parameters.num}}
24 changes: 22 additions & 2 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion pkg/apis/workflow/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
NodeTypeSteps NodeType = "Steps"
NodeTypeStepGroup NodeType = "StepGroup"
NodeTypeDAG NodeType = "DAG"
NodeTypeTaskGroup NodeType = "TaskGroup"
NodeTypeRetry NodeType = "Retry"
NodeTypeSkipped NodeType = "Skipped"
NodeTypeSuspend NodeType = "Suspend"
Expand Down Expand Up @@ -312,7 +313,8 @@ type WorkflowStep struct {
// WithItems expands a step into multiple parallel steps from the items in the list
WithItems []Item `json:"withItems,omitempty"`

// WithParam expands a step into from the value in the parameter
// WithParam expands a step into multiple parallel steps from the value in the parameter,
// which is expected to be a JSON list.
WithParam string `json:"withParam,omitempty"`

// When is an expression in which the step should conditionally execute
Expand Down Expand Up @@ -679,6 +681,13 @@ type DAGTask struct {

// Dependencies are name of other targets which this depends on
Dependencies []string `json:"dependencies,omitempty"`

// WithItems expands a task into multiple parallel tasks from the items in the list
WithItems []Item `json:"withItems,omitempty"`

// WithParam expands a task into multiple parallel tasks from the value in the parameter,
// which is expected to be a JSON list.
WithParam string `json:"withParam,omitempty"`
}

// SuspendTemplate is a template subtype to suspend a workflow at a predetermined point in time
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions test/e2e/functional/loops-dag.yaml
1 change: 1 addition & 0 deletions test/e2e/functional/parameter-aggregation-dag.yaml
26 changes: 26 additions & 0 deletions test/e2e/ui/dag-with-params.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-nested-
spec:
entrypoint: diamond
templates:
- name: diamond
dag:
tasks:
- name: A
template: nested-diamond
arguments:
parameters: [{name: message, value: A}]
- name: nested-diamond
inputs:
parameters:
- name: message
dag:
tasks:
- name: A
template: echo
- name: echo
container:
image: alpine:3.7
command: [echo, "hello"]
2 changes: 1 addition & 1 deletion test/e2e/ui/ui-nested-steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ metadata:
spec:
entrypoint: ui-nested-steps
templates:
- name: nested-steps
- name: ui-nested-steps
steps:
- - name: LOCATE-FACES
template: locate-faces
Expand Down
38 changes: 25 additions & 13 deletions workflow/common/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ func (ctx *wfValidationCtx) validateSteps(scope map[string]interface{}, tmpl *wf
return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].name '%s' is invalid: %s", tmpl.Name, i, step.Name, strings.Join(errs, ";"))
}
stepNames[step.Name] = true
err := addItemsToScope(&step, scope)
prefix := fmt.Sprintf("steps.%s", step.Name)
err := addItemsToScope(prefix, step.WithItems, step.WithParam, scope)
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
}
Expand All @@ -348,19 +349,20 @@ func (ctx *wfValidationCtx) validateSteps(scope map[string]interface{}, tmpl *wf
}
}
for _, step := range stepGroup {
ctx.addOutputsToScope(step.Template, fmt.Sprintf("steps.%s", step.Name), scope)
aggregate := len(step.WithItems) > 0 || step.WithParam != ""
ctx.addOutputsToScope(step.Template, fmt.Sprintf("steps.%s", step.Name), scope, aggregate)
}
}
return nil
}

func addItemsToScope(step *wfv1.WorkflowStep, scope map[string]interface{}) error {
if len(step.WithItems) > 0 && step.WithParam != "" {
func addItemsToScope(prefix string, withItems []wfv1.Item, withParam string, scope map[string]interface{}) error {
if len(withItems) > 0 && withParam != "" {
return fmt.Errorf("only one of withItems or withParam can be specified")
}
if len(step.WithItems) > 0 {
for i := range step.WithItems {
switch val := step.WithItems[i].Value.(type) {
if len(withItems) > 0 {
for i := range withItems {
switch val := withItems[i].Value.(type) {
case string, int32, int64, float32, float64, bool:
scope["item"] = true
case map[string]interface{}:
Expand All @@ -371,17 +373,16 @@ func addItemsToScope(step *wfv1.WorkflowStep, scope map[string]interface{}) erro
return fmt.Errorf("unsupported withItems type: %v", val)
}
}
} else if step.WithParam != "" {
} else if withParam != "" {
scope["item"] = true
// 'item.*' is magic placeholder value which resolveAllVariables() will look for
// when considering if all variables are resolveable.
scope[anyItemMagicValue] = true
}
scope[fmt.Sprintf("steps.%s.outputs.parameters", step.Name)] = true
return nil
}

func (ctx *wfValidationCtx) addOutputsToScope(templateName string, prefix string, scope map[string]interface{}) {
func (ctx *wfValidationCtx) addOutputsToScope(templateName string, prefix string, scope map[string]interface{}, aggregate bool) {
tmpl := ctx.wf.GetTemplate(templateName)
if tmpl.Daemon != nil && *tmpl.Daemon {
scope[fmt.Sprintf("%s.ip", prefix)] = true
Expand All @@ -403,6 +404,9 @@ func (ctx *wfValidationCtx) addOutputsToScope(templateName string, prefix string
scope[fmt.Sprintf("workflow.outputs.artifacts.%s", art.GlobalName)] = true
}
}
if aggregate {
scope[fmt.Sprintf("%s.outputs.parameters", prefix)] = true
}
}

func validateOutputs(scope map[string]interface{}, tmpl *wfv1.Template) error {
Expand Down Expand Up @@ -591,8 +595,9 @@ func (ctx *wfValidationCtx) validateDAG(scope map[string]interface{}, tmpl *wfv1
}

for _, task := range tmpl.DAG.Tasks {
// add all tasks outputs to scope so that DAGs can have outputs
ctx.addOutputsToScope(task.Template, fmt.Sprintf("tasks.%s", task.Name), scope)
// add all tasks outputs to scope so that a nested DAGs can have outputs
prefix := fmt.Sprintf("tasks.%s", task.Name)
ctx.addOutputsToScope(task.Template, prefix, scope, false)

taskBytes, err := json.Marshal(task)
if err != nil {
Expand All @@ -604,7 +609,14 @@ func (ctx *wfValidationCtx) validateDAG(scope map[string]interface{}, tmpl *wfv1
}
ancestry := GetTaskAncestry(task.Name, tmpl.DAG.Tasks)
for _, ancestor := range ancestry {
ctx.addOutputsToScope(nameToTask[ancestor].Template, fmt.Sprintf("tasks.%s", ancestor), taskScope)
ancestorTask := nameToTask[ancestor]
ancestorPrefix := fmt.Sprintf("tasks.%s", ancestor)
aggregate := len(ancestorTask.WithItems) > 0 || ancestorTask.WithParam != ""
ctx.addOutputsToScope(ancestorTask.Template, ancestorPrefix, taskScope, aggregate)
}
err = addItemsToScope(prefix, task.WithItems, task.WithParam, taskScope)
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error())
}
err = resolveAllVariables(taskScope, string(taskBytes))
if err != nil {
Expand Down
Loading

0 comments on commit d07c1d2

Please sign in to comment.