Skip to content

Commit

Permalink
Annotate the template used by executor to include destination artifac…
Browse files Browse the repository at this point in the history
…t information
  • Loading branch information
jessesuen committed Oct 27, 2017
1 parent 52f8db2 commit 2058342
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 12 deletions.
13 changes: 13 additions & 0 deletions api/workflow/v1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,16 @@ func (wf *Workflow) NodeID(name string) string {
h.Write([]byte(name))
return fmt.Sprintf("%s-%v", wf.ObjectMeta.Name, h.Sum32())
}

func (t *Template) DeepCopy() *Template {
tBytes, err := json.Marshal(t)
if err != nil {
panic(err)
}
var copy Template
err = json.Unmarshal(tBytes, &copy)
if err != nil {
panic(err)
}
return &copy
}
16 changes: 16 additions & 0 deletions examples/output-artifact.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
apiVersion: argoproj.io/v1
kind: Workflow
metadata:
generateName: argo-wf-
spec:
entrypoint: cowsay
templates:
- name: cowsay
type: container
image: docker/whalesay:latest
command: [sh, -c]
args: ["cowsay hello world | tee /tmp/hello_world.txt"]
outputs:
artifacts:
MESSAGE:
path: /tmp/hello_world.txt
File renamed without changes.
59 changes: 47 additions & 12 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,10 @@ func (wfc *WorkflowController) createWorkflowPod(wf *wfv1.Workflow, nodeName str
if err != nil {
return err
}
mainCtr := tmpl.Container.DeepCopy()
mainCtr.Name = common.MainContainerName
mainCtrTmpl := tmpl.DeepCopy()
mainCtrTmpl.Name = common.MainContainerName
t := true

tmplBytes, err := json.Marshal(tmpl)
if err != nil {
return err
}

pod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: wf.NodeID(nodeName),
Expand All @@ -107,7 +102,6 @@ func (wfc *WorkflowController) createWorkflowPod(wf *wfv1.Workflow, nodeName str
},
Annotations: map[string]string{
common.AnnotationKeyNodeName: nodeName,
common.AnnotationKeyTemplate: string(tmplBytes),
},
OwnerReferences: []metav1.OwnerReference{
metav1.OwnerReference{
Expand All @@ -126,18 +120,26 @@ func (wfc *WorkflowController) createWorkflowPod(wf *wfv1.Workflow, nodeName str
},
Containers: []corev1.Container{
*waitCtr,
*mainCtr,
*mainCtrTmpl.Container,
},
Volumes: []corev1.Volume{
volumePodMetadata,
volumeDockerLib,
},
},
}
err = addInputArtifactVolumes(&pod, tmpl)
err = addInputArtifactsVolumes(&pod, mainCtrTmpl)
if err != nil {
return err
}
wfc.addOutputArtifactsRepoMetaData(&pod, mainCtrTmpl)

// Set the container template JSON in pod annotations, which executor will look to for artifact
tmplBytes, err := json.Marshal(mainCtrTmpl)
if err != nil {
return err
}
pod.ObjectMeta.Annotations[common.AnnotationKeyTemplate] = string(tmplBytes)

created, err := wfc.podCl.Create(&pod)
if err != nil {
Expand Down Expand Up @@ -200,8 +202,11 @@ func (wfc *WorkflowController) newExecContainer(name string, privileged bool) *c
return &exec
}

// addInputArtifactVolumes adds an artifacts volume to the pod if the template requires input artifacts
func addInputArtifactVolumes(pod *corev1.Pod, tmpl *wfv1.Template) error {
// addInputArtifactVolumes sets up the artifacts volume to the pod if the user's container requires input artifacts.
// To support input artifacts, the init container shares a empty dir volume with the main container.
// It is the responsibility of the init container to load all artifacts to the mounted emptydir location.
// (e.g. /inputs/artifacts/CODE). The shared emptydir is mapped to the correspoding location in the main container.
func addInputArtifactsVolumes(pod *corev1.Pod, tmpl *wfv1.Template) error {
if len(tmpl.Inputs.Artifacts) == 0 {
return nil
}
Expand Down Expand Up @@ -268,3 +273,33 @@ func addInputArtifactVolumes(pod *corev1.Pod, tmpl *wfv1.Template) error {
pod.Spec.Containers[mainCtrIndex] = *mainCtr
return nil
}

// addOutputArtifactsRepoMetaData updates the template with artifact repository information configured in the controller.
// This is skipped for artifacts which have explicitly set an output artifact location in the template
func (wfc *WorkflowController) addOutputArtifactsRepoMetaData(pod *corev1.Pod, tmpl *wfv1.Template) {
for artName, art := range tmpl.Outputs.Artifacts {
if art.Destination != nil {
// The artifact destination was explicitly set in the template. Skip
continue
}
if wfc.Config.ArtifactRepository.S3 != nil {
// artifacts are stored in S3 using the following formula:
// <repo_key_prefix>/<worflow_name>/<node_id>/<artifact_name>
// (e.g. myworkflowartifacts/argo-wf-fhljp/argo-wf-fhljp-123291312382/CODE)
// TODO: will need to support more advanced organization of artifacts such as dated
// (e.g. myworkflowartifacts/2017/10/31/... )
keyPrefix := ""
if wfc.Config.ArtifactRepository.S3.KeyPrefix != "" {
keyPrefix = wfc.Config.ArtifactRepository.S3.KeyPrefix + "/"
}
artLocationKey := fmt.Sprintf("%s%s/%s/%s", keyPrefix, pod.Labels[common.LabelKeyWorkflow], pod.ObjectMeta.Name, artName)
art.Destination = &wfv1.ArtifactDestination{
S3: &wfv1.S3ArtifactDestination{
S3Bucket: wfc.Config.ArtifactRepository.S3.S3Bucket,
Key: artLocationKey,
},
}
}
tmpl.Outputs.Artifacts[artName] = art
}
}

0 comments on commit 2058342

Please sign in to comment.