Skip to content

Commit

Permalink
Add ability to reference global parameters in spec level fields (resolves argoproj#749)
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesuen committed Mar 5, 2018
1 parent cd73a9c commit a3441d3
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 33 deletions.
7 changes: 7 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,13 @@
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.Template"
}
},
"tolerations": {
"description": "Tolerations to apply to workflow pods.",
"type": "array",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.Toleration"
}
},
"volumeClaimTemplates": {
"description": "VolumeClaimTemplates is a list of claims that containers are allowed to reference. The Workflow controller will create the claims at the beginning of the workflow and delete the claims upon completion of the workflow",
"type": "array",
Expand Down
15 changes: 14 additions & 1 deletion pkg/apis/workflow/v1alpha1/openapi_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,19 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA
Ref: ref("k8s.io/api/core/v1.Affinity"),
},
},
"tolerations": {
SchemaProps: spec.SchemaProps{
Description: "Tolerations to apply to workflow pods.",
Type: []string{"array"},
Items: &spec.SchemaOrArray{
Schema: &spec.Schema{
SchemaProps: spec.SchemaProps{
Ref: ref("k8s.io/api/core/v1.Toleration"),
},
},
},
},
},
"imagePullSecrets": {
SchemaProps: spec.SchemaProps{
Description: "ImagePullSecrets is a list of references to secrets in the same namespace to use for pulling any images in pods that reference this ServiceAccount. ImagePullSecrets are distinct from Secrets because Secrets can be mounted in the pod, but ImagePullSecrets are only accessed by the kubelet. More info: https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod",
Expand All @@ -1532,7 +1545,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA
},
},
Dependencies: []string{
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Arguments", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Template", "k8s.io/api/core/v1.Affinity", "k8s.io/api/core/v1.LocalObjectReference", "k8s.io/api/core/v1.PersistentVolumeClaim", "k8s.io/api/core/v1.Volume"},
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Arguments", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Template", "k8s.io/api/core/v1.Affinity", "k8s.io/api/core/v1.LocalObjectReference", "k8s.io/api/core/v1.PersistentVolumeClaim", "k8s.io/api/core/v1.Toleration", "k8s.io/api/core/v1.Volume"},
},
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.WorkflowStep": {
Schema: spec.Schema{
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/workflow/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ type WorkflowSpec struct {
// Can be overridden by an affinity specified in the template
Affinity *apiv1.Affinity `json:"affinity,omitempty"`

// Tolerations to apply to workflow pods.
Tolerations []apiv1.Toleration `json:"tolerations,omitempty"`

// ImagePullSecrets is a list of references to secrets in the same namespace to use for pulling any images
// in pods that reference this ServiceAccount. ImagePullSecrets are distinct from Secrets because Secrets
// can be mounted in the pod, but ImagePullSecrets are only accessed by the kubelet.
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,13 @@ func (in *WorkflowSpec) DeepCopyInto(out *WorkflowSpec) {
(*in).DeepCopyInto(*out)
}
}
if in.Tolerations != nil {
in, out := &in.Tolerations, &out.Tolerations
*out = make([]v1.Toleration, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.ImagePullSecrets != nil {
in, out := &in.ImagePullSecrets, &out.ImagePullSecrets
*out = make([]v1.LocalObjectReference, len(*in))
Expand Down
9 changes: 4 additions & 5 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,10 @@ func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOper
return &woc
}

// operate is the main operator logic of a workflow.
// It evaluates the current state of the workflow, and its pods
// and decides how to proceed down the execution path.
// TODO: an error returned by this method should result in requeing the
// workflow to be retried at a later time
// operate is the main operator logic of a workflow. It evaluates the current state of the workflow,
// and its pods and decides how to proceed down the execution path.
// TODO: an error returned by this method should result in requeueing the workflow to be retried at a
// later time
func (woc *wfOperationCtx) operate() {
defer woc.persistUpdates()
defer func() {
Expand Down
58 changes: 58 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,3 +652,61 @@ func TestSuspendTemplate(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, 1, len(pods.Items))
}

var volumeWithParam = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: volume-with-param
spec:
entrypoint: append-to-accesslog
arguments:
parameters:
- name: volname
value: my-volume
- name: node-selctor
value: my-node
nodeSelector:
kubernetes.io/hostname: my-host
volumes:
- name: workdir
persistentVolumeClaim:
claimName: "{{workflow.parameters.volname}}"
templates:
- name: append-to-accesslog
container:
image: alpine:latest
command: [sh, -c]
args: ["echo accessed at: $(date) | tee -a /mnt/vol/accesslog"]
volumeMounts:
- name: workdir
mountPath: /mnt/vol
`

// TestWorkflowSpecParam verifies that workflow parameters can be referenced
// from top-level spec fields (e.g. spec.volumes, spec.nodeSelector) and are
// substituted before the pod is created.
func TestWorkflowSpecParam(t *testing.T) {
	controller := newController()
	wfClient := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

	wf, err := wfClient.Create(unmarshalWF(volumeWithParam))
	assert.Nil(t, err)

	woc := newWorkflowOperationCtx(wf, controller)
	woc.operate()

	pod, err := controller.kubeclientset.CoreV1().Pods("").Get(wf.Name, metav1.GetOptions{})
	assert.Nil(t, err)

	// The claim name in spec.volumes should have been substituted with the
	// value of the 'volname' workflow parameter.
	foundWorkdir := false
	for _, vol := range pod.Spec.Volumes {
		if vol.Name != "workdir" {
			continue
		}
		foundWorkdir = true
		assert.Equal(t, "my-volume", vol.PersistentVolumeClaim.ClaimName)
	}
	assert.True(t, foundWorkdir)

	// The spec-level nodeSelector should carry through to the pod.
	assert.Equal(t, "my-host", pod.Spec.NodeSelector["kubernetes.io/hostname"])
}
84 changes: 57 additions & 27 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
nodeID := woc.wf.NodeID(nodeName)
woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID)
tmpl = tmpl.DeepCopy()
wfSpec, err := substituteGlobals(&woc.wf.Spec, woc.globalParams)
if err != nil {
return nil, err
}
mainCtr.Name = common.MainContainerName
pod := apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -133,9 +137,9 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
volumeDockerSock,
},
ActiveDeadlineSeconds: tmpl.ActiveDeadlineSeconds,
ServiceAccountName: woc.wf.Spec.ServiceAccountName,
ImagePullSecrets: woc.wf.Spec.ImagePullSecrets,
Tolerations: tmpl.Tolerations,
// TODO: consider allowing service account and image pull secrets to reference global vars
ServiceAccountName: woc.wf.Spec.ServiceAccountName,
ImagePullSecrets: woc.wf.Spec.ImagePullSecrets,
},
}
if woc.controller.Config.InstanceID != "" {
Expand All @@ -160,9 +164,9 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
pod.Spec.InitContainers = []apiv1.Container{initCtr}
}

woc.addSchedulingConstraints(&pod, tmpl)
addSchedulingConstraints(&pod, wfSpec, tmpl)

err := woc.addVolumeReferences(&pod, tmpl)
err = addVolumeReferences(&pod, wfSpec, tmpl, woc.wf.Status.PersistentVolumeClaims)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -218,6 +222,25 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
return created, nil
}

// substituteGlobals returns a copy of the workflow spec with global parameter
// references (e.g. {{workflow.parameters.xxx}}) substituted with their values.
// The spec is round-tripped through JSON so that references appearing in any
// spec-level field (volumes, nodeSelector, etc.) are replaced uniformly.
// The input spec is not modified.
func substituteGlobals(wfSpec *wfv1.WorkflowSpec, globalParams map[string]string) (*wfv1.WorkflowSpec, error) {
	specBytes, err := json.Marshal(wfSpec)
	if err != nil {
		// Wrap for consistency with the Unmarshal path below (the original
		// returned the raw error here but wrapped the Unmarshal error).
		return nil, errors.InternalWrapError(err)
	}
	fstTmpl := fasttemplate.New(string(specBytes), "{{", "}}")
	// NOTE(review): the final 'true' argument presumably tolerates unresolved
	// references (leaving them intact for later, template-scoped substitution)
	// — confirm against common.Replace.
	newSpecBytes, err := common.Replace(fstTmpl, globalParams, true)
	if err != nil {
		return nil, err
	}
	var newWfSpec wfv1.WorkflowSpec
	if err := json.Unmarshal([]byte(newSpecBytes), &newWfSpec); err != nil {
		return nil, errors.InternalWrapError(err)
	}
	return &newWfSpec, nil
}

func (woc *wfOperationCtx) newInitContainer(tmpl *wfv1.Template) apiv1.Container {
ctr := woc.newExecContainer(common.InitContainerName, false)
ctr.Command = []string{"argoexec"}
Expand Down Expand Up @@ -256,30 +279,52 @@ func (woc *wfOperationCtx) newExecContainer(name string, privileged bool) *apiv1
}

// addSchedulingConstraints applies any node selectors or affinity rules to the pod, either set in the workflow or the template
func (woc *wfOperationCtx) addSchedulingConstraints(pod *apiv1.Pod, tmpl *wfv1.Template) {
func addSchedulingConstraints(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *wfv1.Template) {
// Set nodeSelector (if specified)
if len(tmpl.NodeSelector) > 0 {
pod.Spec.NodeSelector = tmpl.NodeSelector
} else if len(woc.wf.Spec.NodeSelector) > 0 {
pod.Spec.NodeSelector = woc.wf.Spec.NodeSelector
} else if len(wfSpec.NodeSelector) > 0 {
pod.Spec.NodeSelector = wfSpec.NodeSelector
}
// Set affinity (if specified)
if tmpl.Affinity != nil {
pod.Spec.Affinity = tmpl.Affinity
} else if woc.wf.Spec.Affinity != nil {
pod.Spec.Affinity = woc.wf.Spec.Affinity
} else if wfSpec.Affinity != nil {
pod.Spec.Affinity = wfSpec.Affinity
}
// Set tolerations (if specified)
if len(tmpl.Tolerations) > 0 {
pod.Spec.Tolerations = tmpl.Tolerations
} else if len(wfSpec.Tolerations) > 0 {
pod.Spec.Tolerations = wfSpec.Tolerations
}
}

// addVolumeReferences adds any volumeMounts that a container/sidecar is referencing, to the pod.spec.volumes
// These are either specified in the workflow.spec.volumes or the workflow.spec.volumeClaimTemplate section
func (woc *wfOperationCtx) addVolumeReferences(pod *apiv1.Pod, tmpl *wfv1.Template) error {
func addVolumeReferences(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *wfv1.Template, pvcs []apiv1.Volume) error {
if tmpl.Container == nil && len(tmpl.Sidecars) == 0 {
return nil
}

// getVolByName is a helper to retrieve a volume by its name, either from the volumes or claims section
getVolByName := func(name string) *apiv1.Volume {
for _, vol := range wfSpec.Volumes {
if vol.Name == name {
return &vol
}
}
for _, pvc := range pvcs {
if pvc.Name == name {
return &pvc
}
}
return nil
}

addVolumeRef := func(volMounts []apiv1.VolumeMount) error {
for _, volMnt := range volMounts {
vol := getVolByName(volMnt.Name, woc.wf)
vol := getVolByName(volMnt.Name)
if vol == nil {
return errors.Errorf(errors.CodeBadRequest, "volume '%s' not found in workflow spec", volMnt.Name)
}
Expand Down Expand Up @@ -314,21 +359,6 @@ func (woc *wfOperationCtx) addVolumeReferences(pod *apiv1.Pod, tmpl *wfv1.Templa
return nil
}

// getVolByName is a helper to retrieve a volume by its name, either from the volumes or claims section
// It searches the workflow's spec.volumes first, then the PVCs recorded in
// workflow status, returning nil when no volume with the given name exists.
// (This is the pre-commit version, removed in this diff in favor of a closure
// inside addVolumeReferences that no longer needs the *Workflow argument.)
func getVolByName(name string, wf *wfv1.Workflow) *apiv1.Volume {
	// Volumes declared directly in the workflow spec.
	for _, vol := range wf.Spec.Volumes {
		if vol.Name == name {
			// Returning &vol is safe here: we return immediately, so the
			// loop variable is not reused after the address is taken.
			return &vol
		}
	}
	// PVCs created by the controller from volumeClaimTemplates, tracked in status.
	for _, pvc := range wf.Status.PersistentVolumeClaims {
		if pvc.Name == name {
			return &pvc
		}
	}
	// Not found in either section.
	return nil
}

// addInputArtifactVolumes sets up the artifacts volume to the pod to support input artifacts to containers.
// In order support input artifacts, the init container shares a emptydir volume with the main container.
// It is the responsibility of the init container to load all artifacts to the mounted emptydir location.
Expand Down

0 comments on commit a3441d3

Please sign in to comment.