Skip to content

Commit

Permalink
Support for using volumes within multiple steps in a workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesuen committed Nov 1, 2017
1 parent 4b4dc4a commit be3ad92
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 12 deletions.
7 changes: 4 additions & 3 deletions api/workflow/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ type WorkflowList struct {
}

type WorkflowSpec struct {
Templates []Template `json:"templates"`
Entrypoint string `json:"entrypoint"`
Arguments Arguments `json:"arguments,omitempty"`
Templates []Template `json:"templates"`
Entrypoint string `json:"entrypoint"`
Arguments Arguments `json:"arguments,omitempty"`
Volumes []corev1.Volume `json:"volumes,omitempty"`
}

type Template struct {
Expand Down
48 changes: 48 additions & 0 deletions examples/volumes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: workdir
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi

---
apiVersion: argoproj.io/v1
kind: Workflow
metadata:
generateName: argo-wf-
spec:
entrypoint: volume-example
volumes:
- name: workdir
persistentVolumeClaim:
claimName: workdir
templates:
-
name: volume-example
steps:
- COWSAY:
template: cowsay
- PRINT:
template: print-message
-
name: cowsay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]
volumeMounts:
- name: workdir
mountPath: /mnt/vol
-
name: print-message
container:
image: alpine:latest
command: [sh, -c]
args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"]
volumeMounts:
- name: workdir
mountPath: /mnt/vol
47 changes: 38 additions & 9 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ func envFromField(envVarName, fieldPath string) corev1.EnvVar {

func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Template) error {
woc.log.Infof("Creating Pod: %s", nodeName)
initCtr, err := woc.newInitContainer(tmpl)
if err != nil {
return err
}
waitCtr, err := woc.newWaitContainer(tmpl)
if err != nil {
return err
Expand Down Expand Up @@ -126,9 +122,6 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Templat
},
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
InitContainers: []corev1.Container{
*initCtr,
},
Containers: []corev1.Container{
*waitCtr,
mainCtr,
Expand All @@ -139,6 +132,18 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Templat
},
},
}

// Add init container only if it needs input artifacts
if len(mainCtrTmpl.Inputs.Artifacts) > 0 {
initCtr := woc.newInitContainer(tmpl)
pod.Spec.InitContainers = []corev1.Container{initCtr}
}

err = woc.addVolumeReferences(&pod, mainCtrTmpl)
if err != nil {
return err
}

err = addInputArtifactsVolumes(&pod, mainCtrTmpl)
if err != nil {
return err
Expand Down Expand Up @@ -172,15 +177,15 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Templat
return nil
}

func (woc *wfOperationCtx) newInitContainer(tmpl *wfv1.Template) (*corev1.Container, error) {
func (woc *wfOperationCtx) newInitContainer(tmpl *wfv1.Template) corev1.Container {
ctr := woc.newExecContainer(common.InitContainerName, false)
ctr.Command = []string{"sh", "-c"}
argoExecCmd := fmt.Sprintf("echo sleeping; cat %s; sleep 10; find /argo; echo done", common.PodMetadataAnnotationsPath)
ctr.Args = []string{argoExecCmd}
ctr.VolumeMounts = []corev1.VolumeMount{
volumeMountPodMetadata,
}
return ctr, nil
return *ctr
}

func (woc *wfOperationCtx) newWaitContainer(tmpl *wfv1.Template) (*corev1.Container, error) {
Expand Down Expand Up @@ -217,6 +222,30 @@ func (woc *wfOperationCtx) newExecContainer(name string, privileged bool) *corev
return &exec
}

// addVolumeReferences adds any volumeMounts that a container referencing, to the pod spec
func (woc *wfOperationCtx) addVolumeReferences(pod *corev1.Pod, tmpl *wfv1.Template) error {
for _, volMnt := range tmpl.Container.VolumeMounts {
vol := getVolByName(volMnt.Name, woc.wf.Spec.Volumes)
if vol == nil {
return errors.Errorf(errors.CodeBadRequest, "volume '%s' not found in workflow spec", volMnt.Name)
}
if len(pod.Spec.Volumes) == 0 {
pod.Spec.Volumes = make([]corev1.Volume, 0)
}
pod.Spec.Volumes = append(pod.Spec.Volumes, *vol)
}
return nil
}

func getVolByName(name string, vols []corev1.Volume) *corev1.Volume {
for _, vol := range vols {
if vol.Name == name {
return &vol
}
}
return nil
}

// addInputArtifactVolumes sets up the artifacts volume to the pod if the user's container requires input artifacts.
// To support input artifacts, the init container shares a empty dir volume with the main container.
// It is the responsibility of the init container to load all artifacts to the mounted emptydir location.
Expand Down

0 comments on commit be3ad92

Please sign in to comment.