From 09a6cb4e81c1d9f5c8c082c9e96ce783fa20796f Mon Sep 17 00:00:00 2001 From: Simon Behar Date: Fri, 18 Oct 2019 10:02:46 -0700 Subject: [PATCH] Added status of previous steps as variables (#1681) --- docs/variables.md | 14 +- examples/status-reference.yaml | 41 ++++ workflow/controller/operator.go | 4 + workflow/controller/operator_test.go | 56 +++++- workflow/validate/validate.go | 14 +- workflow/validate/validate_dag_test.go | 256 +++++++++++++++++++++++++ workflow/validate/validate_test.go | 81 ++++++++ 7 files changed, 454 insertions(+), 12 deletions(-) create mode 100644 examples/status-reference.yaml diff --git a/docs/variables.md b/docs/variables.md index b8127834b165..1018e5f4107f 100644 --- a/docs/variables.md +++ b/docs/variables.md @@ -13,17 +13,19 @@ The following variables are made available to reference various metadata of a wo | Variable | Description| |----------|------------| | `steps..ip` | IP address of a previous daemon container step | -| `steps..outputs.result` | Output result of a previous script step | -| `steps..outputs.parameters.` | Output parameter of a previous step | -| `steps..outputs.artifacts.` | Output artifact of a previous step | +| `steps..status` | Phase status of any previous script step | +| `steps..outputs.result` | Output result of any previous script step | +| `steps..outputs.parameters.` | Output parameter of any previous step | +| `steps..outputs.artifacts.` | Output artifact of any previous step | ## DAG Templates: | Variable | Description| |----------|------------| | `tasks..ip` | IP address of a previous daemon container task | -| `tasks..outputs.result` | Output result of a previous script task | -| `tasks..outputs.parameters.` | Output parameter of a previous task | -| `tasks..outputs.artifacts.` | Output artifact of a previous task | +| `tasks..status` | Phase status of any previous task step | +| `tasks..outputs.result` | Output result of any previous script task | +| `tasks..outputs.parameters.` | Output parameter of any previous task | +| `tasks..outputs.artifacts.` | Output artifact of any previous task | ## Container/Script Templates: | Variable | Description| diff --git a/examples/status-reference.yaml b/examples/status-reference.yaml new file mode 100644 index 000000000000..e02ebe399931 --- /dev/null +++ b/examples/status-reference.yaml @@ -0,0 +1,41 @@ +# The status reference example combines the use of a status result, +# along with conditionals, to take a dynamic path in the +# workflow. In this example, depending on the status of 'flakey-container' +# the template will either run the 'succeeded' step or the 'failed' step. +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: status-reference- +spec: + entrypoint: status-reference + templates: + - name: status-reference + steps: + - - name: flakey-container + template: flakey-container + continueOn: + failed: true + - - name: failed + template: failed + when: "{{steps.flakey-container.status}} == Failed" + - name: succeeded + template: succeeded + when: "{{steps.flakey-container.status}} == Succeeded" + + - name: flakey-container + script: + image: alpine:3.6 + command: [sh, -c] + args: ["exit 1"] + + - name: failed + container: + image: alpine:3.6 + command: [sh, -c] + args: ["echo \"the flakey container failed\""] + + - name: succeeded + container: + image: alpine:3.6 + command: [sh, -c] + args: ["echo \"the flakey container passed\""] diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index c6772a8706da..3df0829bd120 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1602,6 +1602,10 @@ func (woc *wfOperationCtx) processNodeOutputs(scope *wfScope, prefix string, nod key := fmt.Sprintf("%s.ip", prefix) scope.addParamToScope(key, node.PodIP) } + if node.Phase != "" { + key := fmt.Sprintf("%s.status", prefix) + scope.addParamToScope(key, string(node.Phase)) + } woc.addOutputsToScope(prefix, node.Outputs, scope) } diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index d47512dff796..ebaa83752a02 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -6,11 +6,11 @@ import ( "strings" "testing" - "sigs.k8s.io/yaml" "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/test" @@ -1102,6 +1102,60 @@ func TestResolvePlaceholdersInOutputValues(t *testing.T) { assert.Equal(t, "output-value-placeholders-wf", *parameterValue) } +var outputStatuses = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: scripts-bash- +spec: + entrypoint: bash-script-example + templates: + - name: bash-script-example + steps: + - - name: first + template: flakey-container + continueOn: + failed: true + - - name: print + template: print-message + arguments: + parameters: + - name: message + value: "{{steps.first.status}}" + + + - name: flakey-container + script: + image: busybox + command: [sh, -c] + args: ["exit 0"] + + - name: print-message + inputs: + parameters: + - name: message + container: + image: alpine:latest + command: [sh, -c] + args: ["echo result was: {{inputs.parameters.message}}"] +` + +func TestResolveStatuses(t *testing.T) { + + controller := newController() + wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("") + + // operate the workflow. it should create a pod. + wf := unmarshalWF(outputStatuses) + wf, err := wfcset.Create(wf) + assert.Nil(t, err) + jsonValue, err := json.Marshal(&wf.Spec.Templates[0]) + assert.NoError(t, err) + + assert.Contains(t, string(jsonValue), "{{steps.first.status}}") + assert.NotContains(t, string(jsonValue), "{{steps.print.status}}") +} + var resourceTemplate = ` apiVersion: argoproj.io/v1alpha1 kind: Workflow diff --git a/workflow/validate/validate.go b/workflow/validate/validate.go index 2a7034feed49..b70f66a863c0 100644 --- a/workflow/validate/validate.go +++ b/workflow/validate/validate.go @@ -556,6 +556,7 @@ func (ctx *templateValidationCtx) validateSteps(scope map[string]interface{}, tm } stepNames[step.Name] = true prefix := fmt.Sprintf("steps.%s", step.Name) + scope[fmt.Sprintf("%s.status", prefix)] = true err := addItemsToScope(prefix, step.WithItems, step.WithParam, step.WithSequence, scope) if err != nil { return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error()) @@ -581,7 +582,7 @@ func (ctx *templateValidationCtx) validateSteps(scope map[string]interface{}, tm for i, step := range stepGroup { aggregate := len(step.WithItems) > 0 || step.WithParam != "" resolvedTmpl := resolvedTemplates[step.Name] - ctx.addOutputsToScope(resolvedTmpl, fmt.Sprintf("steps.%s", step.Name), scope, aggregate) + ctx.addOutputsToScope(resolvedTmpl, fmt.Sprintf("steps.%s", step.Name), scope, aggregate, false) // Validate the template again with actual arguments. _, err = ctx.validateTemplateHolder(&step, tmplCtx, &step.Arguments, scope) @@ -634,7 +635,7 @@ func addItemsToScope(prefix string, withItems []wfv1.Item, withParam string, wit return nil } -func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix string, scope map[string]interface{}, aggregate bool) { +func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix string, scope map[string]interface{}, aggregate bool, isAncestor bool) { if tmpl.Daemon != nil && *tmpl.Daemon { scope[fmt.Sprintf("%s.ip", prefix)] = true } @@ -665,6 +666,9 @@ func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix scope[fmt.Sprintf("%s.outputs.parameters", prefix)] = true } } + if isAncestor { + scope[fmt.Sprintf("%s.status", prefix)] = true + } } func validateOutputs(scope map[string]interface{}, tmpl *wfv1.Template) error { @@ -883,7 +887,7 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error()) } prefix := fmt.Sprintf("tasks.%s", task.Name) - ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false) + ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false, false) resolvedTemplates[task.Name] = resolvedTmpl dupDependencies := make(map[string]bool) for j, depName := range task.Dependencies { @@ -917,7 +921,7 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl resolvedTmpl := resolvedTemplates[task.Name] // add all tasks outputs to scope so that a nested DAGs can have outputs prefix := fmt.Sprintf("tasks.%s", task.Name) - ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false) + ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false, false) taskBytes, err := json.Marshal(task) if err != nil { return errors.InternalWrapError(err) @@ -932,7 +936,7 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl resolvedTmpl := resolvedTemplates[ancestor] ancestorPrefix := fmt.Sprintf("tasks.%s", ancestor) aggregate := len(ancestorTask.WithItems) > 0 || ancestorTask.WithParam != "" - ctx.addOutputsToScope(resolvedTmpl, ancestorPrefix, taskScope, aggregate) + ctx.addOutputsToScope(resolvedTmpl, ancestorPrefix, taskScope, aggregate, true) } err = addItemsToScope(prefix, task.WithItems, task.WithParam, task.WithSequence, taskScope) if err != nil { diff --git a/workflow/validate/validate_dag_test.go b/workflow/validate/validate_dag_test.go index 1beb9c60bd32..6507125ba5d7 100644 --- a/workflow/validate/validate_dag_test.go +++ b/workflow/validate/validate_dag_test.go @@ -290,6 +290,262 @@ func TestDAGArtifactResolution(t *testing.T) { assert.Nil(t, err) } +var dagStatusReference = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: dag-arg-passing- +spec: + entrypoint: dag-arg-passing + templates: + - name: echo + inputs: + parameters: + - name: message + container: + image: alpine:3.7 + command: [echo, "{{inputs.parameters.message}}"] + + - name: dag-arg-passing + dag: + tasks: + - name: A + template: echo + continueOn: + failed: true + arguments: + parameters: + - name: message + value: "Hello!" + - name: B + dependencies: [A] + template: echo + arguments: + parameters: + - name: message + value: "{{tasks.A.status}}" +` + +var dagStatusNoFutureReferenceSimple = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: dag-arg-passing- +spec: + entrypoint: dag-arg-passing + templates: + - name: echo + inputs: + parameters: + - name: message + container: + image: alpine:3.7 + command: [echo, "{{inputs.parameters.message}}"] + + - name: dag-arg-passing + dag: + tasks: + - name: A + template: echo + continueOn: + failed: true + arguments: + parameters: + - name: message + value: "{{tasks.B.status}}" + - name: B + dependencies: [A] + template: echo + arguments: + parameters: + - name: message + value: "{{tasks.A.status}}" +` + +var dagStatusNoFutureReferenceWhenFutureReferenceHasChild = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: dag-arg-passing- +spec: + entrypoint: dag-arg-passing + templates: + - name: echo + inputs: + parameters: + - name: message + container: + image: alpine:3.7 + command: [echo, "{{inputs.parameters.message}}"] + + - name: dag-arg-passing + dag: + tasks: + - name: A + template: echo + continueOn: + failed: true + arguments: + parameters: + - name: message + value: "{{tasks.B.status}}" + - name: B + dependencies: [A] + template: echo + arguments: + parameters: + - name: message + value: "{{tasks.A.status}}" + - name: C + dependencies: [B] + template: echo + arguments: + parameters: + - name: message + value: "{{tasks.B.status}}" +` + +var dagStatusPastReferenceChain = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: dag-arg-passing- +spec: + entrypoint: dag-arg-passing + templates: + - name: echo + inputs: + parameters: + - name: message + container: + image: alpine:3.7 + command: [echo, "{{inputs.parameters.message}}"] + + - name: dag-arg-passing + dag: + tasks: + - name: A + template: echo + continueOn: + failed: true + arguments: + parameters: + - name: message + value: "Hello" + - name: B + dependencies: [A] + template: echo + arguments: + parameters: + - name: message + value: "{{tasks.A.status}}" + - name: C + dependencies: [B] + template: echo + arguments: + parameters: + - name: message + value: "{{tasks.B.status}}" + - name: D + dependencies: [A] + template: echo + arguments: + parameters: + - name: message + value: "{{tasks.A.status}}" + - name: E + dependencies: [D] + template: echo + arguments: + parameters: + - name: message + value: "{{tasks.D.status}}" +` + +var dagStatusOnlyDirectAncestors = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: dag-arg-passing- +spec: + entrypoint: dag-arg-passing + templates: + - name: echo + inputs: + parameters: + - name: message + container: + image: alpine:3.7 + command: [echo, "{{inputs.parameters.message}}"] + + - name: dag-arg-passing + dag: + tasks: + - name: A + template: echo + continueOn: + failed: true + arguments: + parameters: + - name: message + value: "Hello" + - name: B + dependencies: [A] + template: echo + arguments: + parameters: + - name: message + value: "{{tasks.A.status}}" + - name: C + dependencies: [B] + template: echo + arguments: + parameters: + - name: message + value: "{{tasks.B.status}}" + - name: D + dependencies: [A] + template: echo + arguments: + parameters: + - name: message + value: "{{tasks.A.status}}" + - name: E + dependencies: [D] + template: echo + arguments: + parameters: + - name: message + value: "{{tasks.B.status}}" +` + +func TestDAGStatusReference(t *testing.T) { + err := validate(dagStatusReference) + assert.Nil(t, err) + + err = validate(dagStatusNoFutureReferenceSimple) + // Can't reference the status of steps that have not run yet + if assert.NotNil(t, err) { + assert.Contains(t, err.Error(), "failed to resolve {{tasks.B.status}}") + } + + err = validate(dagStatusNoFutureReferenceWhenFutureReferenceHasChild) + // Can't reference the status of steps that have not run yet, even if the referenced steps have children + if assert.NotNil(t, err) { + assert.Contains(t, err.Error(), "failed to resolve {{tasks.B.status}}") + } + + err = validate(dagStatusPastReferenceChain) + assert.Nil(t, err) + + err = validate(dagStatusOnlyDirectAncestors) + // Can't reference steps that are not direct ancestors of node + // Here Node E references the status of Node B, even though it is not its descendent + if assert.NotNil(t, err) { + assert.Contains(t, err.Error(), "failed to resolve {{tasks.B.status}}") + } +} + var dagNonexistantTarget = ` apiVersion: argoproj.io/v1alpha1 kind: Workflow diff --git a/workflow/validate/validate_test.go b/workflow/validate/validate_test.go index 8b6f36b1f534..98830bb88b96 100644 --- a/workflow/validate/validate_test.go +++ b/workflow/validate/validate_test.go @@ -302,6 +302,87 @@ func TestStepOutputReference(t *testing.T) { assert.Nil(t, err) } + +var stepStatusReferences = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: status-ref- +spec: + entrypoint: statusref + templates: + - name: statusref + steps: + - - name: one + template: say + arguments: + parameters: + - name: message + value: "Hello, world" + - - name: two + template: say + arguments: + parameters: + - name: message + value: "{{steps.one.status}}" + - name: say + inputs: + parameters: + - name: message + value: "value" + container: + image: alpine:latest + command: [sh, -c] + args: ["echo {{inputs.parameters.message}}"] +` + +func TestStepStatusReference(t *testing.T) { + err := validate(stepStatusReferences) + assert.Nil(t, err) +} + + +var stepStatusReferencesNoFutureReference = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: status-ref- +spec: + entrypoint: statusref + templates: + - name: statusref + steps: + - - name: one + template: say + arguments: + parameters: + - name: message + value: "{{steps.two.status}}" + - - name: two + template: say + arguments: + parameters: + - name: message + value: "{{steps.one.status}}" + - name: say + inputs: + parameters: + - name: message + value: "value" + container: + image: alpine:latest + command: [sh, -c] + args: ["echo {{inputs.parameters.message}}"] +` + +func TestStepStatusReferenceNoFutureReference(t *testing.T) { + err := validate(stepStatusReferencesNoFutureReference) + // Can't reference the status of steps that have not run yet + if assert.NotNil(t, err) { + assert.Contains(t, err.Error(), "failed to resolve {{steps.two.status}}") + } +} + var stepArtReferences = ` apiVersion: argoproj.io/v1alpha1 kind: Workflow