Skip to content

Commit

Permalink
Implement support for DAG templates to have output parameters/artifacts
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesuen committed Feb 9, 2018
1 parent 989e8ed commit 62a3fba
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 0 deletions.
1 change: 1 addition & 0 deletions test/e2e/functional/dag-argument-passing.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Test to ensure parameters and artifacts can be passed from an ancestor
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
Expand Down
98 changes: 98 additions & 0 deletions test/e2e/functional/dag-outputs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Test to verify parameters and artifacts can be passed into and out of nested dag
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: nested-dag-outputs-
spec:
entrypoint: nested-dag-outputs
templates:
- name: nested-dag-outputs
dag:
tasks:
- name: generate
template: generate
- name: nested-wf
dependencies: [generate]
template: nested-wf
arguments:
parameters:
- name: nested-in-parameter
value: "{{tasks.generate.outputs.parameters.out-parameter}}"
artifacts:
- name: nested-in-artifact
from: "{{tasks.generate.outputs.artifacts.out-artifact}}"
- name: consume
dependencies: [nested-wf]
template: consume
arguments:
parameters:
- name: in-parameter
value: "{{tasks.nested-wf.outputs.parameters.nested-out-parameter}}"
artifacts:
- name: in-artifact
from: "{{tasks.nested-wf.outputs.artifacts.nested-out-artifact}}"

# container template which generates an output parameter and artifact
- name: generate
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["
cowsay hello world | tee /tmp/my-output-artifact.txt &&
echo 'my-output-parameter' > /tmp/my-output-parameter.txt
"]
outputs:
parameters:
- name: out-parameter
valueFrom:
path: /tmp/my-output-parameter.txt
artifacts:
- name: out-artifact
path: /tmp/my-output-artifact.txt
# dag template which consumes an input artifact and also produces
# its own output parameters and artifacts taken from one of its tasks.
- name: nested-wf
inputs:
parameters:
- name: nested-in-parameter
artifacts:
- name: nested-in-artifact
dag:
tasks:
- name: consume
template: consume
arguments:
parameters:
- name: in-parameter
value: "{{inputs.parameters.nested-in-parameter}}"
artifacts:
- name: in-artifact
from: "{{inputs.artifacts.nested-in-artifact}}"
- name: generate
template: generate
outputs:
parameters:
- name: nested-out-parameter
valueFrom:
parameter: "{{tasks.generate.outputs.parameters.out-parameter}}"
artifacts:
- name: nested-out-artifact
from: "{{tasks.generate.outputs.artifacts.out-artifact}}"

# container template which consumes an input parameter and artifact
- name: consume
inputs:
parameters:
- name: in-parameter
artifacts:
- name: in-artifact
path: /tmp/art
container:
image: alpine:3.7
command: [sh, -c]
args: ["
echo 'input parameter value: {{inputs.parameters.in-parameter}}' &&
echo 'input artifact contents:' &&
cat /tmp/art
"]
3 changes: 3 additions & 0 deletions workflow/common/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,9 @@ func (ctx *wfValidationCtx) validateDAG(scope map[string]interface{}, tmpl *wfv1
}

for _, task := range tmpl.DAG.Tasks {
// add all tasks outputs to scope so that DAGs can have outputs
ctx.addOutputsToScope(task.Template, fmt.Sprintf("tasks.%s", task.Name), scope)

taskBytes, err := json.Marshal(task)
if err != nil {
return errors.InternalWrapError(err)
Expand Down
18 changes: 18 additions & 0 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,24 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmpl *wfv1.Template, boun
return woc.markNodePhase(nodeName, dagPhase)
}

// set outputs from tasks in order for DAG templates to support outputs
scope := wfScope{
tmpl: tmpl,
scope: make(map[string]interface{}),
}
for _, task := range tmpl.DAG.Tasks {
scope.addNodeOutputsToScope(fmt.Sprintf("tasks.%s", task.Name), dagCtx.getTaskNode(task.Name))
}
outputs, err := getTemplateOutputsFromScope(tmpl, &scope)
if err != nil {
return woc.markNodeError(nodeName, err)
}
if outputs != nil {
node = woc.getNodeByName(nodeName)
node.Outputs = outputs
woc.wf.Status.Nodes[node.ID] = *node
}

// set the outbound nodes from the target tasks
node = woc.getNodeByName(nodeName)
outbound := make([]string, 0)
Expand Down

0 comments on commit 62a3fba

Please sign in to comment.