From 130e0b4a6b8f433b23a6cb01f6d3ea969c95f8f2 Mon Sep 17 00:00:00 2001 From: QianChenglong Date: Mon, 22 Jun 2020 09:45:53 +0800 Subject: [PATCH] refactor: cluster controller --- api/platform/v1/cluster.go | 8 + go.mod | 1 - .../controller/cluster/cluster_cache.go | 94 ----- .../controller/cluster/cluster_controller.go | 393 ++++++++---------- .../controller/cluster/cluster_health.go | 270 ------------ pkg/platform/provider/cluster/interface.go | 123 +++--- 6 files changed, 236 insertions(+), 653 deletions(-) delete mode 100644 pkg/platform/controller/cluster/cluster_cache.go delete mode 100644 pkg/platform/controller/cluster/cluster_health.go diff --git a/api/platform/v1/cluster.go b/api/platform/v1/cluster.go index 04fd416c6..46569c708 100644 --- a/api/platform/v1/cluster.go +++ b/api/platform/v1/cluster.go @@ -104,6 +104,14 @@ func (in *Cluster) SetCondition(newCondition ClusterCondition) { } in.Status.Conditions = conditions + switch newCondition.Status { + case ConditionFalse: + in.Status.Reason = newCondition.Reason + in.Status.Message = newCondition.Message + default: + in.Status.Reason = "" + in.Status.Message = "" + } } func (in *Cluster) Host() (string, error) { diff --git a/go.mod b/go.mod index 1d82ae158..2aefcdc23 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,6 @@ require ( github.com/Masterminds/semver v1.5.0 github.com/aws/aws-sdk-go v1.29.32 github.com/bitly/go-simplejson v0.5.0 - github.com/blang/semver v3.5.1+incompatible github.com/casbin/casbin/v2 v2.2.1 github.com/chartmuseum/storage v0.8.0 github.com/coreos/go-oidc v2.2.1+incompatible diff --git a/pkg/platform/controller/cluster/cluster_cache.go b/pkg/platform/controller/cluster/cluster_cache.go deleted file mode 100644 index 4e390e391..000000000 --- a/pkg/platform/controller/cluster/cluster_cache.go +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Tencent is pleased to support the open source community by making TKEStack - * available. - * - * Copyright (C) 2012-2019 Tencent. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * https://opensource.org/licenses/Apache-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package cluster - -import ( - "sync" - - v1 "tkestack.io/tke/api/platform/v1" -) - -type cachedCluster struct { - // The cached state of the cluster - state *v1.Cluster -} - -type clusterCache struct { - mu sync.Mutex // protects clusterMap - clusterMap map[string]*cachedCluster -} - -// ListKeys implements the interface required by DeltaFIFO to list the keys we -// already know about. 
-func (s *clusterCache) ListKeys() []string { - s.mu.Lock() - defer s.mu.Unlock() - keys := make([]string, 0, len(s.clusterMap)) - for k := range s.clusterMap { - keys = append(keys, k) - } - return keys -} - -// GetByKey returns the value stored in the clusterMap under the given key -func (s *clusterCache) GetByKey(key string) (interface{}, bool, error) { - s.mu.Lock() - defer s.mu.Unlock() - if v, ok := s.clusterMap[key]; ok { - return v, true, nil - } - return nil, false, nil -} - -func (s *clusterCache) get(clusterName string) (*cachedCluster, bool) { - s.mu.Lock() - defer s.mu.Unlock() - cluster, ok := s.clusterMap[clusterName] - return cluster, ok -} - -func (s *clusterCache) Exist(clusterName string) bool { - s.mu.Lock() - defer s.mu.Unlock() - _, ok := s.clusterMap[clusterName] - return ok -} - -func (s *clusterCache) getOrCreate(clusterName string) *cachedCluster { - s.mu.Lock() - defer s.mu.Unlock() - cluster, ok := s.clusterMap[clusterName] - if !ok { - cluster = &cachedCluster{} - s.clusterMap[clusterName] = cluster - } - return cluster -} - -func (s *clusterCache) set(clusterName string, cluster *cachedCluster) { - s.mu.Lock() - defer s.mu.Unlock() - s.clusterMap[clusterName] = cluster -} - -func (s *clusterCache) delete(clusterName string) { - s.mu.Lock() - defer s.mu.Unlock() - delete(s.clusterMap, clusterName) -} diff --git a/pkg/platform/controller/cluster/cluster_controller.go b/pkg/platform/controller/cluster/cluster_controller.go index d0a8b8657..9de91153e 100644 --- a/pkg/platform/controller/cluster/cluster_controller.go +++ b/pkg/platform/controller/cluster/cluster_controller.go @@ -22,14 +22,17 @@ import ( "context" "errors" "fmt" + "math/rand" "reflect" "time" + mapset "github.com/deckarep/golang-set" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -41,50 +44,29 @@ import ( "tkestack.io/tke/pkg/platform/controller/cluster/deletion" clusterprovider "tkestack.io/tke/pkg/platform/provider/cluster" typesv1 "tkestack.io/tke/pkg/platform/types/v1" + "tkestack.io/tke/pkg/platform/util" "tkestack.io/tke/pkg/util/log" "tkestack.io/tke/pkg/util/metrics" + "tkestack.io/tke/pkg/util/strategicpatch" ) const ( - clusterClientRetryCount = 5 - clusterClientRetryInterval = 5 * time.Second + conditionTypeHealthCheck = "HealthCheck" + failedHealthCheckReason = "FailedHealthCheck" - reasonFailedInit = "FailedInit" - reasonFailedUpdate = "FailedUpdate" + healthCheckInterval = 5 * time.Minute ) // Controller is responsible for performing actions dependent upon a cluster phase. type Controller struct { - platformClient platformversionedclient.PlatformV1Interface - cache *clusterCache - health *clusterHealth - queue workqueue.RateLimitingInterface - lister platformv1lister.ClusterLister - listerSynced cache.InformerSynced - stopCh <-chan struct{} - clusterDeleter deletion.ClusterDeleterInterface -} - -// obj could be an *platformv1.Cluster, or a DeletionFinalStateUnknown marker item. 
-func (c *Controller) enqueueCluster(obj interface{}) { - key, err := controllerutil.KeyFunc(obj) - if err != nil { - runtime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) - return - } - c.queue.Add(key) -} - -func (c *Controller) needsUpdate(old *platformv1.Cluster, new *platformv1.Cluster) bool { - if !reflect.DeepEqual(old.Spec, new.Spec) { - return true - } - - if !reflect.DeepEqual(old.Status, new.Status) { - return true - } + queue workqueue.RateLimitingInterface + lister platformv1lister.ClusterLister + listerSynced cache.InformerSynced - return false + log log.Logger + platformClient platformversionedclient.PlatformV1Interface + healthCache mapset.Set + deleter deletion.ClusterDeleterInterface } // NewController creates a new Controller object. @@ -93,13 +75,13 @@ func NewController( clusterInformer platformv1informer.ClusterInformer, resyncPeriod time.Duration, finalizerToken platformv1.FinalizerName) *Controller { - // create the controller so we can inject the enqueue function - controller := &Controller{ + c := &Controller{ + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster"), + + log: log.WithName("cluster-controller"), platformClient: platformClient, - cache: &clusterCache{clusterMap: make(map[string]*cachedCluster)}, - health: &clusterHealth{clusterMap: make(map[string]*platformv1.Cluster)}, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster"), - clusterDeleter: deletion.NewClusterDeleter(platformClient.Clusters(), + healthCache: mapset.NewSet(), + deleter: deletion.NewClusterDeleter(platformClient.Clusters(), platformClient, finalizerToken, true), @@ -109,32 +91,60 @@ func NewController( _ = metrics.RegisterMetricAndTrackRateLimiterUsage("cluster_controller", platformClient.RESTClient().GetRateLimiter()) } - // configure the namespace informer event handlers clusterInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueCluster, - UpdateFunc: func(oldObj, newObj interface{}) { - oldCluster, ok1 := oldObj.(*platformv1.Cluster) - curCluster, ok2 := newObj.(*platformv1.Cluster) - if ok1 && ok2 && controller.needsUpdate(oldCluster, curCluster) { - controller.enqueueCluster(newObj) - } else { - log.Debug("Update new cluster not to add", log.String("clusterName", curCluster.Name), log.String("resourceversion", curCluster.ResourceVersion), log.String("old-resourceversion", oldCluster.ResourceVersion), log.String("cur-resourceversion", curCluster.ResourceVersion)) - } - }, + AddFunc: c.addCluster, + UpdateFunc: c.updateCluster, }, resyncPeriod, ) - controller.lister = clusterInformer.Lister() - controller.listerSynced = clusterInformer.Informer().HasSynced + c.lister = clusterInformer.Lister() + c.listerSynced = clusterInformer.Informer().HasSynced + + return c +} + +func (c *Controller) addCluster(obj interface{}) { + cluster := obj.(*platformv1.Cluster) + c.log.Info("Adding cluster", "clusterName", cluster.Name) + c.enqueue(cluster) +} - return controller +func (c *Controller) updateCluster(old, obj interface{}) { + oldCluster := old.(*platformv1.Cluster) + cluster := obj.(*platformv1.Cluster) + if !c.needsUpdate(oldCluster, cluster) { + return + } + c.log.Info("Updating cluster", "clusterName", cluster.Name) + c.enqueue(cluster) +} + +func (c *Controller) enqueue(obj *platformv1.Cluster) { + key, err := controllerutil.KeyFunc(obj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for 
object %+v: %v", obj, err)) + return + } + c.queue.Add(key) +} + +func (c *Controller) needsUpdate(old *platformv1.Cluster, new *platformv1.Cluster) bool { + if !reflect.DeepEqual(old.Spec, new.Spec) { + return true + } + + if !reflect.DeepEqual(old.Status, new.Status) { + return true + } + + return false } // Run will set up the event handlers for types we are interested in, as well // as syncing informer caches and starting workers. func (c *Controller) Run(workers int, stopCh <-chan struct{}) error { - defer runtime.HandleCrash() + defer utilruntime.HandleCrash() defer c.queue.ShutDown() // Start the informer factories to begin populating the informer caches @@ -149,8 +159,6 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error { return fmt.Errorf("failed to wait for cluster caches to sync") } - c.stopCh = stopCh - for i := 0; i < workers; i++ { go wait.Until(c.worker, time.Second, stopCh) } @@ -186,7 +194,7 @@ func (c *Controller) processNextWorkItem() bool { return true } - runtime.HandleError(fmt.Errorf("error processing cluster %v (will retry): %v", key, err)) + utilruntime.HandleError(fmt.Errorf("error processing cluster %v (will retry): %v", key, err)) c.queue.AddRateLimited(key) return true } @@ -196,10 +204,11 @@ func (c *Controller) processNextWorkItem() bool { // namespaces created or deleted. This function is not meant to be invoked // concurrently with the same key. func (c *Controller) syncCluster(key string) error { + logger := c.log.WithValues("cluster", key) + startTime := time.Now() - var cachedCluster *cachedCluster defer func() { - log.Info("Finished syncing cluster", log.String("clusterName", key), log.Duration("processTime", time.Since(startTime))) + logger.Info("Finished syncing cluster", "processTime", time.Since(startTime).String()) }() _, name, err := cache.SplitMetaNamespaceKey(key) @@ -208,155 +217,38 @@ func (c *Controller) syncCluster(key string) error { } cluster, err := c.lister.Get(name) - - switch { - case apierrors.IsNotFound(err): - log.Info("Cluster has been deleted. Attempting to cleanup resources", log.String("clusterName", key)) - err = c.processClusterDeletion(key) - case err != nil: - log.Warn("Unable to retrieve cluster from store", log.String("clusterName", key), log.Err(err)) - default: - if (cluster.Status.Phase == platformv1.ClusterRunning) || (cluster.Status.Phase == platformv1.ClusterFailed) || (cluster.Status.Phase == platformv1.ClusterInitializing) { - cachedCluster = c.cache.getOrCreate(key) - err = c.processClusterUpdate(context.Background(), cachedCluster, cluster, key) - } else if cluster.Status.Phase == platformv1.ClusterTerminating { - log.Info("Cluster has been terminated. 
Attempting to cleanup resources", log.String("clusterName", key)) - _ = c.processClusterDeletion(key) - err = c.clusterDeleter.Delete(context.Background(), key) - } else { - log.Debug(fmt.Sprintf("Cluster %s status is %s, not to process", key, cluster.Status.Phase), log.String("clusterName", key)) - } - } - return err -} - -func (c *Controller) processClusterUpdate(ctx context.Context, cachedCluster *cachedCluster, cluster *platformv1.Cluster, key string) error { - if cachedCluster.state != nil { - if cachedCluster.state.UID != cluster.UID { - err := c.processClusterDelete(key) - if err != nil { - return err - } - } + if apierrors.IsNotFound(err) { + logger.Info("cluster has been deleted") } - - // start update cluster if needed - err := c.handlePhase(ctx, key, cachedCluster, cluster) if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to retrieve cluster %v from store: %v", key, err)) return err } - cachedCluster.state = cluster - // Always update the cache upon success. - c.cache.set(key, cachedCluster) - - return nil -} - -func (c *Controller) processClusterDeletion(key string) error { - _, ok := c.cache.get(key) - if !ok { - log.Debug("Cluster not in cache even though the watcher thought it was. Ignoring the deletion", log.String("clusterName", key)) - return nil - } - return c.processClusterDelete(key) + return c.reconcile(context.Background(), key, cluster) } -func (c *Controller) processClusterDelete(key string) error { - log.Info("Cluster will be dropped", log.String("clusterName", key)) - - if c.cache.Exist(key) { - log.Info("Delete the cluster cache", log.String("clusterName", key)) - c.cache.delete(key) - } - - if c.health.Exist(key) { - log.Info("Delete the cluster health cache", log.String("clusterName", key)) - c.health.Del(key) - } - - return nil -} - -func (c *Controller) handlePhase(ctx context.Context, key string, cachedCluster *cachedCluster, cluster *platformv1.Cluster) error { - var err error +func (c *Controller) reconcile(ctx context.Context, key string, cluster *platformv1.Cluster) error { + logger := c.log.WithValues("cluster", cluster.Name) if err := c.ensureSyncOldClusterCredential(context.Background(), cluster); err != nil { return fmt.Errorf("sync old ClusterCredential error: %w", err) } + var err error switch cluster.Status.Phase { case platformv1.ClusterInitializing: err = c.onCreate(ctx, cluster) - log.Info("cluster_controller.onCreate", log.String("clusterName", cluster.Name), log.Err(err)) case platformv1.ClusterRunning, platformv1.ClusterFailed: err = c.onUpdate(ctx, cluster) - log.Info("cluster_controller.onUpdate", log.String("clusterName", cluster.Name), log.Err(err)) + case platformv1.ClusterTerminating: + logger.Info("Cluster has been terminated. 
Attempting to cleanup resources") + err = c.deleter.Delete(context.Background(), key) if err == nil { - c.ensureHealthCheck(ctx, key, cluster) // after update to avoid version conflict + logger.Info("Machine has been successfully deleted") } default: - err = fmt.Errorf("no handler for %q", cluster.Status.Phase) - } - - return err -} - -func (c *Controller) addOrUpdateCondition(cluster *platformv1.Cluster, newCondition platformv1.ClusterCondition) { - var conditions []platformv1.ClusterCondition - exist := false - for _, condition := range cluster.Status.Conditions { - if condition.Type == newCondition.Type { - exist = true - if newCondition.Status != condition.Status { - condition.Status = newCondition.Status - } - if newCondition.Message != condition.Message { - condition.Message = newCondition.Message - } - if newCondition.Reason != condition.Reason { - condition.Reason = newCondition.Reason - } - if !newCondition.LastProbeTime.IsZero() && newCondition.LastProbeTime != condition.LastProbeTime { - condition.LastProbeTime = newCondition.LastProbeTime - } - if !newCondition.LastTransitionTime.IsZero() && newCondition.LastTransitionTime != condition.LastTransitionTime { - condition.LastTransitionTime = newCondition.LastTransitionTime - } - } - conditions = append(conditions, condition) - } - if !exist { - if newCondition.LastProbeTime.IsZero() { - newCondition.LastProbeTime = metav1.Now() - } - if newCondition.LastTransitionTime.IsZero() { - newCondition.LastTransitionTime = metav1.Now() - } - conditions = append(conditions, newCondition) - } - cluster.Status.Conditions = conditions -} - -func (c *Controller) persistUpdate(ctx context.Context, cluster *platformv1.Cluster) error { - var err error - for i := 0; i < clusterClientRetryCount; i++ { - _, err = c.platformClient.Clusters().UpdateStatus(ctx, cluster, metav1.UpdateOptions{}) - if err == nil { - return nil - } - // if the object no longer exists, we don't want to recreate it. Just bail - // out so that we can process the delete, which we should soon be receiving - // if we haven't already - if apierrors.IsNotFound(err) { - log.Info("Not persisting update to cluster set that no longer exists", log.String("clusterName", cluster.Name), log.Err(err)) - return nil - } - if apierrors.IsConflict(err) { - return fmt.Errorf("not persisting update to cluster '%s' that has been changed since we received it: %v", cluster.ClusterName, err) - } - log.Warn("Failed to persist updated status of cluster", log.String("clusterName", cluster.ClusterName), log.Err(err)) - time.Sleep(clusterClientRetryInterval) + logger.Info("unknown cluster phase", "status.phase", cluster.Status.Phase) } return err @@ -375,30 +267,11 @@ func (c *Controller) onCreate(ctx context.Context, cluster *platformv1.Cluster) return err } + // If any error happens, return error for retry. 
for clusterWrapper.Status.Phase == platformv1.ClusterInitializing { err = provider.OnCreate(ctx, clusterWrapper) - if err != nil { - clusterWrapper.Status.Message = err.Error() - clusterWrapper.Status.Reason = reasonFailedInit - _, _ = c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{}) - return err - } - condition := clusterWrapper.Status.Conditions[len(clusterWrapper.Status.Conditions)-1] - if condition.Status == platformv1.ConditionFalse { // means current condition run into error - clusterWrapper.Status.Message = condition.Message - clusterWrapper.Status.Reason = condition.Reason - _, _ = c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{}) - return fmt.Errorf("Provider.OnCreate.%s [Failed] reason: %s message: %s", - condition.Type, condition.Reason, condition.Message) - } - clusterWrapper.Status.Message = "" - clusterWrapper.Status.Reason = "" - - clusterWrapper.ClusterCredential, err = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{}) - if err != nil { - return err - } - clusterWrapper.Cluster, err = c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{}) + _, err = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{}) + _, err = c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{}) if err != nil { return err } @@ -417,20 +290,12 @@ func (c *Controller) onUpdate(ctx context.Context, cluster *platformv1.Cluster) if err != nil { return err } + + // If any error happens, return error for retry. err = provider.OnUpdate(ctx, clusterWrapper) + _, err = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{}) + _, err = c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{}) if err != nil { - cluster.Status.Message = err.Error() - cluster.Status.Reason = reasonFailedUpdate - _, _ = c.platformClient.Clusters().Update(ctx, cluster, metav1.UpdateOptions{}) - return err - } - clusterWrapper.Status.Message = "" - clusterWrapper.Status.Reason = "" - - if _, err := c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{}); err != nil { - return err - } - if _, err := c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{}); err != nil { return err } @@ -501,3 +366,85 @@ func (c *Controller) ensureClusterCredential(ctx context.Context, cluster *platf return nil } + +func (c *Controller) ensureStartHealthCheck(ctx context.Context, key string) { + if c.healthCache.Contains(key) { + return + } + logger := c.log.WithName("health-check").WithValues("cluster", key) + logger.Info("Start health check loop") + time.Sleep(time.Duration(rand.Intn(100)) * time.Microsecond) + go wait.PollImmediateInfinite(healthCheckInterval, c.watchHealth(ctx, key)) + c.healthCache.Add(key) +} + +// watchHealth check cluster health when phase in Running or Failed. +// Avoid affecting state machine operation. 
+func (c *Controller) watchHealth(ctx context.Context, key string) func() (bool, error) { + return func() (bool, error) { + logger := c.log.WithName("health-check").WithValues("cluster", key) + + cluster, err := c.lister.Get(key) + if err != nil { + if apierrors.IsNotFound(err) { + logger.Info("Stop health check because cluster has been deleted") + c.healthCache.Remove(key) + return true, nil + } + return false, nil + } + + if !(cluster.Status.Phase == platformv1.ClusterRunning || cluster.Status.Phase == platformv1.ClusterFailed) { + return false, nil + } + + err = c.checkHealth(ctx, cluster) + if err != nil { + logger.Error(err, "Check health error") + } + + return false, nil + } +} + +func (c *Controller) checkHealth(ctx context.Context, cluster *platformv1.Cluster) error { + oldCluster := cluster.DeepCopy() + + healthCheckCondition := platformv1.ClusterCondition{ + Type: conditionTypeHealthCheck, + Status: platformv1.ConditionFalse, + } + client, err := util.BuildExternalClientSet(ctx, cluster, c.platformClient) + if err != nil { + cluster.Status.Phase = platformv1.ClusterFailed + + healthCheckCondition.Reason = failedHealthCheckReason + healthCheckCondition.Message = err.Error() + } else { + version, err := client.Discovery().ServerVersion() + if err != nil { + cluster.Status.Phase = platformv1.ClusterFailed + + healthCheckCondition.Reason = failedHealthCheckReason + healthCheckCondition.Message = err.Error() + } else { + cluster.Status.Phase = platformv1.ClusterRunning + cluster.Status.Version = version.String() + + healthCheckCondition.Status = platformv1.ConditionTrue + } + } + + cluster.SetCondition(healthCheckCondition) + + patchBytes, err := strategicpatch.GetPatchBytes(oldCluster, cluster) + if err != nil { + return fmt.Errorf("GetPatchBytes error: %w", err) + } + _, err = c.platformClient.Clusters().Patch(ctx, cluster.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("update cluster health status error: %w", err) + } + + return nil +} diff --git a/pkg/platform/controller/cluster/cluster_health.go b/pkg/platform/controller/cluster/cluster_health.go deleted file mode 100644 index 643b28021..000000000 --- a/pkg/platform/controller/cluster/cluster_health.go +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Tencent is pleased to support the open source community by making TKEStack - * available. - * - * Copyright (C) 2012-2019 Tencent. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * https://opensource.org/licenses/Apache-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ - -package cluster - -import ( - "context" - "math/rand" - "sync" - "time" - - "github.com/blang/semver" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - v1 "tkestack.io/tke/api/platform/v1" - "tkestack.io/tke/pkg/platform/util" - "tkestack.io/tke/pkg/util/log" -) - -const conditionTypeHealthCheck = "HealthCheck" -const conditionTypeSyncVersion = "SyncVersion" -const reasonHealthCheckFail = "HealthCheckFail" - -type clusterHealth struct { - mu sync.Mutex - clusterMap map[string]*v1.Cluster -} - -func (s *clusterHealth) Exist(key string) bool { - s.mu.Lock() - defer s.mu.Unlock() - _, ok := s.clusterMap[key] - return ok -} - -func (s *clusterHealth) Set(key string, cluster *v1.Cluster) { - s.mu.Lock() - defer s.mu.Unlock() - s.clusterMap[key] = cluster -} - -func (s *clusterHealth) Del(key string) { - s.mu.Lock() - defer s.mu.Unlock() - delete(s.clusterMap, key) -} - -func (c *Controller) ensureHealthCheck(ctx context.Context, key string, cluster *v1.Cluster) { - if c.health.Exist(key) { - return - } - log.Info("start health check for cluster", log.String("clusterName", key), log.String("phase", string(cluster.Status.Phase))) - c.health.Set(key, cluster) - time.Sleep(time.Duration(rand.Intn(100)) * time.Microsecond) - go wait.PollImmediateUntil(5*time.Minute, c.watchClusterHealth(ctx, cluster.Name), c.stopCh) -} - -func (c *Controller) checkClusterHealth(ctx context.Context, cluster *v1.Cluster) error { - // wait for create clustercredential, optimize first health check for user experience - if cluster.Status.Phase == v1.ClusterInitializing { - err := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) { - _, err := util.GetClusterCredentialV1(ctx, c.platformClient, cluster) - if err != nil { - return false, nil - } - return true, nil - }) - if err != nil { // not return! 
execute next steps to show reason for user - log.Warn("wait for create clustercredential error", log.String("clusterName", cluster.Name)) - } - } - kubeClient, err := util.BuildExternalClientSet(ctx, cluster, c.platformClient) - if err != nil { - cluster.Status.Phase = v1.ClusterFailed - cluster.Status.Message = err.Error() - cluster.Status.Reason = reasonHealthCheckFail - now := metav1.Now() - c.addOrUpdateCondition(cluster, v1.ClusterCondition{ - Type: conditionTypeHealthCheck, - Status: v1.ConditionFalse, - Message: err.Error(), - Reason: reasonHealthCheckFail, - LastTransitionTime: now, - LastProbeTime: now, - }) - if err1 := c.persistUpdate(ctx, cluster); err1 != nil { - log.Warn("Update cluster status failed", log.String("clusterName", cluster.Name), log.Err(err1)) - return err1 - } - log.Warn("Failed to build the cluster client", log.String("clusterName", cluster.Name), log.Err(err)) - return err - } - - _, err = kubeClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) - if err != nil { - cluster.Status.Phase = v1.ClusterFailed - cluster.Status.Message = err.Error() - cluster.Status.Reason = reasonHealthCheckFail - c.addOrUpdateCondition(cluster, v1.ClusterCondition{ - Type: conditionTypeHealthCheck, - Status: v1.ConditionFalse, - Message: err.Error(), - Reason: reasonHealthCheckFail, - LastProbeTime: metav1.Now(), - }) - } else { - cluster.Status.Phase = v1.ClusterRunning - cluster.Status.Message = "" - cluster.Status.Reason = "" - c.addOrUpdateCondition(cluster, v1.ClusterCondition{ - Type: conditionTypeHealthCheck, - Status: v1.ConditionTrue, - Message: "", - Reason: "", - LastProbeTime: metav1.Now(), - }) - - // update version info - if cluster.Status.Version == "" { - log.Debug("Update version info", log.String("clusterName", cluster.Name)) - if version, err := kubeClient.ServerVersion(); err == nil { - entireVersion, err := semver.ParseTolerant(version.GitVersion) - if err != nil { - return err - } - pureVersion := semver.Version{Major: entireVersion.Major, Minor: entireVersion.Minor, Patch: entireVersion.Patch} - log.Info("Set cluster version", log.String("clusterName", cluster.Name), log.String("version", pureVersion.String()), log.String("entireVersion", entireVersion.String())) - cluster.Status.Version = pureVersion.String() - now := metav1.Now() - c.addOrUpdateCondition(cluster, v1.ClusterCondition{ - Type: conditionTypeSyncVersion, - Status: v1.ConditionTrue, - Message: "", - Reason: "", - LastProbeTime: now, - LastTransitionTime: now, - }) - } - } - } - - if err := c.persistUpdate(ctx, cluster); err != nil { - log.Error("Update cluster status failed", log.String("clusterName", cluster.Name), log.Err(err)) - return err - } - return err -} - -// -// // cal the cluster's capacity , allocatable and allocated resource -// func (c *Controller) caclClusterResource(kubeClient *kubernetes.Clientset) (*v1.ClusterResource, error) { -// // cal the node's capacity and allocatable -// var cpuCapacity, memoryCapcity, cpuAllocatable, memoryAllocatable, cpuAllocated, memoryAllocated resource.Quantity -// -// for { -// nodeList, err := kubeClient.CoreV1().Nodes().List(metav1.ListOptions{Limit: int64(300)}) -// if err != nil { -// return &v1.ClusterResource{}, err -// } -// -// for _, node := range nodeList.Items { -// for resourceName, capacity := range node.Status.Capacity { -// if resourceName.String() == string(resourceutil.CPU) { -// cpuCapacity.Add(capacity) -// } -// if resourceName.String() == string(resourceutil.Memory) { -// memoryCapcity.Add(capacity) -// } -// } -// 
-// for resourceName, allocatable := range node.Status.Allocatable { -// if resourceName.String() == string(resourceutil.CPU) { -// cpuAllocatable.Add(allocatable) -// } -// if resourceName.String() == string(resourceutil.Memory) { -// memoryAllocatable.Add(allocatable) -// } -// } -// } -// -// if nodeList.Continue == "" { -// break -// } -// } -// -// // cal the pods's request resource as allocated resource -// for { -// podsList, err := kubeClient.CoreV1().Pods("").List(metav1.ListOptions{Limit: int64(500)}) -// if err != nil { -// return &v1.ClusterResource{}, err -// } -// -// for _, pod := range podsList.Items { -// // same with kubectl skip those pods in failed or succeeded status -// if pod.Status.Phase == coreV1.PodFailed || pod.Status.Phase == coreV1.PodSucceeded { -// continue -// } -// for _, container := range pod.Spec.Containers { -// for resourceName, allocated := range container.Resources.Requests { -// if resourceName.String() == string(resourceutil.CPU) { -// cpuAllocated.Add(allocated) -// } -// if resourceName.String() == string(resourceutil.Memory) { -// memoryAllocated.Add(allocated) -// } -// } -// } -// } -// -// if podsList.Continue == "" { -// break -// } -// } -// result := &v1.ClusterResource{ -// Capacity: v1.ResourceList{ -// string(resourceutil.CPU): cpuCapacity, -// string(resourceutil.Memory): memoryCapcity, -// }, -// Allocatable: v1.ResourceList{ -// string(resourceutil.CPU): cpuAllocatable, -// string(resourceutil.Memory): memoryAllocatable, -// }, -// Allocated: v1.ResourceList{ -// string(resourceutil.CPU): cpuAllocated, -// string(resourceutil.Memory): memoryAllocated, -// }, -// } -// return result, nil -// } - -// for PollImmediateUntil, when return true ,an err while exit -func (c *Controller) watchClusterHealth(ctx context.Context, clusterName string) func() (bool, error) { - return func() (bool, error) { - log.Info("Check cluster health", log.String("clusterName", clusterName)) - - cluster, err := c.platformClient.Clusters().Get(ctx, clusterName, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - log.Warn("Cluster not found, to exit the health check loop", log.String("clusterName", clusterName)) - return true, nil - } - log.Error("Check cluster health, cluster get failed", log.String("clusterName", clusterName), log.Err(err)) - return false, nil - } - - if cluster.Status.Phase == v1.ClusterTerminating { - log.Warn("Cluster status is Terminating, to exit the health check loop", log.String("clusterName", cluster.Name)) - return true, nil - } - - _ = c.checkClusterHealth(ctx, cluster) - return false, nil - } -} diff --git a/pkg/platform/provider/cluster/interface.go b/pkg/platform/provider/cluster/interface.go index 2d46d3b5e..e53833ac6 100644 --- a/pkg/platform/provider/cluster/interface.go +++ b/pkg/platform/provider/cluster/interface.go @@ -27,7 +27,6 @@ import ( "strings" "github.com/thoas/go-funk" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apiserver/pkg/server/mux" platformv1 "tkestack.io/tke/api/platform/v1" @@ -37,10 +36,11 @@ import ( ) const ( - ReasonFailedProcess = "FailedProcess" - ReasonWaitingProcess = "WaitingProcess" - ReasonSuccessfulProcess = "SuccessfulProcess" - ReasonSkipProcess = "SkipProcess" + ReasonWaiting = "Waiting" + ReasonSkip = "Skip" + ReasonFailedInit = "FailedInit" + ReasonFailedUpdate = "FailedUpdate" + ReasonFailedDelete = "FailedDelete" ConditionTypeDone = "EnsureDone" ) @@ -79,6 +79,15 @@ var _ Provider = &DelegateProvider{} type 
Handler func(context.Context, *v1.Cluster) error +func (h Handler) Name() string { + name := runtime.FuncForPC(reflect.ValueOf(h).Pointer()).Name() + i := strings.LastIndex(name, ".") + if i == -1 { + return "Unknown" + } + return strings.TrimSuffix(name[i+1:], "-fm") +} + type DelegateProvider struct { ProviderName string @@ -139,43 +148,34 @@ func (p *DelegateProvider) OnCreate(ctx context.Context, cluster *v1.Cluster) er return err } - now := metav1.Now() if cluster.Spec.Features.SkipConditions != nil && funk.ContainsString(cluster.Spec.Features.SkipConditions, condition.Type) { cluster.SetCondition(platformv1.ClusterCondition{ - Type: condition.Type, - Status: platformv1.ConditionTrue, - LastProbeTime: now, - LastTransitionTime: now, - Reason: ReasonSkipProcess, + Type: condition.Type, + Status: platformv1.ConditionTrue, + Reason: ReasonSkip, + Message: "Skip current condition", }) } else { - f := p.getCreateHandler(condition.Type) - if f == nil { + handler := p.getCreateHandler(condition.Type) + if handler == nil { return fmt.Errorf("can't get handler by %s", condition.Type) } - log.Infow("OnCreate", "handler", runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), - "clusterName", cluster.Name) - err = f(ctx, cluster) + log.Infow("OnCreate", "handler", handler.Name(), "cluster", cluster.Name) + err = handler(ctx, cluster) if err != nil { cluster.SetCondition(platformv1.ClusterCondition{ - Type: condition.Type, - Status: platformv1.ConditionFalse, - LastProbeTime: now, - Message: err.Error(), - Reason: ReasonFailedProcess, + Type: condition.Type, + Status: platformv1.ConditionFalse, + Message: err.Error(), + Reason: ReasonFailedInit, }) - cluster.Status.Reason = ReasonFailedProcess - cluster.Status.Message = err.Error() return nil } cluster.SetCondition(platformv1.ClusterCondition{ - Type: condition.Type, - Status: platformv1.ConditionTrue, - LastProbeTime: now, - LastTransitionTime: now, - Reason: ReasonSuccessfulProcess, + Type: condition.Type, + Status: platformv1.ConditionTrue, }) } @@ -187,12 +187,10 @@ func (p *DelegateProvider) OnCreate(ctx context.Context, cluster *v1.Cluster) er } } else { cluster.SetCondition(platformv1.ClusterCondition{ - Type: nextConditionType, - Status: platformv1.ConditionUnknown, - LastProbeTime: now, - LastTransitionTime: now, - Message: "waiting process", - Reason: ReasonWaitingProcess, + Type: nextConditionType, + Status: platformv1.ConditionUnknown, + Message: "waiting execute", + Reason: ReasonWaiting, }) } @@ -200,27 +198,33 @@ func (p *DelegateProvider) OnCreate(ctx context.Context, cluster *v1.Cluster) er } func (p *DelegateProvider) OnUpdate(ctx context.Context, cluster *v1.Cluster) error { - for _, f := range p.UpdateHandlers { - log.Infow("OnUpdate", "handler", runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), - "clusterName", cluster.Name) - err := f(ctx, cluster) + for _, handler := range p.UpdateHandlers { + log.Infow("OnUpdate", "handler", handler.Name(), "cluster", cluster.Name) + err := handler(ctx, cluster) if err != nil { + cluster.Status.Reason = ReasonFailedUpdate + cluster.Status.Message = fmt.Sprintf("%s error: %v", handler.Name(), err) return err } } + cluster.Status.Reason = "" + cluster.Status.Message = "" return nil } func (p *DelegateProvider) OnDelete(ctx context.Context, cluster *v1.Cluster) error { - for _, f := range p.DeleteHandlers { - log.Infow("OnDelete", "handler", runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), - "clusterName", cluster.Name) - err := f(ctx, cluster) + for _, handler := range 
p.DeleteHandlers { + log.Infow("OnDelete", "handler", handler.Name(), "cluster", cluster.Name) + err := handler(ctx, cluster) if err != nil { + cluster.Status.Reason = ReasonFailedDelete + cluster.Status.Message = fmt.Sprintf("%s error: %v", handler.Name(), err) return err } } + cluster.Status.Reason = "" + cluster.Status.Message = "" return nil } @@ -229,23 +233,13 @@ func (p *DelegateProvider) OnRunning(ctx context.Context, cluster *v1.Cluster) e return nil } -func (h Handler) name() string { - name := runtime.FuncForPC(reflect.ValueOf(h).Pointer()).Name() - i := strings.Index(name, "Ensure") - if i == -1 { - return "" - } - return strings.TrimSuffix(name[i:], "-fm") -} - func (p *DelegateProvider) getNextConditionType(conditionType string) string { var ( - i int - f Handler + i int + handler Handler ) - for i, f = range p.CreateHandlers { - name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() - if strings.Contains(name, conditionType+"-fm") { + for i, handler = range p.CreateHandlers { + if handler.Name() == conditionType { break } } @@ -254,13 +248,13 @@ func (p *DelegateProvider) getNextConditionType(conditionType string) string { } next := p.CreateHandlers[i+1] - return next.name() + return next.Name() } func (p *DelegateProvider) getCreateHandler(conditionType string) Handler { - for _, f := range p.CreateHandlers { - if conditionType == f.name() { - return f + for _, handler := range p.CreateHandlers { + if conditionType == handler.Name() { + return handler } } @@ -277,11 +271,10 @@ func (p *DelegateProvider) getCreateCurrentCondition(c *v1.Cluster) (*platformv1 if len(c.Status.Conditions) == 0 { return &platformv1.ClusterCondition{ - Type: p.CreateHandlers[0].name(), - Status: platformv1.ConditionUnknown, - LastProbeTime: metav1.Now(), - Message: "waiting process", - Reason: ReasonWaitingProcess, + Type: p.CreateHandlers[0].Name(), + Status: platformv1.ConditionUnknown, + Message: "waiting process", + Reason: ReasonWaiting, }, nil }
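
The condition-driven OnCreate flow above keys every step off its handler's name: the Handler.Name() method added in this patch recovers the condition type from the bound method's runtime symbol. Below is a minimal, self-contained sketch of that mechanism; the Cluster placeholder and the EnsureKubeadmInit step are invented for illustration and stand in for the real platformv1.Cluster and provider handlers.

package main

import (
	"context"
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// Placeholder for platformv1.Cluster so the sketch compiles on its own.
type Cluster struct{}

// Handler mirrors the provider handler signature from this patch.
type Handler func(context.Context, *Cluster) error

// Name recovers the condition type from the method value's symbol name,
// e.g. "main.(*Provider).EnsureKubeadmInit-fm" -> "EnsureKubeadmInit".
func (h Handler) Name() string {
	name := runtime.FuncForPC(reflect.ValueOf(h).Pointer()).Name()
	i := strings.LastIndex(name, ".")
	if i == -1 {
		return "Unknown"
	}
	return strings.TrimSuffix(name[i+1:], "-fm")
}

type Provider struct{}

// EnsureKubeadmInit is a hypothetical create step; real providers register
// many of these in CreateHandlers, and the condition Type is taken from the name.
func (p *Provider) EnsureKubeadmInit(ctx context.Context, c *Cluster) error { return nil }

func main() {
	h := Handler((&Provider{}).EnsureKubeadmInit) // a method value compiles to a "...-fm" wrapper
	fmt.Println(h.Name())                         // EnsureKubeadmInit
}

Because the "-fm" suffix is trimmed and only the last path segment is kept, the string matched in getCreateHandler and getNextConditionType is exactly the method name, which is also what getCreateCurrentCondition stores as the condition Type.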
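
checkHealth now records its result by mutating a deep copy's sibling and sending a strategic merge patch instead of retrying UpdateStatus. The tkestack strategicpatch.GetPatchBytes helper is internal to the repo; the following rough standalone sketch shows the same diff-then-patch pattern using apimachinery's CreateTwoWayMergePatch, with toy Cluster/Status types and values invented for illustration.

package main

import (
	"encoding/json"
	"fmt"

	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

// Toy status fields; the controller diffs the full platformv1.Cluster object.
type Status struct {
	Phase   string `json:"phase,omitempty"`
	Version string `json:"version,omitempty"`
}

type Cluster struct {
	Status Status `json:"status,omitempty"`
}

func main() {
	old := Cluster{Status: Status{Phase: "Failed"}}
	cur := old
	cur.Status.Phase = "Running"
	cur.Status.Version = "v1.18.3"

	oldJSON, _ := json.Marshal(old)
	curJSON, _ := json.Marshal(cur)

	// Diff the original against the mutated copy into a minimal patch body,
	// then send it with Patch(..., types.StrategicMergePatchType, patchBytes, ...).
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldJSON, curJSON, Cluster{})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patch)) // roughly: {"status":{"phase":"Running","version":"v1.18.3"}}
}

Patching only the changed status fields is what lets the health-check goroutine run alongside the reconcile state machine without clobbering its concurrent writes, which is the point of the "Avoid affecting state machine operation" comment on watchHealth.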
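
ensureStartHealthCheck replaces the old mutex-guarded clusterHealth map with a mapset.Set guard plus an endless wait.PollImmediateInfinite loop whose condition func only returns true once the cluster has been deleted. A compressed sketch of that loop shape follows; the short interval and the three-round stop condition are placeholders for the real 5-minute check against the lister.

package main

import (
	"fmt"
	"time"

	mapset "github.com/deckarep/golang-set"
	"k8s.io/apimachinery/pkg/util/wait"
)

// started guards against spawning more than one poll loop per cluster key,
// mirroring the controller's healthCache set.
var started = mapset.NewSet()

func ensureStartCheck(key string) {
	if started.Contains(key) {
		return
	}
	go wait.PollImmediateInfinite(200*time.Millisecond, check(key)) // controller uses healthCheckInterval (5m)
	started.Add(key)
}

// check returns a wait.ConditionFunc: returning (true, nil) ends the loop,
// which the controller does once the cluster is gone from the lister.
func check(key string) func() (bool, error) {
	n := 0
	return func() (bool, error) {
		n++
		fmt.Printf("checking %s (round %d)\n", key, n)
		return n >= 3, nil // stop after three rounds in this sketch
	}
}

func main() {
	ensureStartCheck("global")
	ensureStartCheck("global") // second call is a no-op thanks to the set guard
	time.Sleep(time.Second)
}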