Skip to content

Commit

Permalink
feat(platform): controller support crd mode (#2026)
Browse files Browse the repository at this point in the history
  • Loading branch information
Leo Ryu committed Jul 26, 2022
1 parent 3ce92b2 commit 3d8d46d
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 15 deletions.
2 changes: 2 additions & 0 deletions api/platform/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ const (
AnywhereLocalizationsAnno = "tkestack.io/anywhere-localizations"
// AnywhereMachinesAnno contains base64 machines json data
AnywhereMachinesAnno = "tkestack.io/anywhere-machines"
// ClusterNameLable contains related cluster's name for no-cluster resources
ClusterNameLable = "tkestack.io/cluster-name"
)

// KubeVendorType describe the kubernetes provider of the cluster
Expand Down
2 changes: 2 additions & 0 deletions api/platform/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ const (
AnywhereLocalizationsAnno = "tkestack.io/anywhere-localizations"
// AnywhereMachinesAnno contains base64 machines json data
AnywhereMachinesAnno = "tkestack.io/anywhere-machines"
// ClusterNameLable contains related cluster's name for no-cluster resources
ClusterNameLable = "tkestack.io/cluster-name"
)

// KubeVendorType describe the kubernetes provider of the cluster
Expand Down
8 changes: 8 additions & 0 deletions cmd/tke-platform-controller/app/options/clustercontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
flagUpperLimitofRandomHealthCheckPeriod = "upper-limit-random-healthcheck-period"
flagClusterRateLimiterLimit = "cluster-rate-limiter-limit"
flagClusterRateLimiterBurst = "cluster-rate-limiter-burst"
flagClusterIsCRDMode = "cluster-is-crd-mode"
)

const (
Expand All @@ -43,6 +44,7 @@ const (
configUpperLimitofRandomHealthCheckPeriod = "controller.upper-limit-random-healthcheck-period"
configClusterRateLimiterLimit = "controller.cluster_rate_limiter_limit"
configClusterRateLimiterBurst = "controller.cluster_rate_limiter_burst"
configClusterIsCRDMode = "controller.cluster_is_crd_mode"
)

// ClusterControllerOptions holds the ClusterController options.
Expand All @@ -61,6 +63,7 @@ func NewClusterControllerOptions() *ClusterControllerOptions {
RandomeRangeUpperLimitForHealthCheckPeriod: defaultRandomeRangeUpperLimitForHealthCheckPeriod,
BucketRateLimiterLimit: defaultBucketRateLimiterLimit,
BucketRateLimiterBurst: defaultBucketRateLimiterBurst,
IsCRDMode: false,
},
}
}
Expand Down Expand Up @@ -91,6 +94,9 @@ func (o *ClusterControllerOptions) AddFlags(fs *pflag.FlagSet) {

fs.IntVar(&o.BucketRateLimiterBurst, flagClusterRateLimiterBurst, o.BucketRateLimiterBurst, "The number of bursts of at most b tokens.")
_ = viper.BindPFlag(configClusterRateLimiterBurst, fs.Lookup(flagClusterRateLimiterBurst))

fs.BoolVar(&o.IsCRDMode, flagClusterIsCRDMode, o.IsCRDMode, "Whether the controller is using CRD mode")
_ = viper.BindPFlag(configClusterIsCRDMode, fs.Lookup(flagClusterIsCRDMode))
}

// ApplyTo fills up ClusterController config with options.
Expand All @@ -106,6 +112,7 @@ func (o *ClusterControllerOptions) ApplyTo(cfg *clusterconfig.ClusterControllerC
cfg.RandomeRangeUpperLimitForHealthCheckPeriod = o.RandomeRangeUpperLimitForHealthCheckPeriod
cfg.BucketRateLimiterLimit = o.BucketRateLimiterLimit
cfg.BucketRateLimiterBurst = o.BucketRateLimiterBurst
cfg.IsCRDMode = o.IsCRDMode

return nil
}
Expand All @@ -130,5 +137,6 @@ func (o *ClusterControllerOptions) ApplyFlags() []error {
o.RandomeRangeUpperLimitForHealthCheckPeriod = viper.GetDuration(configUpperLimitofRandomHealthCheckPeriod)
o.BucketRateLimiterLimit = viper.GetInt(configClusterRateLimiterLimit)
o.BucketRateLimiterBurst = viper.GetInt(configClusterRateLimiterBurst)
o.IsCRDMode = viper.GetBool(configClusterIsCRDMode)
return nil
}
14 changes: 11 additions & 3 deletions pkg/controller/config/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package config

import (
"fmt"

"k8s.io/client-go/rest"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"tkestack.io/tke/pkg/controller/options"
Expand All @@ -29,9 +31,15 @@ import (
func BuildClientConfig(opts *options.APIServerClientOptions) (cfg *restclient.Config, ok bool, err error) {
if opts.Required {
if opts.Server == "" && opts.ServerClientConfig == "" {
err = fmt.Errorf("either %s or %s should be specified",
options.FlagAPIClientServer(opts.Name),
options.FlagAPIClientServerClientConfig(opts.Name))
cfg, err = rest.InClusterConfig()
if err != nil {
err = fmt.Errorf("%s or %s is not specified, try to use in cluster config failed %v",
options.FlagAPIClientServer(opts.Name),
options.FlagAPIClientServerClientConfig(opts.Name),
err)
} else {
ok = true
}
return
}
}
Expand Down
7 changes: 1 addition & 6 deletions pkg/controller/options/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package options

import (
"fmt"

"github.com/spf13/pflag"
"github.com/spf13/viper"
)
Expand Down Expand Up @@ -113,12 +114,6 @@ func (o *APIServerClientOptions) ApplyFlags() []error {
o.QPS = float32(viper.GetFloat64(o.configAPIClientQPS))
o.ContentType = viper.GetString(o.configAPIClientContentType)

if o.Required {
if o.ServerClientConfig == "" && o.Server == "" {
errs = append(errs, fmt.Errorf("must specify either `%s` or `%s`", FlagAPIClientServer(o.Name), FlagAPIClientServerClientConfig(o.Name)))
}
}

return errs
}

Expand Down
49 changes: 43 additions & 6 deletions pkg/platform/controller/cluster/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Controller struct {
healthCheckPeriod time.Duration
randomeRangeLowerLimitForHealthCheckPeriod time.Duration
randomeRangeUpperLimitForHealthCheckPeriod time.Duration
isCRDMode bool
}

// NewController creates a new Controller object.
Expand All @@ -88,6 +89,7 @@ func NewController(
platformClient,
finalizerToken,
true),
isCRDMode: configuration.IsCRDMode,
}
rateLimit := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
Expand Down Expand Up @@ -345,6 +347,10 @@ func (c *Controller) reconcile(ctx context.Context, key string, cluster *platfor
var err error

switch cluster.Status.Phase {
// empty string is for crd without mutating webhook
case "":
cluster.Status.Phase = platformv1.ClusterInitializing
err = c.onCreate(ctx, cluster)
case platformv1.ClusterInitializing:
err = c.onCreate(ctx, cluster)
case platformv1.ClusterRunning, platformv1.ClusterFailed:
Expand Down Expand Up @@ -389,18 +395,37 @@ func (c *Controller) onCreate(ctx context.Context, cluster *platformv1.Cluster)
if err != nil {
// Update status, ignore failure
_, _ = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{})
_, _ = c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{})
updatedCls, _ := c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{})
// if using crd, cluster status cannot be updated through update cluster
if c.isCRDMode {
var clsStatus *platformv1.Cluster
if updatedCls == nil {
clsStatus = clusterWrapper.Cluster
} else {
clsStatus = updatedCls
clsStatus.Status = clusterWrapper.Cluster.Status
}
_, _ = c.platformClient.Clusters().UpdateStatus(ctx, clsStatus, metav1.UpdateOptions{})
}
return err
}
clusterWrapper.ClusterCredential, err = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{})
if err != nil {
return err
}
clusterWrapper.RegisterRestConfig(clusterWrapper.ClusterCredential.RESTConfig(cluster))
clusterWrapper.Cluster, err = c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{})
cls, err := c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{})
if err != nil {
return err
}
// if using crd, cluster status cannot be updated through update cluster
if c.isCRDMode {
cls.Status = clusterWrapper.Cluster.Status
clusterWrapper.Cluster, err = c.platformClient.Clusters().UpdateStatus(ctx, cls, metav1.UpdateOptions{})
if err != nil {
return err
}
}
}

return nil
Expand Down Expand Up @@ -487,10 +512,20 @@ func (c *Controller) ensureCreateClusterCredential(ctx context.Context, cluster
}

// TODO use informer search by labels.
fieldSelector := fields.OneTermEqualSelector("clusterName", cluster.Name).String()
clustercredentials, err := c.platformClient.ClusterCredentials().List(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
if err != nil {
return nil, err
var clustercredentials *platformv1.ClusterCredentialList
var err error
if c.isCRDMode {
labelSelector := fields.OneTermEqualSelector(platformv1.ClusterNameLable, cluster.Name).String()
clustercredentials, err = c.platformClient.ClusterCredentials().List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
return nil, err
}
} else {
fieldSelector := fields.OneTermEqualSelector("clusterName", cluster.Name).String()
clustercredentials, err = c.platformClient.ClusterCredentials().List(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
if err != nil {
return nil, err
}
}

// [Idempotent] if not found cluster credentials, create one for next logic
Expand All @@ -500,6 +535,8 @@ func (c *Controller) ensureCreateClusterCredential(ctx context.Context, cluster
TenantID: cluster.Spec.TenantID,
ClusterName: cluster.Name,
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{platformv1.ClusterNameLable: cluster.Name},
GenerateName: "cc-",
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(cluster, platformv1.SchemeGroupVersion.WithKind("Cluster"))},
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/platform/controller/cluster/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ type ClusterControllerConfiguration struct {
BucketRateLimiterLimit int
// BucketRateLimiterBurst bursts of at most b tokens.
BucketRateLimiterBurst int
// IsCRDMode Whether the controller is using CRD mode
IsCRDMode bool
}

0 comments on commit 3d8d46d

Please sign in to comment.