Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(controller): allow to use global params with withParam #2757

Merged
merged 12 commits into from
Apr 23, 2020
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ require (
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/soheilhy/cmux v0.1.4
github.com/spf13/cobra v0.0.4-0.20181021141114-fe5e611709b0
github.com/stretchr/testify v1.4.0
github.com/stretchr/testify v1.5.1
github.com/tidwall/gjson v1.3.5
github.com/valyala/fasthttp v0.0.0-20171207120941-e5f51c11919d // indirect
github.com/valyala/fasttemplate v1.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/tidwall/gjson v1.3.5 h1:2oW9FBNu8qt9jy5URgrzsVx/T/KSn3qn/smJQ0crlDQ=
github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
Expand Down
24 changes: 24 additions & 0 deletions workflow/common/params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package common

// Parameters extends string map with useful methods.
type Parameters map[string]string

// Merge merges given parameteres.
func (ps Parameters) Merge(args ...Parameters) Parameters {
newParams := ps.DeepCopy()
for _, params := range args {
for k, v := range params {
newParams[k] = v
}
}
return newParams
}

// DeepCopy returns a new instance which has the same parameters as the receiver.
func (ps Parameters) DeepCopy() Parameters {
newParams := make(Parameters)
for k, v := range ps {
newParams[k] = v
}
return newParams
}
23 changes: 23 additions & 0 deletions workflow/common/params_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package common

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestParamsMerge ensures Merge of Parameters works correctly.
func TestParamsMerge(t *testing.T) {
params := Parameters{"foo": "1"}
newParams := params.Merge(Parameters{"foo": "2", "bar": "1"}, Parameters{"wow": "1"})
assert.Equal(t, Parameters{"foo": "2", "bar": "1", "wow": "1"}, newParams)
assert.NotSame(t, &params, &newParams)
}

// TestParamsClone ensures Clone of Parameters works correctly.
func TestParamsClone(t *testing.T) {
params := Parameters{"foo": "1"}
newParams := params.DeepCopy()
assert.Equal(t, params, newParams)
assert.NotSame(t, &params, &newParams)
}
12 changes: 3 additions & 9 deletions workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func GetExecutorOutput(exec remotecommand.Executor) (*bytes.Buffer, *bytes.Buffe
// * parameters in the template from the arguments
// * global parameters (e.g. {{workflow.parameters.XX}}, {{workflow.name}}, {{workflow.status}})
// * local parameters (e.g. {{pod.name}})
func ProcessArgs(tmpl *wfv1.Template, args wfv1.ArgumentsProvider, globalParams, localParams map[string]string, validateOnly bool) (*wfv1.Template, error) {
func ProcessArgs(tmpl *wfv1.Template, args wfv1.ArgumentsProvider, globalParams, localParams Parameters, validateOnly bool) (*wfv1.Template, error) {
// For each input parameter:
// 1) check if was supplied as argument. if so use the supplied value from arg
// 2) if not, use default value.
Expand Down Expand Up @@ -300,19 +300,13 @@ func ProcessArgs(tmpl *wfv1.Template, args wfv1.ArgumentsProvider, globalParams,
}

// SubstituteParams returns a new copy of the template with global, pod, and input parameters substituted
func SubstituteParams(tmpl *wfv1.Template, globalParams, localParams map[string]string) (*wfv1.Template, error) {
func SubstituteParams(tmpl *wfv1.Template, globalParams, localParams Parameters) (*wfv1.Template, error) {
tmplBytes, err := json.Marshal(tmpl)
if err != nil {
return nil, errors.InternalWrapError(err)
}
// First replace globals & locals, then replace inputs because globals could be referenced in the inputs
replaceMap := make(map[string]string)
for k, v := range globalParams {
replaceMap[k] = v
}
for k, v := range localParams {
replaceMap[k] = v
}
replaceMap := globalParams.Merge(localParams)
fstTmpl := fasttemplate.New(string(tmplBytes), "{{", "}}")
globalReplacedTmplStr, err := Replace(fstTmpl, replaceMap, true)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task

// Perform replacement
// Replace woc.volumes
err = woc.substituteParamsInVolumes(scope.replaceMap())
err = woc.substituteParamsInVolumes(scope.getParameters())
if err != nil {
return nil, err
}
Expand All @@ -543,7 +543,8 @@ func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task
return nil, errors.InternalWrapError(err)
}
fstTmpl := fasttemplate.New(string(taskBytes), "{{", "}}")
newTaskStr, err := common.Replace(fstTmpl, scope.replaceMap(), true)

newTaskStr, err := common.Replace(fstTmpl, woc.globalParams.Merge(scope.getParameters()), true)
if err != nil {
return nil, err
}
Expand Down
46 changes: 46 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,3 +607,49 @@ func TestDagAssessPhaseContinueOnExpandedTask(t *testing.T) {
woc.operate()
assert.Equal(t, wfv1.NodeSucceeded, woc.wf.Status.Phase)
}

var dagWithParamAndGlobalParam = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-with-param-and-global-param-
spec:
entrypoint: main
arguments:
parameters:
- name: workspace
value: /argo_workspace/{{workflow.uid}}
templates:
- name: main
dag:
tasks:
- name: use-with-param
template: whalesay
arguments:
parameters:
- name: message
value: "hello {{workflow.parameters.workspace}} {{item}}"
withParam: "[0, 1, 2]"
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
`

func TestDAGWithParamAndGlobalParam(t *testing.T) {
cancel, controller := newController()
defer cancel()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

wf := unmarshalWF(dagWithParamAndGlobalParam)
wf, err := wfcset.Create(wf)
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)

woc.operate()
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase)
}
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type wfOperationCtx struct {
controller *WorkflowController
// globalParams holds any parameters that are available to be referenced
// in the global scope (e.g. workflow.parameters.XXX).
globalParams map[string]string
globalParams common.Parameters
// volumes holds a DeepCopy of wf.Spec.Volumes to perform substitutions.
// It is then used in addVolumeReferences() when creating a pod.
volumes []apiv1.Volume
Expand Down
11 changes: 6 additions & 5 deletions workflow/controller/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/argoproj/argo/errors"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
)

// wfScope contains the current scope of variables available when executing a template
Expand All @@ -13,16 +14,16 @@ type wfScope struct {
scope map[string]interface{}
}

// replaceMap returns a replacement map of strings intended to be used simple string substitution
func (s *wfScope) replaceMap() map[string]string {
replaceMap := make(map[string]string)
// getParameters returns a map of strings intended to be used simple string substitution
func (s *wfScope) getParameters() common.Parameters {
params := make(common.Parameters)
for key, val := range s.scope {
valStr, ok := val.(string)
if ok {
replaceMap[key] = valStr
params[key] = valStr
}
}
return replaceMap
return params
}

func (s *wfScope) addParamToScope(key, val string) {
Expand Down
10 changes: 4 additions & 6 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
newStepGroup := make([]wfv1.WorkflowStep, len(stepGroup))

// Step 0: replace all parameter scope references for volumes
err := woc.substituteParamsInVolumes(scope.replaceMap())
err := woc.substituteParamsInVolumes(scope.getParameters())
if err != nil {
return nil, err
}
Expand All @@ -349,7 +349,8 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
return nil, errors.InternalWrapError(err)
}
fstTmpl := fasttemplate.New(string(stepBytes), "{{", "}}")
newStepStr, err := common.Replace(fstTmpl, scope.replaceMap(), true)

newStepStr, err := common.Replace(fstTmpl, woc.globalParams.Merge(scope.getParameters()), true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -465,10 +466,7 @@ func (woc *wfOperationCtx) expandStep(step wfv1.WorkflowStep) ([]wfv1.WorkflowSt

func (woc *wfOperationCtx) prepareMetricScope(node *wfv1.NodeStatus) (map[string]string, map[string]func() float64) {
realTimeScope := make(map[string]func() float64)
localScope := make(map[string]string)
for key, val := range woc.globalParams {
localScope[key] = val
}
localScope := woc.globalParams.DeepCopy()

if node.Completed() {
localScope["duration"] = fmt.Sprintf("%f", node.FinishedAt.Sub(node.StartedAt.Time).Seconds())
Expand Down
45 changes: 45 additions & 0 deletions workflow/controller/steps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,48 @@ func TestArtifactResolutionWhenSkipped(t *testing.T) {
woc.operate()
assert.Equal(t, wfv1.NodeSucceeded, woc.wf.Status.Phase)
}

var stepsWithParamAndGlobalParam = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-with-param-and-global-param-
spec:
entrypoint: main
arguments:
parameters:
- name: workspace
value: /argo_workspace/{{workflow.uid}}
templates:
- name: main
steps:
- - name: use-with-param
template: whalesay
arguments:
parameters:
- name: message
value: "hello {{workflow.parameters.workspace}} {{item}}"
withParam: "[0, 1, 2]"
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
`

func TestStepsWithParamAndGlobalParam(t *testing.T) {
cancel, controller := newController()
defer cancel()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

wf := unmarshalWF(stepsWithParamAndGlobalParam)
wf, err := wfcset.Create(wf)
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)

woc.operate()
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase)
}
7 changes: 2 additions & 5 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,8 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
}

// substitutePodParams returns a pod spec with parameter references substituted as well as pod.name
func substitutePodParams(pod *apiv1.Pod, globalParams map[string]string, tmpl *wfv1.Template) (*apiv1.Pod, error) {
podParams := make(map[string]string)
for k, v := range globalParams {
podParams[k] = v
}
func substitutePodParams(pod *apiv1.Pod, globalParams common.Parameters, tmpl *wfv1.Template) (*apiv1.Pod, error) {
podParams := globalParams.DeepCopy()
for _, inParam := range tmpl.Inputs.Parameters {
podParams["inputs.parameters."+inParam.Name] = *inParam.Value
}
Expand Down