diff --git a/Gopkg.lock b/Gopkg.lock index 1bad885666d9..bc492d1be1f3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1466,6 +1466,7 @@ "k8s.io/apimachinery/pkg/types", "k8s.io/apimachinery/pkg/util/clock", "k8s.io/apimachinery/pkg/util/runtime", + "k8s.io/apimachinery/pkg/util/strategicpatch", "k8s.io/apimachinery/pkg/util/validation", "k8s.io/apimachinery/pkg/util/wait", "k8s.io/apimachinery/pkg/version", diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index 2c8a079133e6..e7771dc641aa 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -1083,6 +1083,10 @@ "type": "integer", "format": "int64" }, + "podSpecPatch": { + "description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).", + "type": "string" + }, "priority": { "description": "Priority to apply to workflow pods.", "type": "integer", @@ -1489,6 +1493,10 @@ "description": "PriorityClassName to apply to workflow pods.", "type": "string" }, + "podSpecPatch": { + "description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).", + "type": "string" + }, "priority": { "description": "Priority is used if controller is configured to process limited number of workflows in parallel. Workflows with higher priority are processed first.", "type": "integer", diff --git a/examples/pod-spec-patch-wf-tmpl.yaml b/examples/pod-spec-patch-wf-tmpl.yaml new file mode 100644 index 000000000000..052c7f664bb2 --- /dev/null +++ b/examples/pod-spec-patch-wf-tmpl.yaml @@ -0,0 +1,25 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: pod-spec-patch- +spec: + entrypoint: whalesay + arguments: + parameters: + - name: cpu-limit + value: 100m + - name: mem-limit + value: 100Mi + podSpecPatch: | + containers: + - name: main + resources: + limits: + memory: "{{workflow.parameters.mem-limit}}" + templates: + - name: whalesay + podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "{{workflow.parameters.cpu-limit}}" }}}]}' + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] diff --git a/examples/pod-spec-patch.yaml b/examples/pod-spec-patch.yaml new file mode 100644 index 000000000000..083c7ca4622b --- /dev/null +++ b/examples/pod-spec-patch.yaml @@ -0,0 +1,17 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: pod-spec-patch- +spec: + entrypoint: whalesay + arguments: + parameters: + - name: cpu-limit + value: 100m + templates: + - name: whalesay + podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "{{workflow.parameters.cpu-limit}}" }}}]}' + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] diff --git a/examples/pod-spec-yaml-patch.yaml b/examples/pod-spec-yaml-patch.yaml new file mode 100644 index 000000000000..0d4a13d4f2ea --- /dev/null +++ b/examples/pod-spec-yaml-patch.yaml @@ -0,0 +1,22 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: pod-spec-patch- +spec: + entrypoint: whalesay + arguments: + parameters: + - name: mem-limit + value: 100Mi + podSpecPatch: | + containers: + - name: main + resources: + limits: + memory: "{{workflow.parameters.mem-limit}}" + templates: + - name: whalesay + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] diff --git a/pkg/apis/workflow/v1alpha1/openapi_generated.go b/pkg/apis/workflow/v1alpha1/openapi_generated.go index 5a2470e62cf8..328895c74bec 100644 --- a/pkg/apis/workflow/v1alpha1/openapi_generated.go +++ b/pkg/apis/workflow/v1alpha1/openapi_generated.go @@ -2128,6 +2128,13 @@ func schema_pkg_apis_workflow_v1alpha1_Template(ref common.ReferenceCallback) co Ref: ref("k8s.io/api/core/v1.PodSecurityContext"), }, }, + "podSpecPatch": { + SchemaProps: spec.SchemaProps{ + Description: "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).", + Type: []string{"string"}, + Format: "", + }, + }, }, Required: []string{"name"}, }, @@ -2818,6 +2825,13 @@ func schema_pkg_apis_workflow_v1alpha1_WorkflowSpec(ref common.ReferenceCallback Ref: ref("k8s.io/api/core/v1.PodSecurityContext"), }, }, + "podSpecPatch": { + SchemaProps: spec.SchemaProps{ + Description: "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).", + Type: []string{"string"}, + Format: "", + }, + }, }, Required: []string{"templates", "entrypoint"}, }, diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go index e10ee4c1846d..4bfe83ca632e 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/workflow_types.go @@ -226,6 +226,14 @@ type WorkflowSpec struct { // Optional: Defaults to empty. See type description for default values of each field. // +optional SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"` + + // PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of + // container fields which are not strings (e.g. resource limits). + PodSpecPatch string `json:"podSpecPatch,omitempty"` +} + +func (wfs *WorkflowSpec) HasPodSpecPatch() bool { + return wfs.PodSpecPatch != "" } // Template is a reusable and composable unit of execution in a workflow @@ -351,6 +359,10 @@ type Template struct { // Optional: Defaults to empty. See type description for default values of each field. // +optional SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"` + + // PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of + // container fields which are not strings (e.g. resource limits). + PodSpecPatch string `json:"podSpecPatch,omitempty"` } var _ TemplateHolder = &Template{} @@ -374,6 +386,10 @@ func (tmpl *Template) GetBaseTemplate() *Template { return baseTemplate } +func (tmpl *Template) HasPodSpecPatch() bool { + return tmpl.PodSpecPatch != "" +} + // Inputs are the mechanism for passing parameters, artifacts, volumes from one template to another type Inputs struct { // Parameters are a list of parameters passed as inputs diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 5731552bd662..02461ea08a9f 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1128,10 +1128,10 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat } newTmplCtx, basedTmpl, err := woc.getResolvedTemplate(node, orgTmpl, tmplCtx, args) + if err != nil { return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, orgTmpl, boundaryID, err), err } - localParams := make(map[string]string) if basedTmpl.IsPodType() { localParams[common.LocalVarPodName] = woc.wf.NodeID(nodeName) diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 36a63ac3e70e..d60335f8e837 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -12,11 +12,13 @@ import ( "github.com/argoproj/argo/pkg/apis/workflow" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/workflow/common" + "github.com/argoproj/argo/workflow/util" log "github.com/sirupsen/logrus" "github.com/valyala/fasttemplate" apiv1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/utils/pointer" ) @@ -79,6 +81,10 @@ func (woc *wfOperationCtx) getVolumeDockerSock() apiv1.Volume { } } +func (woc *wfOperationCtx) hasPodSpecPatch(tmpl *wfv1.Template) bool { + return woc.wf.Spec.HasPodSpecPatch() || tmpl.HasPodSpecPatch() +} + func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template, includeScriptOutput bool) (*apiv1.Pod, error) { nodeID := woc.wf.NodeID(nodeName) woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID) @@ -221,6 +227,45 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont } } + // Apply the patch string from template + if woc.hasPodSpecPatch(tmpl) { + jsonstr, err := json.Marshal(pod.Spec) + if err != nil { + return nil, errors.Wrap(err, "", "Fail to marshal the Pod spec") + } + + tmpl.PodSpecPatch, err = util.PodSpecPatchMerge(woc.wf, tmpl) + + if err != nil { + return nil, errors.Wrap(err, "", "Fail to marshal the Pod spec") + } + + // Final substitution for workflow level PodSpecPatch + localParams := make(map[string]string) + if tmpl.IsPodType() { + localParams[common.LocalVarPodName] = woc.wf.NodeID(nodeName) + } + tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false) + if err != nil { + return nil, errors.Wrap(err, "", "Fail to substitute the PodSpecPatch variables") + } + + var spec apiv1.PodSpec + + if !util.ValidateJsonStr(tmpl.PodSpecPatch, spec) { + return nil, errors.New("", "Invalid PodSpecPatch String") + } + + modJson, err := strategicpatch.StrategicMergePatch(jsonstr, []byte(tmpl.PodSpecPatch), apiv1.PodSpec{}) + + if err != nil { + return nil, errors.Wrap(err, "", "Error occurred during strategic merge patch") + } + err = json.Unmarshal(modJson, &pod.Spec) + if err != nil { + return nil, errors.Wrap(err, "", "Error in Unmarshalling after merge the patch") + } + } created, err := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Create(pod) if err != nil { if apierr.IsAlreadyExists(err) { diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go index a6e7e5c11176..2d5aa7ca9fa8 100644 --- a/workflow/controller/workflowpod_test.go +++ b/workflow/controller/workflowpod_test.go @@ -10,10 +10,10 @@ import ( wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/workflow/common" - "sigs.k8s.io/yaml" "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" ) func unmarshalTemplate(yamlStr string) *wfv1.Template { @@ -106,15 +106,16 @@ script: source: | ls -al ` + // TestScriptTemplateWithVolume ensure we can a script pod with input artifacts func TestScriptTemplateWithoutVolumeOptionalArtifact(t *testing.T) { volumeMount := apiv1.VolumeMount{ - Name: "input-artifacts", - ReadOnly: false, - MountPath: "/manifest", - SubPath: "manifest", + Name: "input-artifacts", + ReadOnly: false, + MountPath: "/manifest", + SubPath: "manifest", MountPropagation: nil, - SubPathExpr: "", + SubPathExpr: "", } // Ensure that volume mount is added when artifact is provided @@ -880,3 +881,80 @@ func TestTmplLevelSecurityContext(t *testing.T) { assert.NotNil(t, pod.Spec.SecurityContext) assert.Equal(t, runAsUser, *pod.Spec.SecurityContext.RunAsUser) } + +var helloWorldWfWithPatch = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: hello-world +spec: + entrypoint: whalesay + templates: + - name: whalesay + podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "800m"}}}]}' + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] +` + +var helloWorldWfWithWFPatch = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: hello-world +spec: + entrypoint: whalesay + podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "800m"}}}]}' + templates: + - name: whalesay + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] +` + +var helloWorldWfWithWFYAMLPatch = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: hello-world +spec: + entrypoint: whalesay + podSpecPatch: | + containers: + - name: main + resources: + limits: + cpu: "800m" + templates: + - name: whalesay + podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"memory": "100Mi"}}}]}' + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] +` + +func TestPodSpecPatch(t *testing.T) { + wf := unmarshalWF(helloWorldWfWithPatch) + woc := newWoc(*wf) + mainCtr := woc.wf.Spec.Templates[0].Container + pod, _ := woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false) + assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String()) + + wf = unmarshalWF(helloWorldWfWithWFPatch) + woc = newWoc(*wf) + mainCtr = woc.wf.Spec.Templates[0].Container + pod, _ = woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false) + assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String()) + + wf = unmarshalWF(helloWorldWfWithWFYAMLPatch) + woc = newWoc(*wf) + mainCtr = woc.wf.Spec.Templates[0].Container + pod, _ = woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false) + + assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String()) + assert.Equal(t, "104857600", pod.Spec.Containers[1].Resources.Limits.Memory().AsDec().String()) + +} diff --git a/workflow/util/util.go b/workflow/util/util.go index decb313fa586..b279f9b10c77 100644 --- a/workflow/util/util.go +++ b/workflow/util/util.go @@ -13,6 +13,7 @@ import ( "time" log "github.com/sirupsen/logrus" + apiv1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -21,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/client-go/informers/internalinterfaces" @@ -669,3 +671,56 @@ func ReadManifest(manifestPaths ...string) ([][]byte, error) { } return manifestContents, err } + +func IsJSONStr(str string) bool { + str = strings.TrimSpace(str) + return len(str) > 0 && str[0] == '{' +} + +func ConvertYAMLToJSON(str string) (string, error) { + if !IsJSONStr(str) { + jsonStr, err := yaml.YAMLToJSON([]byte(str)) + if err != nil { + return str, err + } + return string(jsonStr), nil + } + return str, nil +} + +// PodSpecPatchMerge will do strategic merge the workflow level PodSpecPatch and template level PodSpecPatch +func PodSpecPatchMerge(wf *wfv1.Workflow, tmpl *wfv1.Template) (string, error) { + var wfPatch, tmplPatch, mergedPatch string + var err error + + if wf.Spec.HasPodSpecPatch() { + wfPatch, err = ConvertYAMLToJSON(wf.Spec.PodSpecPatch) + if err != nil { + return "", err + } + } + if tmpl.HasPodSpecPatch() { + tmplPatch, err = ConvertYAMLToJSON(tmpl.PodSpecPatch) + if err != nil { + return "", err + } + + if wfPatch != "" { + mergedByte, err := strategicpatch.StrategicMergePatch([]byte(wfPatch), []byte(tmplPatch), apiv1.PodSpec{}) + if err != nil { + return "", err + } + mergedPatch = string(mergedByte) + } else { + mergedPatch = tmplPatch + } + } else { + mergedPatch = wfPatch + } + return mergedPatch, nil +} + +func ValidateJsonStr(jsonStr string, schema interface{}) bool { + err := json.Unmarshal([]byte(jsonStr), &schema) + return err == nil +} diff --git a/workflow/util/util_test.go b/workflow/util/util_test.go index e76f76a7a426..f10b365bda52 100644 --- a/workflow/util/util_test.go +++ b/workflow/util/util_test.go @@ -1,16 +1,17 @@ package util import ( + "encoding/json" "io/ioutil" "os" "path/filepath" "testing" - "github.com/ghodss/yaml" - wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" fakeClientset "github.com/argoproj/argo/pkg/client/clientset/versioned/fake" + "github.com/ghodss/yaml" "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -168,3 +169,29 @@ func unmarshalWF(yamlStr string) *wfv1.Workflow { } return &wf } + +var yamlStr = ` +containers: + - name: main + resources: + limits: + cpu: 1000m +` + +func TestPodSpecPatchMerge(t *testing.T) { + tmpl := wfv1.Template{PodSpecPatch: "{\"containers\":[{\"name\":\"main\", \"resources\":{\"limits\":{\"cpu\": \"1000m\"}}}]}"} + wf := wfv1.Workflow{Spec: wfv1.WorkflowSpec{PodSpecPatch: "{\"containers\":[{\"name\":\"main\", \"resources\":{\"limits\":{\"memory\": \"100Mi\"}}}]}"}} + merged, _ := PodSpecPatchMerge(&wf, &tmpl) + var spec v1.PodSpec + json.Unmarshal([]byte(merged), &spec) + assert.Equal(t, "1.000", spec.Containers[0].Resources.Limits.Cpu().AsDec().String()) + assert.Equal(t, "104857600", spec.Containers[0].Resources.Limits.Memory().AsDec().String()) + + tmpl = wfv1.Template{PodSpecPatch: yamlStr} + wf = wfv1.Workflow{Spec: wfv1.WorkflowSpec{PodSpecPatch: "{\"containers\":[{\"name\":\"main\", \"resources\":{\"limits\":{\"memory\": \"100Mi\"}}}]}"}} + merged, _ = PodSpecPatchMerge(&wf, &tmpl) + json.Unmarshal([]byte(merged), &spec) + assert.Equal(t, "1.000", spec.Containers[0].Resources.Limits.Cpu().AsDec().String()) + assert.Equal(t, "104857600", spec.Containers[0].Resources.Limits.Memory().AsDec().String()) + +}