Skip to content

Commit

Permalink
fix: compat with 1.18 for all controller
Browse files Browse the repository at this point in the history
  • Loading branch information
choujimmy committed May 12, 2020
1 parent 77bee01 commit 9031cac
Show file tree
Hide file tree
Showing 24 changed files with 754 additions and 712 deletions.
69 changes: 35 additions & 34 deletions pkg/platform/controller/addon/helm/helm_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package helm

import (
"context"
"fmt"
"reflect"
"time"
Expand Down Expand Up @@ -180,25 +181,25 @@ func (c *Controller) sync(key string) error {
switch {
case errors.IsNotFound(err):
log.Info("Helm has been deleted. Attempting to cleanup resources", log.String("helm", key))
err = c.processHelmDeletion(key)
err = c.processHelmDeletion(context.Background(), key)
case err != nil:
log.Warn("Unable to retrieve helm from store", log.String("helm", key), log.Err(err))
default:
err = c.processCreateOrUpdate(helm, key)
err = c.processCreateOrUpdate(context.Background(), helm, key)
}
return err
}

func (c *Controller) processHelmDeletion(key string) error {
func (c *Controller) processHelmDeletion(ctx context.Context, key string) error {
cachedHelm, ok := c.cache.get(key)
if !ok {
log.Error("Helm not in cache even though the watcher thought it was. Ignoring the deletion", log.String("helmName", key))
return nil
}
return c.processDelete(cachedHelm, key)
return c.processDelete(ctx, cachedHelm, key)
}

func (c *Controller) processDelete(cachedHelm *cachedHelm, key string) error {
func (c *Controller) processDelete(ctx context.Context, cachedHelm *cachedHelm, key string) error {
log.Info("helm will be dropped", log.String("helm", key))
if c.cache.Exist(key) {
c.cache.delete(key)
Expand All @@ -210,25 +211,25 @@ func (c *Controller) processDelete(cachedHelm *cachedHelm, key string) error {

var provisioner Provisioner
var err error
if provisioner, err = createProvisioner(helm, c.client); err != nil {
if provisioner, err = createProvisioner(ctx, helm, c.client); err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
return provisioner.Uninstall()
return provisioner.Uninstall(ctx)
}

func (c *Controller) processCreateOrUpdate(holder *v1.Helm, key string) error {
func (c *Controller) processCreateOrUpdate(ctx context.Context, holder *v1.Helm, key string) error {
cached := c.cache.getOrCreate(key)
if cached.state != nil && cached.state.UID != holder.UID {
// if the same key holder is created, delete it
if err := c.processDelete(cached, key); err != nil {
if err := c.processDelete(ctx, cached, key); err != nil {
return err
}
}

err := c.handlePhase(key, cached, holder)
err := c.handlePhase(ctx, key, cached, holder)
if err != nil {
return err
}
Expand All @@ -239,14 +240,14 @@ func (c *Controller) processCreateOrUpdate(holder *v1.Helm, key string) error {
return nil
}

func (c *Controller) handlePhase(key string, cachedHelm *cachedHelm, holder *v1.Helm) error {
func (c *Controller) handlePhase(ctx context.Context, key string, cachedHelm *cachedHelm, holder *v1.Helm) error {
phase := holder.Status.Phase
log.Info(fmt.Sprintf("Helm is %s", string(phase)), log.String("helm", key))
switch phase {
case v1.AddonPhaseInitializing:
return c.doInitializing(key, holder)
return c.doInitializing(ctx, key, holder)
case v1.AddonPhaseReinitializing:
c.doReinitializing(key, holder)
c.doReinitializing(ctx, key, holder)
case v1.AddonPhaseChecking, v1.AddonPhaseRunning, v1.AddonPhaseFailed, v1.AddonPhaseUnhealthy:
if !c.prober.ExistByPhase(key, phase) {
c.prober.Set(key, phase)
Expand All @@ -255,7 +256,7 @@ func (c *Controller) handlePhase(key string, cachedHelm *cachedHelm, holder *v1.
return nil
}

func (c *Controller) doInitializing(key string, holder *v1.Helm) error {
func (c *Controller) doInitializing(ctx context.Context, key string, holder *v1.Helm) error {
defer controllerutil.CatchPanic("doInitializing", "Helm")

if c.prober.Exist(key) {
Expand All @@ -264,22 +265,22 @@ func (c *Controller) doInitializing(key string, holder *v1.Helm) error {

var provisioner Provisioner
var err error
if provisioner, err = createProvisioner(holder, c.client); err != nil {
if provisioner, err = createProvisioner(ctx, holder, c.client); err != nil {
return err
}
if err := provisioner.Install(); err != nil {
if err := provisioner.Install(ctx); err != nil {
// if user install his own tiller, update helm status to fail
if errors.IsConflict(err) {
return updateHelmStatus(getUpdateObj(holder, v1.AddonPhaseFailed, err.Error()), c.client)
return updateHelmStatus(ctx, getUpdateObj(holder, v1.AddonPhaseFailed, err.Error()), c.client)
}
return updateHelmStatus(getUpdateObj(holder, v1.AddonPhaseReinitializing, err.Error()), c.client)
return updateHelmStatus(ctx, getUpdateObj(holder, v1.AddonPhaseReinitializing, err.Error()), c.client)
}
newObj := getUpdateObj(holder, v1.AddonPhaseChecking, "")
newObj.Status.LastReInitializingTimestamp = metav1.Now()
return updateHelmStatus(newObj, c.client)
return updateHelmStatus(ctx, newObj, c.client)
}

func (c *Controller) doReinitializing(key string, helm *v1.Helm) {
func (c *Controller) doReinitializing(ctx context.Context, key string, helm *v1.Helm) {
var interval = time.Since(helm.Status.LastReInitializingTimestamp.Time)
var waitTime time.Duration
if interval >= helmTimeOut {
Expand All @@ -289,42 +290,42 @@ func (c *Controller) doReinitializing(key string, helm *v1.Helm) {
}
go func() {
defer controllerutil.CatchPanic("reinitialize", "Helm")
if err := wait.Poll(waitTime, helmTimeOut, c.reinitialize(key, helm)); err != nil {
if err := wait.Poll(waitTime, helmTimeOut, c.reinitialize(ctx, key, helm)); err != nil {
log.Info(fmt.Sprintf("reinitialize err: %v", err))
}
}()
}

func (c *Controller) reinitialize(key string, holder *v1.Helm) func() (bool, error) {
func (c *Controller) reinitialize(ctx context.Context, key string, holder *v1.Helm) func() (bool, error) {
// this func will always return true that keeps the poll once
return func() (bool, error) {
var provisioner Provisioner
var err error
if provisioner, err = createProvisioner(holder, c.client); err == nil {
_ = provisioner.Uninstall()
if err = provisioner.Install(); err == nil {
if provisioner, err = createProvisioner(ctx, holder, c.client); err == nil {
_ = provisioner.Uninstall(ctx)
if err = provisioner.Install(ctx); err == nil {
newObj := getUpdateObj(holder, v1.AddonPhaseChecking, "")
newObj.Status.LastReInitializingTimestamp = metav1.Now()
return true, updateHelmStatus(newObj, c.client)
return true, updateHelmStatus(ctx, newObj, c.client)
}
if errors.IsConflict(err) {
return true, updateHelmStatus(getUpdateObj(holder, v1.AddonPhaseFailed, err.Error()), c.client)
return true, updateHelmStatus(ctx, getUpdateObj(holder, v1.AddonPhaseFailed, err.Error()), c.client)
}
}
if holder.Status.RetryCount >= helmMaxRetryCount {
err := fmt.Sprintf("Install error and retried max(%d) times already.", helmMaxRetryCount)
return true, updateHelmStatus(getUpdateObj(holder, v1.AddonPhaseFailed, err), c.client)
return true, updateHelmStatus(ctx, getUpdateObj(holder, v1.AddonPhaseFailed, err), c.client)
}
return true, updateHelmStatus(getUpdateObj(holder, v1.AddonPhaseReinitializing, err.Error()), c.client)
return true, updateHelmStatus(ctx, getUpdateObj(holder, v1.AddonPhaseReinitializing, err.Error()), c.client)
}
}

func createProvisioner(helm *v1.Helm, client clientset.Interface) (Provisioner, error) {
cluster, err := client.PlatformV1().Clusters().Get(helm.Spec.ClusterName, metav1.GetOptions{})
func createProvisioner(ctx context.Context, helm *v1.Helm, client clientset.Interface) (Provisioner, error) {
cluster, err := client.PlatformV1().Clusters().Get(ctx, helm.Spec.ClusterName, metav1.GetOptions{})
if err != nil {
return nil, err
}
kubeClient, err := util.BuildExternalClientSet(cluster, client.PlatformV1())
kubeClient, err := util.BuildExternalClientSet(ctx, cluster, client.PlatformV1())
if err != nil {
return nil, err
}
Expand All @@ -338,9 +339,9 @@ func createProvisioner(helm *v1.Helm, client clientset.Interface) (Provisioner,
}

// updateHelmStatus means update status to the given object
func updateHelmStatus(obj *v1.Helm, client clientset.Interface) error {
func updateHelmStatus(ctx context.Context, obj *v1.Helm, client clientset.Interface) error {
return wait.PollImmediate(time.Second, helmClientRetryCount*time.Second, func() (done bool, err error) {
_, err = client.PlatformV1().Helms().UpdateStatus(obj)
_, err = client.PlatformV1().Helms().UpdateStatus(ctx, obj, metav1.UpdateOptions{})
if err == nil {
return true, nil
}
Expand Down
70 changes: 36 additions & 34 deletions pkg/platform/controller/addon/helm/helm_installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
package helm

import (
"context"
normalerrors "errors"
"fmt"
"reflect"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"reflect"
"tkestack.io/tke/pkg/platform/controller/addon/helm/images"
"tkestack.io/tke/pkg/util/apiclient"

Expand Down Expand Up @@ -60,9 +62,9 @@ var selectorForHelm = metav1.LabelSelector{
}

type Provisioner interface {
Install() error
Uninstall() error
GetStatus() error
Install(ctx context.Context) error
Uninstall(ctx context.Context) error
GetStatus(ctx context.Context) error
}

type provisioner struct {
Expand All @@ -83,9 +85,9 @@ func NewProvisioner(kubeClient kubernetes.Interface, option *Option) Provisioner
}
}

func (p *provisioner) Install() error {
func (p *provisioner) Install(ctx context.Context) error {
// if unOfficial tiller in cluster
err := p.isOfficialTiller()
err := p.isOfficialTiller(ctx)
if err != nil {
return err
}
Expand All @@ -95,60 +97,60 @@ func (p *provisioner) Install() error {
option := p.option

// ServiceAccount Helm
if err := apiclient.CreateOrUpdateServiceAccount(kubeClient, serviceAccountHelm()); err != nil {
if err := apiclient.CreateOrUpdateServiceAccount(ctx, kubeClient, serviceAccountHelm()); err != nil {
return err
}
// ClusterRoleBinding Helm
if err := apiclient.CreateOrUpdateClusterRoleBinding(kubeClient, crbHelm()); err != nil {
if err := apiclient.CreateOrUpdateClusterRoleBinding(ctx, kubeClient, crbHelm()); err != nil {
return err
}
// Deployment Tiller
if option.isExtensionsAPIGroup {
if err := apiclient.CreateOrUpdateDeploymentExtensionsV1beta1(kubeClient, deploymentTillerExtensions(option.version)); err != nil {
if err := apiclient.CreateOrUpdateDeploymentExtensionsV1beta1(ctx, kubeClient, deploymentTillerExtensions(option.version)); err != nil {
return err
}
} else {
if err := apiclient.CreateOrUpdateDeployment(kubeClient, deploymentTiller(option.version)); err != nil {
if err := apiclient.CreateOrUpdateDeployment(ctx, kubeClient, deploymentTiller(option.version)); err != nil {
return err
}
}
// Service Tiller
if err := apiclient.CreateOrUpdateService(kubeClient, serviceTiller()); err != nil {
if err := apiclient.CreateOrUpdateService(ctx, kubeClient, serviceTiller()); err != nil {
return err
}
// Deployment Helm-api
if option.isExtensionsAPIGroup {
if err := apiclient.CreateOrUpdateDeploymentExtensionsV1beta1(kubeClient, deploymentHelmAPIExtensions(option.version)); err != nil {
if err := apiclient.CreateOrUpdateDeploymentExtensionsV1beta1(ctx, kubeClient, deploymentHelmAPIExtensions(option.version)); err != nil {
return err
}
} else {
if err := apiclient.CreateOrUpdateDeployment(kubeClient, deploymentHelmAPI(option.version)); err != nil {
if err := apiclient.CreateOrUpdateDeployment(ctx, kubeClient, deploymentHelmAPI(option.version)); err != nil {
return err
}
}
// Service Helm-api
if err := apiclient.CreateOrUpdateService(kubeClient, serviceHelmAPI()); err != nil {
if err := apiclient.CreateOrUpdateService(ctx, kubeClient, serviceHelmAPI()); err != nil {
return err
}
return nil
}

func (p *provisioner) Uninstall() error {
func (p *provisioner) Uninstall(ctx context.Context) error {
kubeClient := p.kubeClient
option := p.option

// Service Helm-api
svcHelmAPIErr := apiclient.DeleteService(kubeClient, metav1.NamespaceSystem, svcHelmAPIName)
svcHelmAPIErr := apiclient.DeleteService(ctx, kubeClient, metav1.NamespaceSystem, svcHelmAPIName)
// Deployment Helm-api
deployHelmAPIErr := apiclient.DeleteDeployment(kubeClient, metav1.NamespaceSystem, deployHelmAPIName, option.isExtensionsAPIGroup, selectorForHelm)
deployHelmAPIErr := apiclient.DeleteDeployment(ctx, kubeClient, metav1.NamespaceSystem, deployHelmAPIName, option.isExtensionsAPIGroup, selectorForHelm)
// Service Tiller
svcTillerErr := apiclient.DeleteService(kubeClient, metav1.NamespaceSystem, svcTillerName)
svcTillerErr := apiclient.DeleteService(ctx, kubeClient, metav1.NamespaceSystem, svcTillerName)
// Deployment Tiller
deployTillerErr := apiclient.DeleteDeployment(kubeClient, metav1.NamespaceSystem, deployTillerName, option.isExtensionsAPIGroup, selectorForOfficialTiller)
deployTillerErr := apiclient.DeleteDeployment(ctx, kubeClient, metav1.NamespaceSystem, deployTillerName, option.isExtensionsAPIGroup, selectorForOfficialTiller)
// ClusterRoleBinding Helm
crbHelmErr := apiclient.DeleteClusterRoleBinding(kubeClient, crbHelmName)
crbHelmErr := apiclient.DeleteClusterRoleBinding(ctx, kubeClient, crbHelmName)
// ServiceAccount Helm
svcAccountHelmErr := apiclient.DeleteServiceAccounts(kubeClient, metav1.NamespaceSystem, svcAccountHelmName)
svcAccountHelmErr := apiclient.DeleteServiceAccounts(ctx, kubeClient, metav1.NamespaceSystem, svcAccountHelmName)

if (svcHelmAPIErr != nil && !errors.IsNotFound(svcHelmAPIErr)) ||
(deployHelmAPIErr != nil && !errors.IsNotFound(deployHelmAPIErr)) ||
Expand All @@ -161,10 +163,10 @@ func (p *provisioner) Uninstall() error {
return nil
}

func (p *provisioner) GetStatus() error {
if _, err := p.kubeClient.CoreV1().Services(metav1.NamespaceSystem).ProxyGet("http", svcHelmAPIName, "http", `/tiller/v2/version/json`, nil).DoRaw(); err != nil {
func (p *provisioner) GetStatus(ctx context.Context) error {
if _, err := p.kubeClient.CoreV1().Services(metav1.NamespaceSystem).ProxyGet("http", svcHelmAPIName, "http", `/tiller/v2/version/json`, nil).DoRaw(ctx); err != nil {
// get more detailed checkErr about resource
if checkErr := p.checkRsc(p.kubeClient); checkErr != nil {
if checkErr := p.checkRsc(ctx, p.kubeClient); checkErr != nil {
return checkErr
}
return err
Expand Down Expand Up @@ -546,7 +548,7 @@ func serviceHelmAPI() *corev1.Service {
}
}

func (p *provisioner) isOfficialTiller() error {
func (p *provisioner) isOfficialTiller(ctx context.Context) error {
kubeClient := p.kubeClient
isExtensionsAPIGroup := p.option.isExtensionsAPIGroup

Expand All @@ -558,7 +560,7 @@ func (p *provisioner) isOfficialTiller() error {
}

if !isExtensionsAPIGroup {
deployList, err := kubeClient.AppsV1().Deployments(corev1.NamespaceAll).List(metav1.ListOptions{
deployList, err := kubeClient.AppsV1().Deployments(corev1.NamespaceAll).List(ctx, metav1.ListOptions{
LabelSelector: tillerLabelSelector.String(),
})
if err != nil && !errors.IsNotFound(err) {
Expand All @@ -570,7 +572,7 @@ func (p *provisioner) isOfficialTiller() error {
}
deployLabels = deployList.Items[0].Labels
} else {
deployList, err := kubeClient.ExtensionsV1beta1().Deployments(corev1.NamespaceAll).List(metav1.ListOptions{
deployList, err := kubeClient.ExtensionsV1beta1().Deployments(corev1.NamespaceAll).List(ctx, metav1.ListOptions{
LabelSelector: tillerLabelSelector.String(),
})
if err != nil && !errors.IsNotFound(err) {
Expand All @@ -593,23 +595,23 @@ func (p *provisioner) isOfficialTiller() error {
return conflictErr
}

func (p *provisioner) checkRsc(kubeClient kubernetes.Interface) error {
if _, err := apiclient.GetServiceAccount(p.kubeClient, metav1.NamespaceSystem, svcAccountHelmName); err != nil {
func (p *provisioner) checkRsc(ctx context.Context, kubeClient kubernetes.Interface) error {
if _, err := apiclient.GetServiceAccount(ctx, p.kubeClient, metav1.NamespaceSystem, svcAccountHelmName); err != nil {
return err
}
if _, err := apiclient.GetClusterRoleBinding(p.kubeClient, crbHelmName); err != nil {
if _, err := apiclient.GetClusterRoleBinding(ctx, p.kubeClient, crbHelmName); err != nil {
return err
}
if _, err := apiclient.GetService(p.kubeClient, metav1.NamespaceSystem, svcHelmAPIName); err != nil {
if _, err := apiclient.GetService(ctx, p.kubeClient, metav1.NamespaceSystem, svcHelmAPIName); err != nil {
return err
}
if _, err := apiclient.GetService(p.kubeClient, metav1.NamespaceSystem, svcTillerName); err != nil {
if _, err := apiclient.GetService(ctx, p.kubeClient, metav1.NamespaceSystem, svcTillerName); err != nil {
return err
}
if _, err := apiclient.CheckDeployment(p.kubeClient, metav1.NamespaceSystem, deployTillerName); err != nil {
if _, err := apiclient.CheckDeployment(ctx, p.kubeClient, metav1.NamespaceSystem, deployTillerName); err != nil {
return err
}
if _, err := apiclient.CheckDeployment(p.kubeClient, metav1.NamespaceSystem, deployHelmAPIName); err != nil {
if _, err := apiclient.CheckDeployment(ctx, p.kubeClient, metav1.NamespaceSystem, deployHelmAPIName); err != nil {
return err
}
return nil
Expand Down
Loading

0 comments on commit 9031cac

Please sign in to comment.