Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PodSpecPatch functionality #1687

Merged
merged 10 commits into from
Oct 21, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactored as per Jesse comments
  • Loading branch information
sarabala1979 committed Oct 18, 2019
commit 418459a98b39f445a27aa12eef0fbc9b008363ff
4 changes: 2 additions & 2 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@
"format": "int64"
},
"podSpecPatch": {
"description": "PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it.",
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).",
"type": "string"
},
"priority": {
Expand Down Expand Up @@ -1494,7 +1494,7 @@
"type": "string"
},
"podSpecPatch": {
"description": "PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it.",
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).",
"type": "string"
},
"priority": {
Expand Down
25 changes: 25 additions & 0 deletions examples/pod-spec-patch-wf-tmpl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pod-spec-patch-
spec:
entrypoint: whalesay
arguments:
parameters:
- name: cpu-limit
value: 100m
- name: mem-limit
value: 100Mi
podSpecPatch: |
containers:
- name: main
resources:
limits:
memory: "{{workflow.parameters.mem-limit}}"
templates:
- name: whalesay
podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "{{workflow.parameters.cpu-limit}}" }}}]}'
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
5 changes: 2 additions & 3 deletions examples/pod-spec-patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ spec:
entrypoint: whalesay
arguments:
parameters:
- name: message
- name: cpu-limit
value: 100m

templates:
- name: whalesay
podspecpatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "{{workflow.parameters.message}}"}}}]}'
podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "{{workflow.parameters.cpu-limit}}" }}}]}'
container:
image: docker/whalesay:latest
command: [cowsay]
Expand Down
22 changes: 22 additions & 0 deletions examples/pod-spec-yaml-patch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pod-spec-patch-
spec:
entrypoint: whalesay
arguments:
parameters:
- name: mem-limit
value: 100Mi
podSpecPatch: |
containers:
- name: main
resources:
limits:
memory: "{{workflow.parameters.mem-limit}}"
templates:
- name: whalesay
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
4 changes: 2 additions & 2 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,16 @@ type WorkflowSpec struct {
// +optional
SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"`

// PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it.
// PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of
// container fields which are not strings (e.g. resource limits).
PodSpecPatch string `json:"podSpecPatch,omitempty"`
}


func (wfs *WorkflowSpec) HasPodSpecPatch() bool {
return wfs.PodSpecPatch != ""
}

// Template is a reusable and composable unit of execution in a workflow
type Template struct {
// Name is the name of the template
Expand Down Expand Up @@ -355,7 +361,8 @@ type Template struct {
// +optional
SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"`

// PodSpecPatch holds json patch string to merge on Pod spec. Controller is using StrategicMergePatch to merge it.
// PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of
// container fields which are not strings (e.g. resource limits).
PodSpecPatch string `json:"podSpecPatch,omitempty"`
}

Expand All @@ -380,6 +387,11 @@ func (tmpl *Template) GetBaseTemplate() *Template {
return baseTemplate
}

func (tmpl *Template) HasPodSpecPatch() bool {
return tmpl.PodSpecPatch != ""
}


// Inputs are the mechanism for passing parameters, artifacts, volumes from one template to another
type Inputs struct {
// Parameters are a list of parameters passed as inputs
Expand Down
9 changes: 9 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,10 +1128,19 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
}

newTmplCtx, basedTmpl, err := woc.getResolvedTemplate(node, orgTmpl, tmplCtx, args)

if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, orgTmpl, boundaryID, err), err
}

if woc.wf.Spec.HasPodSpecPatch() {
err = util.PodSpecPatchMerge(woc.wf, basedTmpl)
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved
fmt.Println(basedTmpl.PodSpecPatch)
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, orgTmpl, boundaryID, err), err
}
}

localParams := make(map[string]string)
if basedTmpl.IsPodType() {
localParams[common.LocalVarPodName] = woc.wf.NodeID(nodeName)
Expand Down
23 changes: 9 additions & 14 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/argoproj/argo/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/util"
log "github.com/sirupsen/logrus"
"github.com/valyala/fasttemplate"
apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -80,6 +81,10 @@ func (woc *wfOperationCtx) getVolumeDockerSock() apiv1.Volume {
}
}

func (woc *wfOperationCtx) hasPodSpecPatch(tmpl *wfv1.Template) bool {
return woc.wf.Spec.HasPodSpecPatch() || tmpl.HasPodSpecPatch()
}

func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template, includeScriptOutput bool) (*apiv1.Pod, error) {
nodeID := woc.wf.NodeID(nodeName)
woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID)
Expand Down Expand Up @@ -223,32 +228,22 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
}

// Apply the patch string from template
if woc.wf.Spec.PodSpecPatch != "" || tmpl.PodSpecPatch != "" {

if tmpl.PodSpecPatch == "" {
tmpl.PodSpecPatch = woc.wf.Spec.PodSpecPatch
pod, err = substitutePodParams(pod, woc.globalParams, tmpl)
if err != nil {
return nil, err
}
}
if woc.hasPodSpecPatch(tmpl) {
jsonstr, err := json.Marshal(pod.Spec)
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
return nil, errors.Wrap(err, "", "Fail to marshal the Pod spec")
}

var spec apiv1.PodSpec
err = json.Unmarshal([]byte(tmpl.PodSpecPatch), &spec)

if err != nil {
return nil, errors.Wrap(err, "", "Invalid PodSpecPatch String")
if !util.ValidateJsonStr(tmpl.PodSpecPatch, spec) {
return nil, errors.New("", "Invalid PodSpecPatch String")
}

modJson, err := strategicpatch.StrategicMergePatch(jsonstr, []byte(tmpl.PodSpecPatch), apiv1.PodSpec{})

if err != nil {
return nil, errors.Wrap(err, "", "Error occured during strategicpatch")
return nil, errors.Wrap(err, "", "Error occurred during strategic merge patch")
}
err = json.Unmarshal(modJson, &pod.Spec)
if err != nil {
Expand Down
55 changes: 55 additions & 0 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"time"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/strategicpatch"
apiv1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -669,3 +671,56 @@ func ReadManifest(manifestPaths ...string) ([][]byte, error) {
}
return manifestContents, err
}

func IsJsonStr(str string) bool {
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved
str = strings.TrimSpace(str)
return len(str) > 0 && str[0] == '{'
}

func ConvertYAMLToJSON(str string) (string, error) {
if !IsJsonStr(str) {
jsonStr, err := yaml.YAMLToJSON([]byte(str))
if err != nil {
return str, err
}
return string(jsonStr), nil
}
return str, nil
}

// PodSpecPatchMerge will do strategic merge the workflow level PodSpecPatch and template level PodSpecPatch
func PodSpecPatchMerge(wf *wfv1.Workflow, tmpl *wfv1.Template) error {
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved
var wfPatch, tmplPatch, mergedPatch string
var err error

if wf.Spec.HasPodSpecPatch() {
wfPatch, err = ConvertYAMLToJSON(wf.Spec.PodSpecPatch)
if err != nil {
return err
}
}
if tmpl.HasPodSpecPatch() {
tmplPatch, err = ConvertYAMLToJSON(tmpl.PodSpecPatch)
if err != nil {
return err
}
mergedByte, err := strategicpatch.StrategicMergePatch([]byte(wfPatch), []byte(tmplPatch), apiv1.PodSpec{})
if err != nil {
return err
}
mergedPatch = string(mergedByte)
} else {
mergedPatch = wfPatch
}
tmpl.PodSpecPatch = mergedPatch
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

func ValidateJsonStr(jsonStr string, schema interface{}) bool {
err := json.Unmarshal([]byte(jsonStr), &schema)
if err != nil {
return false
}
return true

}