Skip to content

Commit

Permalink
Argo Executor init container
Browse files Browse the repository at this point in the history
- Argo executor init container first commit
- Init container is responsible for downloading input artifacts for the user container
- Reading the Kubernetes downward api to interpret the workflow template (annotation file)
- Using minio go sdk to support private (minio), aws s3 and google api.
  • Loading branch information
Tianhe Zhang authored and sytianhe committed Nov 7, 2017
1 parent efe4392 commit 621d7ca
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 53 deletions.
26 changes: 25 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,11 @@
[[constraint]]
name = "github.com/stretchr/testify"
version = "1.1.4"

[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "1.12.19"

[[constraint]]
name = "github.com/minio/minio-go"
version = "3.0.2"
1 change: 0 additions & 1 deletion api/workflow/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package v1

import (
"fmt"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down
141 changes: 141 additions & 0 deletions cmd/argoexec/commands/artifacts.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
package commands

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"strings"

wfv1 "github.com/argoproj/argo/api/workflow/v1"
"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/executor"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
//"k8s.io/client-go/tools/clientcmd"
//b64 "encoding/base64"
)

func init() {
Expand All @@ -23,6 +39,131 @@ var artifactsLoadCmd = &cobra.Command{
Run: loadArtifacts,
}

// Open the Kubernetes downward api file and
// read the pod annotation file that contains template and then
// unmarshal the template
func GetTemplateFromPodAnnotations(annotationsPath string, template *wfv1.Template) error {
// Read the annotation file
file, err := os.Open(annotationsPath)
if err != nil {
fmt.Printf("ERROR opening annotation file from %s\n", annotationsPath)
return errors.InternalWrapError(err)
}

defer file.Close()
reader := bufio.NewReader(file)

// Prefix of template property in the annotation file
prefix := fmt.Sprintf("%s=", common.PodAnnotationsTemplatePropertyName)

for {
// Read line-by-line
var buffer bytes.Buffer

var l []byte
var isPrefix bool
for {
l, isPrefix, err = reader.ReadLine()
buffer.Write(l)

// If we've reached the end of the line, stop reading.
if !isPrefix {
break
}

// If we're just at the EOF, break
if err != nil {
break
}
}

// The end of the annotation file
if err == io.EOF {
break
}

line := buffer.String()

// Read template property
if strings.HasPrefix(line, prefix) {
// Trim the prefix
templateContent := strings.TrimPrefix(line, prefix)

// This part is a bit tricky in terms of unmarshalling
// The content in the file will be something like,
// `"{\"type\":\"container\",\"inputs\":{},\"outputs\":{}}"`
// which is required to unmarshal twice

// First unmarshal to a string without escaping characters
var templateString string
err = json.Unmarshal([]byte(templateContent), &templateString)
if err != nil {
fmt.Printf("Error unmarshalling annotation into template string, %s, %v\n", templateContent, err)
return errors.InternalWrapError(err)
}

// Second unmarshal to a template
err = json.Unmarshal([]byte(templateString), template)
if err != nil {
fmt.Printf("Error unmarshalling annotation into template, %s, %v\n", templateString, err)
return errors.InternalWrapError(err)
}
return nil
}
}

if err != io.EOF {
return errors.InternalWrapError(err)
}

// If reaching here, then no template prefix in the file
return errors.Errorf(errors.CodeInternal, "No template property found from annotation file: %s", annotationsPath)
}

func LoadInputArtifacts(template *wfv1.Template) error {
return nil
}

func loadArtifacts(cmd *cobra.Command, args []string) {
podAnnotationsPath := common.PodMetadataAnnotationsPath

// Use the path specified from the flag
if GlobalArgs.podAnnotationsPath != "" {
podAnnotationsPath = GlobalArgs.podAnnotationsPath
}

var wfTemplate wfv1.Template

// Read template
err := GetTemplateFromPodAnnotations(podAnnotationsPath, &wfTemplate)
if err != nil {
fmt.Printf("Error getting template %v\n", err)
os.Exit(1)
}

// Initialize in-cluster Kubernetes client
config, err := rest.InClusterConfig()
//config, err := clientcmd.BuildConfigFromFlags("", "/Users/Tianhe/.kube/cluster_minikube.conf")
if err != nil {
panic(err.Error())
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}

// Initialize workflow executor
wfExecutor := executor.WorkflowExecutor{
Template: wfTemplate,
ClientSet: clientset,
}

// Download input artifacts
err = wfExecutor.LoadArtifacts()
if err != nil {
fmt.Printf("Error downloading input artifacts, %v\n", err)
os.Exit(1)
}

os.Exit(0)
}
6 changes: 3 additions & 3 deletions cmd/argoexec/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (
argoexec *executor.WorkflowExecutor

// Global CLI flags
globalArgs globalFlags
GlobalArgs globalFlags
)

func init() {
Expand All @@ -40,8 +40,8 @@ type globalFlags struct {
}

func init() {
RootCmd.PersistentFlags().StringVar(&globalArgs.hostIP, "host-ip", common.EnvVarHostIP, fmt.Sprintf("IP of host. (Default: %s)", common.EnvVarHostIP))
RootCmd.PersistentFlags().StringVar(&globalArgs.podAnnotationsPath, "pod-annotations", common.PodMetadataAnnotationsPath, fmt.Sprintf("Pod annotations fiel from k8s downward API. (Default: %s)", common.PodMetadataAnnotationsPath))
RootCmd.PersistentFlags().StringVar(&GlobalArgs.hostIP, "host-ip", common.EnvVarHostIP, fmt.Sprintf("IP of host. (Default: %s)", common.EnvVarHostIP))
RootCmd.PersistentFlags().StringVar(&GlobalArgs.podAnnotationsPath, "pod-annotations", common.PodMetadataAnnotationsPath, fmt.Sprintf("Pod annotations fiel from k8s downward API. (Default: %s)", common.PodMetadataAnnotationsPath))
}

// initExecutor is a helper to initialize the global argoexec instance
Expand Down
6 changes: 3 additions & 3 deletions examples/input-artifact-s3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ spec:
- name: CODE
path: /src
s3:
endpoint: https://storage.googleapis.com
bucket: my-bucket-name
key: path/in/bucket
endpoint: storage.googleapis.com
bucket: tianhe-test
key: testfile
accessKeySecret:
name: my-s3-credentials
key: accessKey
Expand Down
63 changes: 32 additions & 31 deletions examples/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,44 @@ metadata:
name: workflow-controller-configmap
data:
config: |
executorImage: argoproj/argoexec:latest
executorImage: sytianhe/argoexec:latest
artifactRepository:
s3:
bucket: my-bucket
endpoint: https://storage.googleapis.com
accessKeySecret:
name: artifacts-s3-credentials
name: my-s3-credentials
key: accessKey
secretKeySecret:
name: artifacts-s3-credentials
name: my-s3-credentials
key: secretKey
---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
name: workflow-controller-deployment
spec:
selector:
matchLabels:
app: workflow-controller
template:
metadata:
labels:
app: workflow-controller
spec:
containers:
- name: workflow-controller
image: argoproj/workflow-controller:latest
command: [/bin/workflow-controller]
args: [--configmap, workflow-controller-configmap]

---
apiVersion: v1
kind: Secret
metadata:
name: artifacts-s3-credentials
data:
accessKey: AAABBBCCC01234567890
secretKey: abc123XYZ456ABC123aaabbbcccddd1112223334
#
#---
#apiVersion: apps/v1beta1
#kind: Deployment
#metadata:
# name: workflow-controller-deployment
#spec:
# selector:
# matchLabels:
# app: workflow-controller
# template:
# metadata:
# labels:
# app: workflow-controller
# spec:
# containers:
# - name: workflow-controller
# image: argoproj/workflow-controller:latest
# command: [/bin/workflow-controller]
# args: [--configmap, workflow-controller-configmap]
#
#---
#apiVersion: v1
#kind: Secret
#metadata:
# name: artifacts-s3-credentials
#data:
# accessKey: AAABBBCCC01234567890
# secretKey: abc123XYZ456ABC123aaabbbcccddd1112223334
6 changes: 5 additions & 1 deletion workflow/artifacts/artifacts.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package executor

import (
wfv1 "github.com/argoproj/argo/api/workflow/v1"
)

// ArtifactDriver is the interface for loading and saving of artifacts
type ArtifactDriver interface {
// Load accepts an artifact source URL and places it at path
Load(sourceURL string, path string) error
Load(inputArtifact *wfv1.Artifact) error

// Save uploads the path to a destination URL
Save(path string, destURL string) (string, error)
Expand Down
Loading

0 comments on commit 621d7ca

Please sign in to comment.