Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CSE-11] adding config file loader #10

Merged
merged 2 commits into from
Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ pipeline {
IMAGE_TAG=IMAGE_REF.split(':').last()
GIT_BRANCH = env.BRANCH_NAME.replace('/', '').replace('_', '').replace('-', '')

if (env.BRANCH_NAME == 'master') {
VERSION = env.BUILD_ID}
else {
VERSION = env.BUILD_ID + GIT_BRANCH
}
def baseVersionTag = readFile "VERSION"
baseVersionTag = baseVersionTag.trim();
VERSION = "${baseVersionTag}-cyrus-${GIT_BRANCH}"

println "Version tag for this build is ${VERSION}"
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.2.1
2.2.2.dev1
5 changes: 4 additions & 1 deletion cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewRootCommand() *cobra.Command {
var (
clientConfig clientcmd.ClientConfig
configMap string // --configmap
configFile string // --config-file
executorImage string // --executor-image
executorImagePullPolicy string // --executor-image-pull-policy
logLevel string // --loglevel
Expand All @@ -57,6 +58,7 @@ func NewRootCommand() *cobra.Command {
if err != nil {
return err
}

config.Burst = 30
config.QPS = 20.0

Expand All @@ -69,7 +71,7 @@ func NewRootCommand() *cobra.Command {
wflientset := wfclientset.NewForConfigOrDie(config)

// start a controller on instances of our custom resource
wfController := controller.NewWorkflowController(config, kubeclientset, wflientset, namespace, executorImage, executorImagePullPolicy, configMap)
wfController := controller.NewWorkflowController(config, kubeclientset, wflientset, namespace, executorImage, executorImagePullPolicy, configMap, configFile)
err = wfController.ResyncConfig()
if err != nil {
return err
Expand All @@ -91,6 +93,7 @@ func NewRootCommand() *cobra.Command {
clientConfig = cli.AddKubectlFlagsToCmd(&command)
command.AddCommand(cmdutil.NewVersionCmd(CLIName))
command.Flags().StringVar(&configMap, "configmap", "workflow-controller-configmap", "Name of K8s configmap to retrieve workflow controller configuration")
command.Flags().StringVar(&configFile, "config-file", "", "Path to a yaml config file. Cannot be specified at the same time as --configmap")
command.Flags().StringVar(&executorImage, "executor-image", "", "Executor image to use (overrides value in configmap)")
command.Flags().StringVar(&executorImagePullPolicy, "executor-image-pull-policy", "", "Executor imagePullPolicy to use (overrides value in configmap)")
command.Flags().StringVar(&logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error")
Expand Down
47 changes: 35 additions & 12 deletions workflow/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"context"
"fmt"
"io/ioutil"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -94,28 +95,50 @@ type GCSArtifactRepository struct {
wfv1.GCSBucket `json:",inline"`
}

// ResyncConfig reloads the controller config from the configmap
// ResyncConfig reloads the controller config from the configmap or configFile
func (wfc *WorkflowController) ResyncConfig() error {
cmClient := wfc.kubeclientset.CoreV1().ConfigMaps(wfc.namespace)
cm, err := cmClient.Get(wfc.configMap, metav1.GetOptions{})
if err != nil {
return errors.InternalWrapError(err)

if wfc.configFile != "" {
log.Infof("Loading configfile from %s", wfc.configFile)
return wfc.updateConfigFromFile(wfc.configFile)
} else {
cmClient := wfc.kubeclientset.CoreV1().ConfigMaps(wfc.namespace)
cm, err := cmClient.Get(wfc.configMap, metav1.GetOptions{})
if err != nil {
return errors.InternalWrapError(err)
}
return wfc.updateConfigFromConfigMap(cm)
}
return wfc.updateConfig(cm)
}

func (wfc *WorkflowController) updateConfig(cm *apiv1.ConfigMap) error {
configStr, ok := cm.Data[common.WorkflowControllerConfigMapKey]
func (wfc *WorkflowController) updateConfigFromConfigMap(cm *apiv1.ConfigMap) error {
configString, ok := cm.Data[common.WorkflowControllerConfigMapKey]
if !ok {
log.Warnf("ConfigMap '%s' does not have key '%s'", wfc.configMap, common.WorkflowControllerConfigMapKey)
return nil
}

return wfc.updateConfig(configString)
}

func (wfc *WorkflowController) updateConfigFromFile(filePath string) error {
fileData, err := ioutil.ReadFile(filePath)
if err != nil {
log.Errorf("Error reading config file %s", filePath)
return err
}
return wfc.updateConfig(string(fileData))

}

func (wfc *WorkflowController) updateConfig(configString string) error {

var config WorkflowControllerConfig
err := yaml.Unmarshal([]byte(configStr), &config)
err := yaml.Unmarshal([]byte(configString), &config)
if err != nil {
return errors.InternalWrapError(err)
}
log.Printf("workflow controller configuration from %s:\n%s", wfc.configMap, configStr)
log.Printf("workflow controller configuration from %s:\n%s", wfc.configMap, configString)
if wfc.cliExecutorImage == "" && config.ExecutorImage == "" {
return errors.Errorf(errors.CodeBadRequest, "ConfigMap '%s' does not have executorImage", wfc.configMap)
}
Expand Down Expand Up @@ -152,7 +175,7 @@ func (wfc *WorkflowController) watchControllerConfigMap(ctx context.Context) (ca
AddFunc: func(obj interface{}) {
if cm, ok := obj.(*apiv1.ConfigMap); ok {
log.Infof("Detected ConfigMap update. Updating the controller config.")
err := wfc.updateConfig(cm)
err := wfc.updateConfigFromConfigMap(cm)
if err != nil {
log.Errorf("Update of config failed due to: %v", err)
}
Expand All @@ -166,7 +189,7 @@ func (wfc *WorkflowController) watchControllerConfigMap(ctx context.Context) (ca
}
if newCm, ok := new.(*apiv1.ConfigMap); ok {
log.Infof("Detected ConfigMap update. Updating the controller config.")
err := wfc.updateConfig(newCm)
err := wfc.updateConfigFromConfigMap(newCm)
if err != nil {
log.Errorf("Update of config failed due to: %v", err)
}
Expand Down
20 changes: 15 additions & 5 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type WorkflowController struct {
namespace string
// configMap is the name of the config map in which to derive configuration of the controller from
configMap string
// configFile is the path to a configuration file
configFile string
// Config is the workflow controller's configuration
Config WorkflowControllerConfig

Expand Down Expand Up @@ -73,12 +75,15 @@ func NewWorkflowController(
executorImage,
executorImagePullPolicy,
configMap string,
configFile string,
) *WorkflowController {

wfc := WorkflowController{
restConfig: restConfig,
kubeclientset: kubeclientset,
wfclientset: wfclientset,
configMap: configMap,
configFile: configFile,
namespace: namespace,
cliExecutorImage: executorImage,
cliExecutorImagePullPolicy: executorImagePullPolicy,
Expand Down Expand Up @@ -128,11 +133,16 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in

log.Infof("Workflow Controller (version: %s) starting", argo.GetVersion())
log.Infof("Workers: workflow: %d, pod: %d", wfWorkers, podWorkers)
log.Info("Watch Workflow controller config map updates")
_, err := wfc.watchControllerConfigMap(ctx)
if err != nil {
log.Errorf("Failed to register watch for controller config map: %v", err)
return

if wfc.configFile != "" {
log.Info("A config file was specified. Ignoring the k8s configmap resource")
} else {
log.Info("Watch Workflow controller config map updates")
_, err := wfc.watchControllerConfigMap(ctx)
if err != nil {
log.Errorf("Failed to register watch for controller config map: %v", err)
return
}
}

wfc.wfInformer = util.NewWorkflowInformer(wfc.restConfig, wfc.Config.Namespace, workflowResyncPeriod, wfc.tweakWorkflowlist)
Expand Down