From f279f488a9e67ac79cf61341755b6b1548af612f Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 16 Oct 2019 21:31:59 -0700 Subject: [PATCH 01/10] Add PodSpecPatch functionality --- Gopkg.lock | 1 + api/openapi-spec/swagger.json | 6 +++ examples/pod-spec-patch.yaml | 18 +++++++++ .../workflow/v1alpha1/openapi_generated.go | 12 ++++++ pkg/apis/workflow/v1alpha1/workflow_types.go | 6 +++ workflow/controller/workflowpod.go | 33 ++++++++++++++++ workflow/controller/workflowpod_test.go | 39 ++++++++++++++++--- 7 files changed, 109 insertions(+), 6 deletions(-) create mode 100644 examples/pod-spec-patch.yaml diff --git a/Gopkg.lock b/Gopkg.lock index 1bad885666d9..bc492d1be1f3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1466,6 +1466,7 @@ "k8s.io/apimachinery/pkg/types", "k8s.io/apimachinery/pkg/util/clock", "k8s.io/apimachinery/pkg/util/runtime", + "k8s.io/apimachinery/pkg/util/strategicpatch", "k8s.io/apimachinery/pkg/util/validation", "k8s.io/apimachinery/pkg/util/wait", "k8s.io/apimachinery/pkg/version", diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index 2c8a079133e6..fdf2f5c16672 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -1083,6 +1083,9 @@ "type": "integer", "format": "int64" }, + "podSpecPatch": { + "type": "string" + }, "priority": { "description": "Priority to apply to workflow pods.", "type": "integer", @@ -1489,6 +1492,9 @@ "description": "PriorityClassName to apply to workflow pods.", "type": "string" }, + "podSpecPatch": { + "type": "string" + }, "priority": { "description": "Priority is used if controller is configured to process limited number of workflows in parallel. Workflows with higher priority are processed first.", "type": "integer", diff --git a/examples/pod-spec-patch.yaml b/examples/pod-spec-patch.yaml new file mode 100644 index 000000000000..9a29adcde9a8 --- /dev/null +++ b/examples/pod-spec-patch.yaml @@ -0,0 +1,18 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: pod-spec-patch- +spec: + entrypoint: whalesay + arguments: + parameters: + - name: message + value: 100m + + templates: + - name: whalesay + podspecpatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "{{workflow.parameters.message}}"}}}]}' + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] diff --git a/pkg/apis/workflow/v1alpha1/openapi_generated.go b/pkg/apis/workflow/v1alpha1/openapi_generated.go index 5a2470e62cf8..fd7ba6c10237 100644 --- a/pkg/apis/workflow/v1alpha1/openapi_generated.go +++ b/pkg/apis/workflow/v1alpha1/openapi_generated.go @@ -2128,6 +2128,12 @@ func schema_pkg_apis_workflow_v1alpha1_Template(ref common.ReferenceCallback) co Ref: ref("k8s.io/api/core/v1.PodSecurityContext"), }, }, + "podSpecPatch": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, }, Required: []string{"name"}, }, @@ -2818,6 +2824,12 @@ func schema_pkg_apis_workflow_v1alpha1_WorkflowSpec(ref common.ReferenceCallback Ref: ref("k8s.io/api/core/v1.PodSecurityContext"), }, }, + "podSpecPatch": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, }, Required: []string{"templates", "entrypoint"}, }, diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go index e10ee4c1846d..aa32bb700944 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/workflow_types.go @@ -226,6 +226,9 @@ type WorkflowSpec struct { // Optional: Defaults to empty. See type description for default values of each field. // +optional SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"` + + // PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it. + PodSpecPatch string `json:"podSpecPatch,omitempty"` } // Template is a reusable and composable unit of execution in a workflow @@ -351,6 +354,9 @@ type Template struct { // Optional: Defaults to empty. See type description for default values of each field. // +optional SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"` + + // PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it. + PodSpecPatch string `json:"podSpecPatch,omitempty"` } var _ TemplateHolder = &Template{} diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 36a63ac3e70e..45688173dfb4 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -17,6 +17,7 @@ import ( apiv1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/utils/pointer" ) @@ -221,6 +222,38 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont } } + // Apply the patch string from template + if woc.wf.Spec.PodSpecPatch != "" || tmpl.PodSpecPatch != "" { + + if tmpl.PodSpecPatch == "" { + tmpl.PodSpecPatch = woc.wf.Spec.PodSpecPatch + pod, err = substitutePodParams(pod, woc.globalParams, tmpl) + if err != nil { + return nil, err + } + } + podSpecPatch := tmpl.PodSpecPatch + jsonstr, err := json.Marshal(pod.Spec) + + var spec apiv1.PodSpec + fmt.Println(podSpecPatch) + + err = json.Unmarshal([]byte(podSpecPatch), &spec) + + if err != nil { + return nil, errors.Wrap(err, "", "Invalid PodSpecPatch String") + } + + modJson, err := strategicpatch.StrategicMergePatch(jsonstr, []byte(podSpecPatch), apiv1.PodSpec{}) + + if err != nil { + return nil, errors.Wrap(err, "", "Error occured during strategicpatch") + } + err = json.Unmarshal(modJson, &pod.Spec) + if err != nil { + return nil, errors.Wrap(err, "", "Error in Unmarshalling after merge the patch") + } + } created, err := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Create(pod) if err != nil { if apierr.IsAlreadyExists(err) { diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go index a6e7e5c11176..af3978758cc5 100644 --- a/workflow/controller/workflowpod_test.go +++ b/workflow/controller/workflowpod_test.go @@ -10,10 +10,10 @@ import ( wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/workflow/common" - "sigs.k8s.io/yaml" "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" ) func unmarshalTemplate(yamlStr string) *wfv1.Template { @@ -106,15 +106,16 @@ script: source: | ls -al ` + // TestScriptTemplateWithVolume ensure we can a script pod with input artifacts func TestScriptTemplateWithoutVolumeOptionalArtifact(t *testing.T) { volumeMount := apiv1.VolumeMount{ - Name: "input-artifacts", - ReadOnly: false, - MountPath: "/manifest", - SubPath: "manifest", + Name: "input-artifacts", + ReadOnly: false, + MountPath: "/manifest", + SubPath: "manifest", MountPropagation: nil, - SubPathExpr: "", + SubPathExpr: "", } // Ensure that volume mount is added when artifact is provided @@ -880,3 +881,29 @@ func TestTmplLevelSecurityContext(t *testing.T) { assert.NotNil(t, pod.Spec.SecurityContext) assert.Equal(t, runAsUser, *pod.Spec.SecurityContext.RunAsUser) } + +var helloWorldWfWithPatch = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: hello-world +spec: + entrypoint: whalesay + templates: + - name: whalesay + podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "800m"}}}]}' + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] +` + +func TestPodSpecPatch(t *testing.T) { + wf := unmarshalWF(helloWorldWfWithPatch) + woc := newWoc(*wf) + mainCtr := woc.wf.Spec.Templates[0].Container + //mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath) + pod, _ := woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false) + fmt.Println(pod.Spec.Containers[1].Resources.Limits.Cpu().AsInt64()) + assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String()) +} From 70004282e1d8695289b1298ab0522e9033e480ae Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 16 Oct 2019 21:35:04 -0700 Subject: [PATCH 02/10] Fixed unused variable and print statement --- workflow/controller/workflowpod.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 45688173dfb4..d408d3fa89ce 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -232,19 +232,15 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont return nil, err } } - podSpecPatch := tmpl.PodSpecPatch jsonstr, err := json.Marshal(pod.Spec) - var spec apiv1.PodSpec - fmt.Println(podSpecPatch) - - err = json.Unmarshal([]byte(podSpecPatch), &spec) + err = json.Unmarshal([]byte(tmpl.PodSpecPatch), &spec) if err != nil { return nil, errors.Wrap(err, "", "Invalid PodSpecPatch String") } - modJson, err := strategicpatch.StrategicMergePatch(jsonstr, []byte(podSpecPatch), apiv1.PodSpec{}) + modJson, err := strategicpatch.StrategicMergePatch(jsonstr, []byte(tmpl.PodSpecPatch), apiv1.PodSpec{}) if err != nil { return nil, errors.Wrap(err, "", "Error occured during strategicpatch") From 7bdc886fc58ad10fc595c13265653eba82235e3e Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 16 Oct 2019 22:32:38 -0700 Subject: [PATCH 03/10] Update workflowpod.go --- workflow/controller/workflowpod.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index d408d3fa89ce..938150b3be19 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -233,6 +233,11 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont } } jsonstr, err := json.Marshal(pod.Spec) + + if err != nil { + return nil, errors.Wrap(err, "", "Fail to marshal the Pod spec") + } + var spec apiv1.PodSpec err = json.Unmarshal([]byte(tmpl.PodSpecPatch), &spec) From 60ac731f1be97e383e54e2a28607efb13225fb77 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 16 Oct 2019 23:06:33 -0700 Subject: [PATCH 04/10] Fixed codegen --- api/openapi-spec/swagger.json | 2 ++ pkg/apis/workflow/v1alpha1/openapi_generated.go | 10 ++++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index fdf2f5c16672..feeceacf0619 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -1084,6 +1084,7 @@ "format": "int64" }, "podSpecPatch": { + "description": "PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it.", "type": "string" }, "priority": { @@ -1493,6 +1494,7 @@ "type": "string" }, "podSpecPatch": { + "description": "PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it.", "type": "string" }, "priority": { diff --git a/pkg/apis/workflow/v1alpha1/openapi_generated.go b/pkg/apis/workflow/v1alpha1/openapi_generated.go index fd7ba6c10237..eb7071ac751f 100644 --- a/pkg/apis/workflow/v1alpha1/openapi_generated.go +++ b/pkg/apis/workflow/v1alpha1/openapi_generated.go @@ -2130,8 +2130,9 @@ func schema_pkg_apis_workflow_v1alpha1_Template(ref common.ReferenceCallback) co }, "podSpecPatch": { SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", + Description: "PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it.", + Type: []string{"string"}, + Format: "", }, }, }, @@ -2826,8 +2827,9 @@ func schema_pkg_apis_workflow_v1alpha1_WorkflowSpec(ref common.ReferenceCallback }, "podSpecPatch": { SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", + Description: "PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it.", + Type: []string{"string"}, + Format: "", }, }, }, From 418459a98b39f445a27aa12eef0fbc9b008363ff Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 17 Oct 2019 21:31:08 -0700 Subject: [PATCH 05/10] Refactored as per Jesse comments --- api/openapi-spec/swagger.json | 4 +- examples/pod-spec-patch-wf-tmpl.yaml | 25 +++++++++ examples/pod-spec-patch.yaml | 5 +- examples/pod-spec-yaml-patch.yaml | 22 ++++++++ .../workflow/v1alpha1/openapi_generated.go | 4 +- pkg/apis/workflow/v1alpha1/workflow_types.go | 16 +++++- workflow/controller/operator.go | 9 +++ workflow/controller/workflowpod.go | 23 +++----- workflow/util/util.go | 55 +++++++++++++++++++ 9 files changed, 140 insertions(+), 23 deletions(-) create mode 100644 examples/pod-spec-patch-wf-tmpl.yaml create mode 100644 examples/pod-spec-yaml-patch.yaml diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index feeceacf0619..e7771dc641aa 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -1084,7 +1084,7 @@ "format": "int64" }, "podSpecPatch": { - "description": "PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it.", + "description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).", "type": "string" }, "priority": { @@ -1494,7 +1494,7 @@ "type": "string" }, "podSpecPatch": { - "description": "PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it.", + "description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).", "type": "string" }, "priority": { diff --git a/examples/pod-spec-patch-wf-tmpl.yaml b/examples/pod-spec-patch-wf-tmpl.yaml new file mode 100644 index 000000000000..052c7f664bb2 --- /dev/null +++ b/examples/pod-spec-patch-wf-tmpl.yaml @@ -0,0 +1,25 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: pod-spec-patch- +spec: + entrypoint: whalesay + arguments: + parameters: + - name: cpu-limit + value: 100m + - name: mem-limit + value: 100Mi + podSpecPatch: | + containers: + - name: main + resources: + limits: + memory: "{{workflow.parameters.mem-limit}}" + templates: + - name: whalesay + podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "{{workflow.parameters.cpu-limit}}" }}}]}' + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] diff --git a/examples/pod-spec-patch.yaml b/examples/pod-spec-patch.yaml index 9a29adcde9a8..083c7ca4622b 100644 --- a/examples/pod-spec-patch.yaml +++ b/examples/pod-spec-patch.yaml @@ -6,12 +6,11 @@ spec: entrypoint: whalesay arguments: parameters: - - name: message + - name: cpu-limit value: 100m - templates: - name: whalesay - podspecpatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "{{workflow.parameters.message}}"}}}]}' + podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "{{workflow.parameters.cpu-limit}}" }}}]}' container: image: docker/whalesay:latest command: [cowsay] diff --git a/examples/pod-spec-yaml-patch.yaml b/examples/pod-spec-yaml-patch.yaml new file mode 100644 index 000000000000..0d4a13d4f2ea --- /dev/null +++ b/examples/pod-spec-yaml-patch.yaml @@ -0,0 +1,22 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: pod-spec-patch- +spec: + entrypoint: whalesay + arguments: + parameters: + - name: mem-limit + value: 100Mi + podSpecPatch: | + containers: + - name: main + resources: + limits: + memory: "{{workflow.parameters.mem-limit}}" + templates: + - name: whalesay + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] diff --git a/pkg/apis/workflow/v1alpha1/openapi_generated.go b/pkg/apis/workflow/v1alpha1/openapi_generated.go index eb7071ac751f..328895c74bec 100644 --- a/pkg/apis/workflow/v1alpha1/openapi_generated.go +++ b/pkg/apis/workflow/v1alpha1/openapi_generated.go @@ -2130,7 +2130,7 @@ func schema_pkg_apis_workflow_v1alpha1_Template(ref common.ReferenceCallback) co }, "podSpecPatch": { SchemaProps: spec.SchemaProps{ - Description: "PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it.", + Description: "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).", Type: []string{"string"}, Format: "", }, @@ -2827,7 +2827,7 @@ func schema_pkg_apis_workflow_v1alpha1_WorkflowSpec(ref common.ReferenceCallback }, "podSpecPatch": { SchemaProps: spec.SchemaProps{ - Description: "PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it.", + Description: "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).", Type: []string{"string"}, Format: "", }, diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go index aa32bb700944..b511251acb69 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/workflow_types.go @@ -227,10 +227,16 @@ type WorkflowSpec struct { // +optional SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"` - // PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it. + // PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of + // container fields which are not strings (e.g. resource limits). PodSpecPatch string `json:"podSpecPatch,omitempty"` } + +func (wfs *WorkflowSpec) HasPodSpecPatch() bool { + return wfs.PodSpecPatch != "" +} + // Template is a reusable and composable unit of execution in a workflow type Template struct { // Name is the name of the template @@ -355,7 +361,8 @@ type Template struct { // +optional SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"` - // PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it. + // PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of + // container fields which are not strings (e.g. resource limits). PodSpecPatch string `json:"podSpecPatch,omitempty"` } @@ -380,6 +387,11 @@ func (tmpl *Template) GetBaseTemplate() *Template { return baseTemplate } +func (tmpl *Template) HasPodSpecPatch() bool { + return tmpl.PodSpecPatch != "" +} + + // Inputs are the mechanism for passing parameters, artifacts, volumes from one template to another type Inputs struct { // Parameters are a list of parameters passed as inputs diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 5731552bd662..08d24ea2f360 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1128,10 +1128,19 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat } newTmplCtx, basedTmpl, err := woc.getResolvedTemplate(node, orgTmpl, tmplCtx, args) + if err != nil { return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, orgTmpl, boundaryID, err), err } + if woc.wf.Spec.HasPodSpecPatch() { + err = util.PodSpecPatchMerge(woc.wf, basedTmpl) + fmt.Println(basedTmpl.PodSpecPatch) + if err != nil { + return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, orgTmpl, boundaryID, err), err + } + } + localParams := make(map[string]string) if basedTmpl.IsPodType() { localParams[common.LocalVarPodName] = woc.wf.NodeID(nodeName) diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 938150b3be19..b329c95f3cda 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -12,6 +12,7 @@ import ( "github.com/argoproj/argo/pkg/apis/workflow" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/workflow/common" + "github.com/argoproj/argo/workflow/util" log "github.com/sirupsen/logrus" "github.com/valyala/fasttemplate" apiv1 "k8s.io/api/core/v1" @@ -80,6 +81,10 @@ func (woc *wfOperationCtx) getVolumeDockerSock() apiv1.Volume { } } +func (woc *wfOperationCtx) hasPodSpecPatch(tmpl *wfv1.Template) bool { + return woc.wf.Spec.HasPodSpecPatch() || tmpl.HasPodSpecPatch() +} + func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template, includeScriptOutput bool) (*apiv1.Pod, error) { nodeID := woc.wf.NodeID(nodeName) woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID) @@ -223,32 +228,22 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont } // Apply the patch string from template - if woc.wf.Spec.PodSpecPatch != "" || tmpl.PodSpecPatch != "" { - - if tmpl.PodSpecPatch == "" { - tmpl.PodSpecPatch = woc.wf.Spec.PodSpecPatch - pod, err = substitutePodParams(pod, woc.globalParams, tmpl) - if err != nil { - return nil, err - } - } + if woc.hasPodSpecPatch(tmpl) { jsonstr, err := json.Marshal(pod.Spec) - if err != nil { return nil, errors.Wrap(err, "", "Fail to marshal the Pod spec") } var spec apiv1.PodSpec - err = json.Unmarshal([]byte(tmpl.PodSpecPatch), &spec) - if err != nil { - return nil, errors.Wrap(err, "", "Invalid PodSpecPatch String") + if !util.ValidateJsonStr(tmpl.PodSpecPatch, spec) { + return nil, errors.New("", "Invalid PodSpecPatch String") } modJson, err := strategicpatch.StrategicMergePatch(jsonstr, []byte(tmpl.PodSpecPatch), apiv1.PodSpec{}) if err != nil { - return nil, errors.Wrap(err, "", "Error occured during strategicpatch") + return nil, errors.Wrap(err, "", "Error occurred during strategic merge patch") } err = json.Unmarshal(modJson, &pod.Spec) if err != nil { diff --git a/workflow/util/util.go b/workflow/util/util.go index decb313fa586..cfabb0d49ee8 100644 --- a/workflow/util/util.go +++ b/workflow/util/util.go @@ -13,6 +13,8 @@ import ( "time" log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/util/strategicpatch" + apiv1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -669,3 +671,56 @@ func ReadManifest(manifestPaths ...string) ([][]byte, error) { } return manifestContents, err } + +func IsJsonStr(str string) bool { + str = strings.TrimSpace(str) + return len(str) > 0 && str[0] == '{' +} + +func ConvertYAMLToJSON(str string) (string, error) { + if !IsJsonStr(str) { + jsonStr, err := yaml.YAMLToJSON([]byte(str)) + if err != nil { + return str, err + } + return string(jsonStr), nil + } + return str, nil +} + +// PodSpecPatchMerge will do strategic merge the workflow level PodSpecPatch and template level PodSpecPatch +func PodSpecPatchMerge(wf *wfv1.Workflow, tmpl *wfv1.Template) error { + var wfPatch, tmplPatch, mergedPatch string + var err error + + if wf.Spec.HasPodSpecPatch() { + wfPatch, err = ConvertYAMLToJSON(wf.Spec.PodSpecPatch) + if err != nil { + return err + } + } + if tmpl.HasPodSpecPatch() { + tmplPatch, err = ConvertYAMLToJSON(tmpl.PodSpecPatch) + if err != nil { + return err + } + mergedByte, err := strategicpatch.StrategicMergePatch([]byte(wfPatch), []byte(tmplPatch), apiv1.PodSpec{}) + if err != nil { + return err + } + mergedPatch = string(mergedByte) + } else { + mergedPatch = wfPatch + } + tmpl.PodSpecPatch = mergedPatch + return nil +} + +func ValidateJsonStr(jsonStr string, schema interface{}) bool { + err := json.Unmarshal([]byte(jsonStr), &schema) + if err != nil { + return false + } + return true + +} From 795c817ea21db62fae150f6a6b41c96d0f4e40db Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 17 Oct 2019 21:50:40 -0700 Subject: [PATCH 06/10] fixed lint --- workflow/util/util.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/workflow/util/util.go b/workflow/util/util.go index cfabb0d49ee8..9980083c1c4c 100644 --- a/workflow/util/util.go +++ b/workflow/util/util.go @@ -13,7 +13,6 @@ import ( "time" log "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/util/strategicpatch" apiv1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -23,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/client-go/informers/internalinterfaces" @@ -718,9 +718,5 @@ func PodSpecPatchMerge(wf *wfv1.Workflow, tmpl *wfv1.Template) error { func ValidateJsonStr(jsonStr string, schema interface{}) bool { err := json.Unmarshal([]byte(jsonStr), &schema) - if err != nil { - return false - } - return true - + return err == nil } From 48f68c7720f4029414f33569c35d62026c38ecc2 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 17 Oct 2019 22:23:52 -0700 Subject: [PATCH 07/10] Update workflow_types.go --- pkg/apis/workflow/v1alpha1/workflow_types.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go index b511251acb69..4bfe83ca632e 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/workflow_types.go @@ -232,7 +232,6 @@ type WorkflowSpec struct { PodSpecPatch string `json:"podSpecPatch,omitempty"` } - func (wfs *WorkflowSpec) HasPodSpecPatch() bool { return wfs.PodSpecPatch != "" } @@ -391,7 +390,6 @@ func (tmpl *Template) HasPodSpecPatch() bool { return tmpl.PodSpecPatch != "" } - // Inputs are the mechanism for passing parameters, artifacts, volumes from one template to another type Inputs struct { // Parameters are a list of parameters passed as inputs From 5dcb4ba3986ed719e385de56fb357153364e6cc8 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Sat, 19 Oct 2019 10:46:10 -0700 Subject: [PATCH 08/10] Updated review comments --- workflow/controller/operator.go | 9 ---- workflow/controller/workflowpod.go | 16 +++++++ workflow/controller/workflowpod_test.go | 55 ++++++++++++++++++++++++- workflow/util/util.go | 26 +++++++----- workflow/util/util_test.go | 28 +++++++++++++ 5 files changed, 112 insertions(+), 22 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 08d24ea2f360..02461ea08a9f 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1132,15 +1132,6 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat if err != nil { return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, orgTmpl, boundaryID, err), err } - - if woc.wf.Spec.HasPodSpecPatch() { - err = util.PodSpecPatchMerge(woc.wf, basedTmpl) - fmt.Println(basedTmpl.PodSpecPatch) - if err != nil { - return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, orgTmpl, boundaryID, err), err - } - } - localParams := make(map[string]string) if basedTmpl.IsPodType() { localParams[common.LocalVarPodName] = woc.wf.NodeID(nodeName) diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index b329c95f3cda..d60335f8e837 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -234,6 +234,22 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont return nil, errors.Wrap(err, "", "Fail to marshal the Pod spec") } + tmpl.PodSpecPatch, err = util.PodSpecPatchMerge(woc.wf, tmpl) + + if err != nil { + return nil, errors.Wrap(err, "", "Fail to marshal the Pod spec") + } + + // Final substitution for workflow level PodSpecPatch + localParams := make(map[string]string) + if tmpl.IsPodType() { + localParams[common.LocalVarPodName] = woc.wf.NodeID(nodeName) + } + tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false) + if err != nil { + return nil, errors.Wrap(err, "", "Fail to substitute the PodSpecPatch variables") + } + var spec apiv1.PodSpec if !util.ValidateJsonStr(tmpl.PodSpecPatch, spec) { diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go index af3978758cc5..d0b7e226cee2 100644 --- a/workflow/controller/workflowpod_test.go +++ b/workflow/controller/workflowpod_test.go @@ -898,12 +898,63 @@ spec: args: ["hello world"] ` +var helloWorldWfWithWFPatch = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: hello-world +spec: + entrypoint: whalesay + podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "800m"}}}]}' + templates: + - name: whalesay + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] +` + +var helloWorldWfWithWFYMALPatch = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: hello-world +spec: + entrypoint: whalesay + podSpecPatch: | + containers: + - name: main + resources: + limits: + cpu: "800m" + templates: + - name: whalesay + podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"memory": "100Mi"}}}]}' + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] +` + func TestPodSpecPatch(t *testing.T) { wf := unmarshalWF(helloWorldWfWithPatch) woc := newWoc(*wf) mainCtr := woc.wf.Spec.Templates[0].Container - //mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath) pod, _ := woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false) - fmt.Println(pod.Spec.Containers[1].Resources.Limits.Cpu().AsInt64()) assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String()) + + wf = unmarshalWF(helloWorldWfWithWFPatch) + woc = newWoc(*wf) + mainCtr = woc.wf.Spec.Templates[0].Container + pod, _ = woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false) + assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String()) + + wf = unmarshalWF(helloWorldWfWithWFYMALPatch) + woc = newWoc(*wf) + mainCtr = woc.wf.Spec.Templates[0].Container + pod, _ = woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false) + + assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String()) + assert.Equal(t, "104857600", pod.Spec.Containers[1].Resources.Limits.Memory().AsDec().String()) + } diff --git a/workflow/util/util.go b/workflow/util/util.go index 9980083c1c4c..b279f9b10c77 100644 --- a/workflow/util/util.go +++ b/workflow/util/util.go @@ -672,13 +672,13 @@ func ReadManifest(manifestPaths ...string) ([][]byte, error) { return manifestContents, err } -func IsJsonStr(str string) bool { +func IsJSONStr(str string) bool { str = strings.TrimSpace(str) return len(str) > 0 && str[0] == '{' } func ConvertYAMLToJSON(str string) (string, error) { - if !IsJsonStr(str) { + if !IsJSONStr(str) { jsonStr, err := yaml.YAMLToJSON([]byte(str)) if err != nil { return str, err @@ -689,31 +689,35 @@ func ConvertYAMLToJSON(str string) (string, error) { } // PodSpecPatchMerge will do strategic merge the workflow level PodSpecPatch and template level PodSpecPatch -func PodSpecPatchMerge(wf *wfv1.Workflow, tmpl *wfv1.Template) error { +func PodSpecPatchMerge(wf *wfv1.Workflow, tmpl *wfv1.Template) (string, error) { var wfPatch, tmplPatch, mergedPatch string var err error if wf.Spec.HasPodSpecPatch() { wfPatch, err = ConvertYAMLToJSON(wf.Spec.PodSpecPatch) if err != nil { - return err + return "", err } } if tmpl.HasPodSpecPatch() { tmplPatch, err = ConvertYAMLToJSON(tmpl.PodSpecPatch) if err != nil { - return err + return "", err } - mergedByte, err := strategicpatch.StrategicMergePatch([]byte(wfPatch), []byte(tmplPatch), apiv1.PodSpec{}) - if err != nil { - return err + + if wfPatch != "" { + mergedByte, err := strategicpatch.StrategicMergePatch([]byte(wfPatch), []byte(tmplPatch), apiv1.PodSpec{}) + if err != nil { + return "", err + } + mergedPatch = string(mergedByte) + } else { + mergedPatch = tmplPatch } - mergedPatch = string(mergedByte) } else { mergedPatch = wfPatch } - tmpl.PodSpecPatch = mergedPatch - return nil + return mergedPatch, nil } func ValidateJsonStr(jsonStr string, schema interface{}) bool { diff --git a/workflow/util/util_test.go b/workflow/util/util_test.go index e76f76a7a426..ec48c6fc8bd6 100644 --- a/workflow/util/util_test.go +++ b/workflow/util/util_test.go @@ -1,7 +1,9 @@ package util import ( + "encoding/json" "io/ioutil" + v1 "k8s.io/api/core/v1" "os" "path/filepath" "testing" @@ -168,3 +170,29 @@ func unmarshalWF(yamlStr string) *wfv1.Workflow { } return &wf } + +var ymal = ` +containers: + - name: main + resources: + limits: + cpu: 1000m +` + +func TestPodSpecPatchMerge(t *testing.T) { + tmpl := wfv1.Template{PodSpecPatch: "{\"containers\":[{\"name\":\"main\", \"resources\":{\"limits\":{\"cpu\": \"1000m\"}}}]}"} + wf := wfv1.Workflow{Spec: wfv1.WorkflowSpec{PodSpecPatch: "{\"containers\":[{\"name\":\"main\", \"resources\":{\"limits\":{\"memory\": \"100Mi\"}}}]}"}} + merged, _ := PodSpecPatchMerge(&wf, &tmpl) + var spec v1.PodSpec + json.Unmarshal([]byte(merged), &spec) + assert.Equal(t, "1.000", spec.Containers[0].Resources.Limits.Cpu().AsDec().String()) + assert.Equal(t, "104857600", spec.Containers[0].Resources.Limits.Memory().AsDec().String()) + + tmpl = wfv1.Template{PodSpecPatch: ymal} + wf = wfv1.Workflow{Spec: wfv1.WorkflowSpec{PodSpecPatch: "{\"containers\":[{\"name\":\"main\", \"resources\":{\"limits\":{\"memory\": \"100Mi\"}}}]}"}} + merged, _ = PodSpecPatchMerge(&wf, &tmpl) + json.Unmarshal([]byte(merged), &spec) + assert.Equal(t, "1.000", spec.Containers[0].Resources.Limits.Cpu().AsDec().String()) + assert.Equal(t, "104857600", spec.Containers[0].Resources.Limits.Memory().AsDec().String()) + +} \ No newline at end of file From f1e49aec0c1b1d1e2129e6dedc743b1ac260aabe Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Sun, 20 Oct 2019 23:02:15 -0700 Subject: [PATCH 09/10] Update workflowpod_test.go --- workflow/controller/workflowpod_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go index d0b7e226cee2..2d5aa7ca9fa8 100644 --- a/workflow/controller/workflowpod_test.go +++ b/workflow/controller/workflowpod_test.go @@ -914,7 +914,7 @@ spec: args: ["hello world"] ` -var helloWorldWfWithWFYMALPatch = ` +var helloWorldWfWithWFYAMLPatch = ` apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: @@ -949,7 +949,7 @@ func TestPodSpecPatch(t *testing.T) { pod, _ = woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false) assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String()) - wf = unmarshalWF(helloWorldWfWithWFYMALPatch) + wf = unmarshalWF(helloWorldWfWithWFYAMLPatch) woc = newWoc(*wf) mainCtr = woc.wf.Spec.Templates[0].Container pod, _ = woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false) From 32d335e5b52bcb60f3b4da8eb29bd024aa3529c6 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Sun, 20 Oct 2019 23:21:34 -0700 Subject: [PATCH 10/10] Update util_test.go --- workflow/util/util_test.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/workflow/util/util_test.go b/workflow/util/util_test.go index ec48c6fc8bd6..f10b365bda52 100644 --- a/workflow/util/util_test.go +++ b/workflow/util/util_test.go @@ -3,16 +3,15 @@ package util import ( "encoding/json" "io/ioutil" - v1 "k8s.io/api/core/v1" "os" "path/filepath" "testing" - "github.com/ghodss/yaml" - wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" fakeClientset "github.com/argoproj/argo/pkg/client/clientset/versioned/fake" + "github.com/ghodss/yaml" "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -171,7 +170,7 @@ func unmarshalWF(yamlStr string) *wfv1.Workflow { return &wf } -var ymal = ` +var yamlStr = ` containers: - name: main resources: @@ -188,11 +187,11 @@ func TestPodSpecPatchMerge(t *testing.T) { assert.Equal(t, "1.000", spec.Containers[0].Resources.Limits.Cpu().AsDec().String()) assert.Equal(t, "104857600", spec.Containers[0].Resources.Limits.Memory().AsDec().String()) - tmpl = wfv1.Template{PodSpecPatch: ymal} + tmpl = wfv1.Template{PodSpecPatch: yamlStr} wf = wfv1.Workflow{Spec: wfv1.WorkflowSpec{PodSpecPatch: "{\"containers\":[{\"name\":\"main\", \"resources\":{\"limits\":{\"memory\": \"100Mi\"}}}]}"}} merged, _ = PodSpecPatchMerge(&wf, &tmpl) json.Unmarshal([]byte(merged), &spec) assert.Equal(t, "1.000", spec.Containers[0].Resources.Limits.Cpu().AsDec().String()) assert.Equal(t, "104857600", spec.Containers[0].Resources.Limits.Memory().AsDec().String()) -} \ No newline at end of file +}