Skip to content

Commit

Permalink
fix(controller): Include global params when using withParam (argoproj…
Browse files Browse the repository at this point in the history
  • Loading branch information
dtaniwaki committed Apr 23, 2020
1 parent 3441b11 commit e0f2697
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 29 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ require (
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/soheilhy/cmux v0.1.4
github.com/spf13/cobra v0.0.4-0.20181021141114-fe5e611709b0
github.com/stretchr/testify v1.4.0
github.com/stretchr/testify v1.5.1
github.com/tidwall/gjson v1.3.5
github.com/valyala/fasthttp v0.0.0-20171207120941-e5f51c11919d // indirect
github.com/valyala/fasttemplate v1.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/tidwall/gjson v1.3.5 h1:2oW9FBNu8qt9jy5URgrzsVx/T/KSn3qn/smJQ0crlDQ=
github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
Expand Down
24 changes: 24 additions & 0 deletions workflow/common/params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package common

// Parameters extends string map with useful methods.
type Parameters map[string]string

// Merge merges given parameteres.
func (ps Parameters) Merge(args ...Parameters) Parameters {
newParams := ps.DeepCopy()
for _, params := range args {
for k, v := range params {
newParams[k] = v
}
}
return newParams
}

// DeepCopy returns a new instance which has the same parameters as the receiver.
func (ps Parameters) DeepCopy() Parameters {
newParams := make(Parameters)
for k, v := range ps {
newParams[k] = v
}
return newParams
}
23 changes: 23 additions & 0 deletions workflow/common/params_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package common

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestParamsMerge ensures Merge of Parameters works correctly.
func TestParamsMerge(t *testing.T) {
params := Parameters{"foo": "1"}
newParams := params.Merge(Parameters{"foo": "2", "bar": "1"}, Parameters{"wow": "1"})
assert.Equal(t, Parameters{"foo": "2", "bar": "1", "wow": "1"}, newParams)
assert.NotSame(t, &params, &newParams)
}

// TestParamsClone ensures Clone of Parameters works correctly.
func TestParamsClone(t *testing.T) {
params := Parameters{"foo": "1"}
newParams := params.DeepCopy()
assert.Equal(t, params, newParams)
assert.NotSame(t, &params, &newParams)
}
12 changes: 3 additions & 9 deletions workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func GetExecutorOutput(exec remotecommand.Executor) (*bytes.Buffer, *bytes.Buffe
// * parameters in the template from the arguments
// * global parameters (e.g. {{workflow.parameters.XX}}, {{workflow.name}}, {{workflow.status}})
// * local parameters (e.g. {{pod.name}})
func ProcessArgs(tmpl *wfv1.Template, args wfv1.ArgumentsProvider, globalParams, localParams map[string]string, validateOnly bool) (*wfv1.Template, error) {
func ProcessArgs(tmpl *wfv1.Template, args wfv1.ArgumentsProvider, globalParams, localParams Parameters, validateOnly bool) (*wfv1.Template, error) {
// For each input parameter:
// 1) check if was supplied as argument. if so use the supplied value from arg
// 2) if not, use default value.
Expand Down Expand Up @@ -300,19 +300,13 @@ func ProcessArgs(tmpl *wfv1.Template, args wfv1.ArgumentsProvider, globalParams,
}

// SubstituteParams returns a new copy of the template with global, pod, and input parameters substituted
func SubstituteParams(tmpl *wfv1.Template, globalParams, localParams map[string]string) (*wfv1.Template, error) {
func SubstituteParams(tmpl *wfv1.Template, globalParams, localParams Parameters) (*wfv1.Template, error) {
tmplBytes, err := json.Marshal(tmpl)
if err != nil {
return nil, errors.InternalWrapError(err)
}
// First replace globals & locals, then replace inputs because globals could be referenced in the inputs
replaceMap := make(map[string]string)
for k, v := range globalParams {
replaceMap[k] = v
}
for k, v := range localParams {
replaceMap[k] = v
}
replaceMap := globalParams.Merge(localParams)
fstTmpl := fasttemplate.New(string(tmplBytes), "{{", "}}")
globalReplacedTmplStr, err := Replace(fstTmpl, replaceMap, true)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task

// Perform replacement
// Replace woc.volumes
err = woc.substituteParamsInVolumes(scope.replaceMap())
err = woc.substituteParamsInVolumes(scope.getParameters())
if err != nil {
return nil, err
}
Expand All @@ -543,7 +543,8 @@ func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task
return nil, errors.InternalWrapError(err)
}
fstTmpl := fasttemplate.New(string(taskBytes), "{{", "}}")
newTaskStr, err := common.Replace(fstTmpl, scope.replaceMap(), true)

newTaskStr, err := common.Replace(fstTmpl, woc.globalParams.Merge(scope.getParameters()), true)
if err != nil {
return nil, err
}
Expand Down
46 changes: 46 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,3 +607,49 @@ func TestDagAssessPhaseContinueOnExpandedTask(t *testing.T) {
woc.operate()
assert.Equal(t, wfv1.NodeSucceeded, woc.wf.Status.Phase)
}

var dagWithParamAndGlobalParam = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-with-param-and-global-param-
spec:
entrypoint: main
arguments:
parameters:
- name: workspace
value: /argo_workspace/{{workflow.uid}}
templates:
- name: main
dag:
tasks:
- name: use-with-param
template: whalesay
arguments:
parameters:
- name: message
value: "hello {{workflow.parameters.workspace}} {{item}}"
withParam: "[0, 1, 2]"
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
`

func TestDAGWithParamAndGlobalParam(t *testing.T) {
cancel, controller := newController()
defer cancel()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

wf := unmarshalWF(dagWithParamAndGlobalParam)
wf, err := wfcset.Create(wf)
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)

woc.operate()
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase)
}
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type wfOperationCtx struct {
controller *WorkflowController
// globalParams holds any parameters that are available to be referenced
// in the global scope (e.g. workflow.parameters.XXX).
globalParams map[string]string
globalParams common.Parameters
// volumes holds a DeepCopy of wf.Spec.Volumes to perform substitutions.
// It is then used in addVolumeReferences() when creating a pod.
volumes []apiv1.Volume
Expand Down
11 changes: 6 additions & 5 deletions workflow/controller/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/argoproj/argo/errors"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
)

// wfScope contains the current scope of variables available when executing a template
Expand All @@ -13,16 +14,16 @@ type wfScope struct {
scope map[string]interface{}
}

// replaceMap returns a replacement map of strings intended to be used simple string substitution
func (s *wfScope) replaceMap() map[string]string {
replaceMap := make(map[string]string)
// getParameters returns a map of strings intended to be used simple string substitution
func (s *wfScope) getParameters() common.Parameters {
params := make(common.Parameters)
for key, val := range s.scope {
valStr, ok := val.(string)
if ok {
replaceMap[key] = valStr
params[key] = valStr
}
}
return replaceMap
return params
}

func (s *wfScope) addParamToScope(key, val string) {
Expand Down
10 changes: 4 additions & 6 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
newStepGroup := make([]wfv1.WorkflowStep, len(stepGroup))

// Step 0: replace all parameter scope references for volumes
err := woc.substituteParamsInVolumes(scope.replaceMap())
err := woc.substituteParamsInVolumes(scope.getParameters())
if err != nil {
return nil, err
}
Expand All @@ -349,7 +349,8 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
return nil, errors.InternalWrapError(err)
}
fstTmpl := fasttemplate.New(string(stepBytes), "{{", "}}")
newStepStr, err := common.Replace(fstTmpl, scope.replaceMap(), true)

newStepStr, err := common.Replace(fstTmpl, woc.globalParams.Merge(scope.getParameters()), true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -465,10 +466,7 @@ func (woc *wfOperationCtx) expandStep(step wfv1.WorkflowStep) ([]wfv1.WorkflowSt

func (woc *wfOperationCtx) prepareMetricScope(node *wfv1.NodeStatus) (map[string]string, map[string]func() float64) {
realTimeScope := make(map[string]func() float64)
localScope := make(map[string]string)
for key, val := range woc.globalParams {
localScope[key] = val
}
localScope := woc.globalParams.DeepCopy()

if node.Completed() {
localScope["duration"] = fmt.Sprintf("%f", node.FinishedAt.Sub(node.StartedAt.Time).Seconds())
Expand Down
45 changes: 45 additions & 0 deletions workflow/controller/steps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,48 @@ func TestArtifactResolutionWhenSkipped(t *testing.T) {
woc.operate()
assert.Equal(t, wfv1.NodeSucceeded, woc.wf.Status.Phase)
}

var stepsWithParamAndGlobalParam = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-with-param-and-global-param-
spec:
entrypoint: main
arguments:
parameters:
- name: workspace
value: /argo_workspace/{{workflow.uid}}
templates:
- name: main
steps:
- - name: use-with-param
template: whalesay
arguments:
parameters:
- name: message
value: "hello {{workflow.parameters.workspace}} {{item}}"
withParam: "[0, 1, 2]"
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
`

func TestStepsWithParamAndGlobalParam(t *testing.T) {
cancel, controller := newController()
defer cancel()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

wf := unmarshalWF(stepsWithParamAndGlobalParam)
wf, err := wfcset.Create(wf)
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)

woc.operate()
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase)
}
7 changes: 2 additions & 5 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,8 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
}

// substitutePodParams returns a pod spec with parameter references substituted as well as pod.name
func substitutePodParams(pod *apiv1.Pod, globalParams map[string]string, tmpl *wfv1.Template) (*apiv1.Pod, error) {
podParams := make(map[string]string)
for k, v := range globalParams {
podParams[k] = v
}
func substitutePodParams(pod *apiv1.Pod, globalParams common.Parameters, tmpl *wfv1.Template) (*apiv1.Pod, error) {
podParams := globalParams.DeepCopy()
for _, inParam := range tmpl.Inputs.Parameters {
podParams["inputs.parameters."+inParam.Name] = *inParam.Value
}
Expand Down

0 comments on commit e0f2697

Please sign in to comment.