Skip to content

Commit

Permalink
Added status of previous steps as variables (argoproj#1681)
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 authored and sarabala1979 committed Oct 18, 2019
1 parent ad3dd4d commit 09a6cb4
Show file tree
Hide file tree
Showing 7 changed files with 454 additions and 12 deletions.
14 changes: 8 additions & 6 deletions docs/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ The following variables are made available to reference various metadata of a wo
| Variable | Description|
|----------|------------|
| `steps.<STEPNAME>.ip` | IP address of a previous daemon container step |
| `steps.<STEPNAME>.outputs.result` | Output result of a previous script step |
| `steps.<STEPNAME>.outputs.parameters.<NAME>` | Output parameter of a previous step |
| `steps.<STEPNAME>.outputs.artifacts.<NAME>` | Output artifact of a previous step |
| `steps.<STEPNAME>.status` | Phase status of any previous script step |
| `steps.<STEPNAME>.outputs.result` | Output result of any previous script step |
| `steps.<STEPNAME>.outputs.parameters.<NAME>` | Output parameter of any previous step |
| `steps.<STEPNAME>.outputs.artifacts.<NAME>` | Output artifact of any previous step |

## DAG Templates:
| Variable | Description|
|----------|------------|
| `tasks.<TASKNAME>.ip` | IP address of a previous daemon container task |
| `tasks.<TASKNAME>.outputs.result` | Output result of a previous script task |
| `tasks.<TASKNAME>.outputs.parameters.<NAME>` | Output parameter of a previous task |
| `tasks.<TASKNAME>.outputs.artifacts.<NAME>` | Output artifact of a previous task |
| `tasks.<STEPNAME>.status` | Phase status of any previous task step |
| `tasks.<TASKNAME>.outputs.result` | Output result of any previous script task |
| `tasks.<TASKNAME>.outputs.parameters.<NAME>` | Output parameter of any previous task |
| `tasks.<TASKNAME>.outputs.artifacts.<NAME>` | Output artifact of any previous task |

## Container/Script Templates:
| Variable | Description|
Expand Down
41 changes: 41 additions & 0 deletions examples/status-reference.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# The status reference example combines the use of a status result,
# along with conditionals, to take a dynamic path in the
# workflow. In this example, depending on the status of 'flakey-container'
# the template will either run the 'succeeded' step or the 'failed' step.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: status-reference-
spec:
entrypoint: status-reference
templates:
- name: status-reference
steps:
- - name: flakey-container
template: flakey-container
continueOn:
failed: true
- - name: failed
template: failed
when: "{{steps.flakey-container.status}} == Failed"
- name: succeeded
template: succeeded
when: "{{steps.flakey-container.status}} == Succeeded"

- name: flakey-container
script:
image: alpine:3.6
command: [sh, -c]
args: ["exit 1"]

- name: failed
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"the flakey container failed\""]

- name: succeeded
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"the flakey container passed\""]
4 changes: 4 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,10 @@ func (woc *wfOperationCtx) processNodeOutputs(scope *wfScope, prefix string, nod
key := fmt.Sprintf("%s.ip", prefix)
scope.addParamToScope(key, node.PodIP)
}
if node.Phase != "" {
key := fmt.Sprintf("%s.status", prefix)
scope.addParamToScope(key, string(node.Phase))
}
woc.addOutputsToScope(prefix, node.Outputs, scope)
}

Expand Down
56 changes: 55 additions & 1 deletion workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"strings"
"testing"

"sigs.k8s.io/yaml"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test"
Expand Down Expand Up @@ -1102,6 +1102,60 @@ func TestResolvePlaceholdersInOutputValues(t *testing.T) {
assert.Equal(t, "output-value-placeholders-wf", *parameterValue)
}

var outputStatuses = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: scripts-bash-
spec:
entrypoint: bash-script-example
templates:
- name: bash-script-example
steps:
- - name: first
template: flakey-container
continueOn:
failed: true
- - name: print
template: print-message
arguments:
parameters:
- name: message
value: "{{steps.first.status}}"
- name: flakey-container
script:
image: busybox
command: [sh, -c]
args: ["exit 0"]
- name: print-message
inputs:
parameters:
- name: message
container:
image: alpine:latest
command: [sh, -c]
args: ["echo result was: {{inputs.parameters.message}}"]
`

func TestResolveStatuses(t *testing.T) {

controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

// operate the workflow. it should create a pod.
wf := unmarshalWF(outputStatuses)
wf, err := wfcset.Create(wf)
assert.Nil(t, err)
jsonValue, err := json.Marshal(&wf.Spec.Templates[0])
assert.NoError(t, err)

assert.Contains(t, string(jsonValue), "{{steps.first.status}}")
assert.NotContains(t, string(jsonValue), "{{steps.print.status}}")
}

var resourceTemplate = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down
14 changes: 9 additions & 5 deletions workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ func (ctx *templateValidationCtx) validateSteps(scope map[string]interface{}, tm
}
stepNames[step.Name] = true
prefix := fmt.Sprintf("steps.%s", step.Name)
scope[fmt.Sprintf("%s.status", prefix)] = true
err := addItemsToScope(prefix, step.WithItems, step.WithParam, step.WithSequence, scope)
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
Expand All @@ -581,7 +582,7 @@ func (ctx *templateValidationCtx) validateSteps(scope map[string]interface{}, tm
for i, step := range stepGroup {
aggregate := len(step.WithItems) > 0 || step.WithParam != ""
resolvedTmpl := resolvedTemplates[step.Name]
ctx.addOutputsToScope(resolvedTmpl, fmt.Sprintf("steps.%s", step.Name), scope, aggregate)
ctx.addOutputsToScope(resolvedTmpl, fmt.Sprintf("steps.%s", step.Name), scope, aggregate, false)

// Validate the template again with actual arguments.
_, err = ctx.validateTemplateHolder(&step, tmplCtx, &step.Arguments, scope)
Expand Down Expand Up @@ -634,7 +635,7 @@ func addItemsToScope(prefix string, withItems []wfv1.Item, withParam string, wit
return nil
}

func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix string, scope map[string]interface{}, aggregate bool) {
func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix string, scope map[string]interface{}, aggregate bool, isAncestor bool) {
if tmpl.Daemon != nil && *tmpl.Daemon {
scope[fmt.Sprintf("%s.ip", prefix)] = true
}
Expand Down Expand Up @@ -665,6 +666,9 @@ func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix
scope[fmt.Sprintf("%s.outputs.parameters", prefix)] = true
}
}
if isAncestor {
scope[fmt.Sprintf("%s.status", prefix)] = true
}
}

func validateOutputs(scope map[string]interface{}, tmpl *wfv1.Template) error {
Expand Down Expand Up @@ -883,7 +887,7 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error())
}
prefix := fmt.Sprintf("tasks.%s", task.Name)
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false)
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false, false)
resolvedTemplates[task.Name] = resolvedTmpl
dupDependencies := make(map[string]bool)
for j, depName := range task.Dependencies {
Expand Down Expand Up @@ -917,7 +921,7 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
resolvedTmpl := resolvedTemplates[task.Name]
// add all tasks outputs to scope so that a nested DAGs can have outputs
prefix := fmt.Sprintf("tasks.%s", task.Name)
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false)
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false, false)
taskBytes, err := json.Marshal(task)
if err != nil {
return errors.InternalWrapError(err)
Expand All @@ -932,7 +936,7 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
resolvedTmpl := resolvedTemplates[ancestor]
ancestorPrefix := fmt.Sprintf("tasks.%s", ancestor)
aggregate := len(ancestorTask.WithItems) > 0 || ancestorTask.WithParam != ""
ctx.addOutputsToScope(resolvedTmpl, ancestorPrefix, taskScope, aggregate)
ctx.addOutputsToScope(resolvedTmpl, ancestorPrefix, taskScope, aggregate, true)
}
err = addItemsToScope(prefix, task.WithItems, task.WithParam, task.WithSequence, taskScope)
if err != nil {
Expand Down
Loading

0 comments on commit 09a6cb4

Please sign in to comment.