Skip to content

Commit

Permalink
[CSE-11] adding config file loader (#10)
Browse files Browse the repository at this point in the history
* adding configmap loader
  • Loading branch information
decarboxy committed Mar 7, 2019
1 parent 7d6b331 commit a28821e
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 19 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.2.1
2.2.2.dev1
5 changes: 4 additions & 1 deletion cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewRootCommand() *cobra.Command {
var (
clientConfig clientcmd.ClientConfig
configMap string // --configmap
configFile string // --config-file
executorImage string // --executor-image
executorImagePullPolicy string // --executor-image-pull-policy
logLevel string // --loglevel
Expand All @@ -57,6 +58,7 @@ func NewRootCommand() *cobra.Command {
if err != nil {
return err
}

config.Burst = 30
config.QPS = 20.0

Expand All @@ -69,7 +71,7 @@ func NewRootCommand() *cobra.Command {
wflientset := wfclientset.NewForConfigOrDie(config)

// start a controller on instances of our custom resource
wfController := controller.NewWorkflowController(config, kubeclientset, wflientset, namespace, executorImage, executorImagePullPolicy, configMap)
wfController := controller.NewWorkflowController(config, kubeclientset, wflientset, namespace, executorImage, executorImagePullPolicy, configMap, configFile)
err = wfController.ResyncConfig()
if err != nil {
return err
Expand All @@ -91,6 +93,7 @@ func NewRootCommand() *cobra.Command {
clientConfig = cli.AddKubectlFlagsToCmd(&command)
command.AddCommand(cmdutil.NewVersionCmd(CLIName))
command.Flags().StringVar(&configMap, "configmap", "workflow-controller-configmap", "Name of K8s configmap to retrieve workflow controller configuration")
command.Flags().StringVar(&configFile, "config-file", "", "Path to a yaml config file. Cannot be specified at the same time as --configmap")
command.Flags().StringVar(&executorImage, "executor-image", "", "Executor image to use (overrides value in configmap)")
command.Flags().StringVar(&executorImagePullPolicy, "executor-image-pull-policy", "", "Executor imagePullPolicy to use (overrides value in configmap)")
command.Flags().StringVar(&logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error")
Expand Down
47 changes: 35 additions & 12 deletions workflow/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"context"
"fmt"
"io/ioutil"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -94,28 +95,50 @@ type GCSArtifactRepository struct {
wfv1.GCSBucket `json:",inline"`
}

// ResyncConfig reloads the controller config from the configmap
// ResyncConfig reloads the controller config from the configmap or configFile
func (wfc *WorkflowController) ResyncConfig() error {
cmClient := wfc.kubeclientset.CoreV1().ConfigMaps(wfc.namespace)
cm, err := cmClient.Get(wfc.configMap, metav1.GetOptions{})
if err != nil {
return errors.InternalWrapError(err)

if wfc.configFile != "" {
log.Infof("Loading configfile from %s", wfc.configFile)
return wfc.updateConfigFromFile(wfc.configFile)
} else {
cmClient := wfc.kubeclientset.CoreV1().ConfigMaps(wfc.namespace)
cm, err := cmClient.Get(wfc.configMap, metav1.GetOptions{})
if err != nil {
return errors.InternalWrapError(err)
}
return wfc.updateConfigFromConfigMap(cm)
}
return wfc.updateConfig(cm)
}

func (wfc *WorkflowController) updateConfig(cm *apiv1.ConfigMap) error {
configStr, ok := cm.Data[common.WorkflowControllerConfigMapKey]
func (wfc *WorkflowController) updateConfigFromConfigMap(cm *apiv1.ConfigMap) error {
configString, ok := cm.Data[common.WorkflowControllerConfigMapKey]
if !ok {
log.Warnf("ConfigMap '%s' does not have key '%s'", wfc.configMap, common.WorkflowControllerConfigMapKey)
return nil
}

return wfc.updateConfig(configString)
}

func (wfc *WorkflowController) updateConfigFromFile(filePath string) error {
fileData, err := ioutil.ReadFile(filePath)
if err != nil {
log.Errorf("Error reading config file %s", filePath)
return err
}
return wfc.updateConfig(string(fileData))

}

func (wfc *WorkflowController) updateConfig(configString string) error {

var config WorkflowControllerConfig
err := yaml.Unmarshal([]byte(configStr), &config)
err := yaml.Unmarshal([]byte(configString), &config)
if err != nil {
return errors.InternalWrapError(err)
}
log.Printf("workflow controller configuration from %s:\n%s", wfc.configMap, configStr)
log.Printf("workflow controller configuration from %s:\n%s", wfc.configMap, configString)
if wfc.cliExecutorImage == "" && config.ExecutorImage == "" {
return errors.Errorf(errors.CodeBadRequest, "ConfigMap '%s' does not have executorImage", wfc.configMap)
}
Expand Down Expand Up @@ -152,7 +175,7 @@ func (wfc *WorkflowController) watchControllerConfigMap(ctx context.Context) (ca
AddFunc: func(obj interface{}) {
if cm, ok := obj.(*apiv1.ConfigMap); ok {
log.Infof("Detected ConfigMap update. Updating the controller config.")
err := wfc.updateConfig(cm)
err := wfc.updateConfigFromConfigMap(cm)
if err != nil {
log.Errorf("Update of config failed due to: %v", err)
}
Expand All @@ -166,7 +189,7 @@ func (wfc *WorkflowController) watchControllerConfigMap(ctx context.Context) (ca
}
if newCm, ok := new.(*apiv1.ConfigMap); ok {
log.Infof("Detected ConfigMap update. Updating the controller config.")
err := wfc.updateConfig(newCm)
err := wfc.updateConfigFromConfigMap(newCm)
if err != nil {
log.Errorf("Update of config failed due to: %v", err)
}
Expand Down
20 changes: 15 additions & 5 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type WorkflowController struct {
namespace string
// configMap is the name of the config map in which to derive configuration of the controller from
configMap string
// configFile is the path to a configuration file
configFile string
// Config is the workflow controller's configuration
Config WorkflowControllerConfig

Expand Down Expand Up @@ -73,12 +75,15 @@ func NewWorkflowController(
executorImage,
executorImagePullPolicy,
configMap string,
configFile string,
) *WorkflowController {

wfc := WorkflowController{
restConfig: restConfig,
kubeclientset: kubeclientset,
wfclientset: wfclientset,
configMap: configMap,
configFile: configFile,
namespace: namespace,
cliExecutorImage: executorImage,
cliExecutorImagePullPolicy: executorImagePullPolicy,
Expand Down Expand Up @@ -128,11 +133,16 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in

log.Infof("Workflow Controller (version: %s) starting", argo.GetVersion())
log.Infof("Workers: workflow: %d, pod: %d", wfWorkers, podWorkers)
log.Info("Watch Workflow controller config map updates")
_, err := wfc.watchControllerConfigMap(ctx)
if err != nil {
log.Errorf("Failed to register watch for controller config map: %v", err)
return

if wfc.configFile != "" {
log.Info("A config file was specified. Ignoring the k8s configmap resource")
} else {
log.Info("Watch Workflow controller config map updates")
_, err := wfc.watchControllerConfigMap(ctx)
if err != nil {
log.Errorf("Failed to register watch for controller config map: %v", err)
return
}
}

wfc.wfInformer = util.NewWorkflowInformer(wfc.restConfig, wfc.Config.Namespace, workflowResyncPeriod, wfc.tweakWorkflowlist)
Expand Down

0 comments on commit a28821e

Please sign in to comment.