Skip to content

Commit

Permalink
fix: compat with 1.18 for cluster controller
Browse files Browse the repository at this point in the history
  • Loading branch information
choujimmy committed May 12, 2020
1 parent 8a7babb commit de3e051
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 40 deletions.
12 changes: 6 additions & 6 deletions pkg/monitor/services/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (h *processor) Create(req *restful.Request, resp *restful.Response) {
ruleGroup.Rules = append(ruleGroup.Rules, rule)
}
}
err = h.prometheusProcessor.CreateGroup(clusterName, entityName, ruleGroup)
err = h.prometheusProcessor.CreateGroup(req.Request.Context(), clusterName, entityName, ruleGroup)
if err != nil {
result.Err = err.Error()
return
Expand Down Expand Up @@ -234,7 +234,7 @@ func (h *processor) Delete(req *restful.Request, resp *restful.Response) {

entityName = alarmPolicyName

err := h.prometheusProcessor.DeleteGroup(clusterName, alarmPolicyName)
err := h.prometheusProcessor.DeleteGroup(req.Request.Context(), clusterName, alarmPolicyName)
if err != nil {
result.Err = err.Error()
return
Expand Down Expand Up @@ -271,7 +271,7 @@ func (h *processor) Update(req *restful.Request, resp *restful.Response) {

entityName = alarmPolicyName

oldRuleGroup, err := h.prometheusProcessor.GetGroup(clusterName, alarmPolicyName)
oldRuleGroup, err := h.prometheusProcessor.GetGroup(req.Request.Context(), clusterName, alarmPolicyName)
if err != nil {
result.Err = err.Error()
return
Expand Down Expand Up @@ -320,7 +320,7 @@ func (h *processor) Update(req *restful.Request, resp *restful.Response) {
}
}

err = h.prometheusProcessor.UpdateGroup(clusterName, alarmPolicyName, ruleGroup)
err = h.prometheusProcessor.UpdateGroup(req.Request.Context(), clusterName, alarmPolicyName, ruleGroup)
if err != nil {
result.Err = err.Error()
return
Expand Down Expand Up @@ -356,7 +356,7 @@ func (h *processor) Get(req *restful.Request, resp *restful.Response) {
}
entityName = alarmPolicyName

ruleGroup, err := h.prometheusProcessor.GetGroup(clusterName, alarmPolicyName)
ruleGroup, err := h.prometheusProcessor.GetGroup(req.Request.Context(), clusterName, alarmPolicyName)
if err != nil {
result.Err = err.Error()
return
Expand Down Expand Up @@ -400,7 +400,7 @@ func (h *processor) List(req *restful.Request, resp *restful.Response) {
log.Infof("invalid page_size: %s", req.QueryParameter("page_size"))
pageSize = 10
}
ruleGroups, err := h.prometheusProcessor.ListGroups(clusterName)
ruleGroups, err := h.prometheusProcessor.ListGroups(req.Request.Context(), clusterName)
if err != nil {
result.Err = err.Error()
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/monitor/services/prometheus/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (h *processor) UpdateRule(ctx context.Context, clusterName, groupName, reco

groups := ruleOp.SavePromRule()

err = h.saveRule(clusterName, groups)
err = h.saveRule(ctx, clusterName, groups)
if err != nil {
return errors.Wrapf(err, "failed to save configmap")
}
Expand Down
67 changes: 34 additions & 33 deletions pkg/platform/controller/cluster/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package cluster

import (
"context"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -200,7 +201,7 @@ func (c *Controller) syncCluster(key string) error {
cluster, err := c.lister.Get(name)

if err == nil {
if err := c.ensureSyncOldClusterCredential(cluster); err != nil {
if err := c.ensureSyncOldClusterCredential(context.Background(), cluster); err != nil {
return fmt.Errorf("sync old ClusterCredential error: %w", err)
}
}
Expand All @@ -214,19 +215,19 @@ func (c *Controller) syncCluster(key string) error {
default:
if (cluster.Status.Phase == platformv1.ClusterRunning) || (cluster.Status.Phase == platformv1.ClusterFailed) || (cluster.Status.Phase == platformv1.ClusterInitializing) {
cachedCluster = c.cache.getOrCreate(key)
err = c.processClusterUpdate(cachedCluster, cluster, key)
err = c.processClusterUpdate(context.Background(), cachedCluster, cluster, key)
} else if cluster.Status.Phase == platformv1.ClusterTerminating {
log.Info("Cluster has been terminated. Attempting to cleanup resources", log.String("clusterName", key))
_ = c.processClusterDeletion(key)
err = c.clusterDeleter.Delete(key)
err = c.clusterDeleter.Delete(context.Background(), key)
} else {
log.Debug(fmt.Sprintf("Cluster %s status is %s, not to process", key, cluster.Status.Phase), log.String("clusterName", key))
}
}
return err
}

func (c *Controller) processClusterUpdate(cachedCluster *cachedCluster, cluster *platformv1.Cluster, key string) error {
func (c *Controller) processClusterUpdate(ctx context.Context, cachedCluster *cachedCluster, cluster *platformv1.Cluster, key string) error {
if cachedCluster.state != nil {
if cachedCluster.state.UID != cluster.UID {
err := c.processClusterDelete(key)
Expand All @@ -237,7 +238,7 @@ func (c *Controller) processClusterUpdate(cachedCluster *cachedCluster, cluster
}

// start update cluster if needed
err := c.handlePhase(key, cachedCluster, cluster)
err := c.handlePhase(ctx, key, cachedCluster, cluster)
if err != nil {
return err
}
Expand Down Expand Up @@ -274,18 +275,18 @@ func (c *Controller) processClusterDelete(key string) error {
return nil
}

func (c *Controller) handlePhase(key string, cachedCluster *cachedCluster, cluster *platformv1.Cluster) error {
func (c *Controller) handlePhase(ctx context.Context, key string, cachedCluster *cachedCluster, cluster *platformv1.Cluster) error {
var err error

switch cluster.Status.Phase {
case platformv1.ClusterInitializing:
err = c.onCreate(cluster)
err = c.onCreate(ctx, cluster)
log.Info("cluster_controller.onCreate", log.String("clusterName", cluster.Name), log.Err(err))
case platformv1.ClusterRunning, platformv1.ClusterFailed:
err = c.onUpdate(cluster)
err = c.onUpdate(ctx, cluster)
log.Info("cluster_controller.onUpdate", log.String("clusterName", cluster.Name), log.Err(err))
if err == nil {
c.ensureHealthCheck(key, cluster) // after update to avoid version conflict
c.ensureHealthCheck(ctx, key, cluster) // after update to avoid version conflict
}
default:
err = fmt.Errorf("no handler for %q", cluster.Status.Phase)
Expand Down Expand Up @@ -330,10 +331,10 @@ func (c *Controller) addOrUpdateCondition(cluster *platformv1.Cluster, newCondit
cluster.Status.Conditions = conditions
}

func (c *Controller) persistUpdate(cluster *platformv1.Cluster) error {
func (c *Controller) persistUpdate(ctx context.Context, cluster *platformv1.Cluster) error {
var err error
for i := 0; i < clusterClientRetryCount; i++ {
_, err = c.platformClient.Clusters().UpdateStatus(cluster)
_, err = c.platformClient.Clusters().UpdateStatus(ctx, cluster, metav1.UpdateOptions{})
if err == nil {
return nil
}
Expand All @@ -354,43 +355,43 @@ func (c *Controller) persistUpdate(cluster *platformv1.Cluster) error {
return err
}

func (c *Controller) onCreate(cluster *platformv1.Cluster) error {
func (c *Controller) onCreate(ctx context.Context, cluster *platformv1.Cluster) error {
provider, err := clusterprovider.GetProvider(cluster.Spec.Type)
if err != nil {
return err
}
if err := c.ensureClusterCredential(cluster); err != nil {
if err := c.ensureClusterCredential(ctx, cluster); err != nil {
return fmt.Errorf("ensureClusterCredential error: %w", err)
}
clusterWrapper, err := typesv1.GetCluster(c.platformClient, cluster)
clusterWrapper, err := typesv1.GetCluster(ctx, c.platformClient, cluster)
if err != nil {
return err
}

for clusterWrapper.Status.Phase == platformv1.ClusterInitializing {
err = provider.OnCreate(clusterWrapper)
err = provider.OnCreate(ctx, clusterWrapper)
if err != nil {
clusterWrapper.Status.Message = err.Error()
clusterWrapper.Status.Reason = reasonFailedInit
_, _ = c.platformClient.Clusters().Update(clusterWrapper.Cluster)
_, _ = c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{})
return err
}
condition := clusterWrapper.Status.Conditions[len(clusterWrapper.Status.Conditions)-1]
if condition.Status == platformv1.ConditionFalse { // means current condition run into error
clusterWrapper.Status.Message = condition.Message
clusterWrapper.Status.Reason = condition.Reason
_, _ = c.platformClient.Clusters().Update(clusterWrapper.Cluster)
_, _ = c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{})
return fmt.Errorf("Provider.OnCreate.%s [Failed] reason: %s message: %s",
condition.Type, condition.Reason, condition.Message)
}
clusterWrapper.Status.Message = ""
clusterWrapper.Status.Reason = ""

clusterWrapper.ClusterCredential, err = c.platformClient.ClusterCredentials().Update(clusterWrapper.ClusterCredential)
clusterWrapper.ClusterCredential, err = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{})
if err != nil {
return err
}
clusterWrapper.Cluster, err = c.platformClient.Clusters().Update(clusterWrapper.Cluster)
clusterWrapper.Cluster, err = c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{})
if err != nil {
return err
}
Expand All @@ -399,44 +400,44 @@ func (c *Controller) onCreate(cluster *platformv1.Cluster) error {
return nil
}

func (c *Controller) onUpdate(cluster *platformv1.Cluster) error {
func (c *Controller) onUpdate(ctx context.Context, cluster *platformv1.Cluster) error {
provider, err := clusterprovider.GetProvider(cluster.Spec.Type)
if err != nil {
return err
}

clusterWrapper, err := typesv1.GetCluster(c.platformClient, cluster)
clusterWrapper, err := typesv1.GetCluster(ctx, c.platformClient, cluster)
if err != nil {
return err
}
err = provider.OnUpdate(clusterWrapper)
err = provider.OnUpdate(ctx, clusterWrapper)
if err != nil {
cluster.Status.Message = err.Error()
cluster.Status.Reason = reasonFailedUpdate
_, _ = c.platformClient.Clusters().Update(cluster)
_, _ = c.platformClient.Clusters().Update(ctx, cluster, metav1.UpdateOptions{})
return err
}
clusterWrapper.Status.Message = ""
clusterWrapper.Status.Reason = ""

if _, err := c.platformClient.ClusterCredentials().Update(clusterWrapper.ClusterCredential); err != nil {
if _, err := c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{}); err != nil {
return err
}
if _, err := c.platformClient.Clusters().Update(clusterWrapper.Cluster); err != nil {
if _, err := c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{}); err != nil {
return err
}

return nil
}

// ensureSyncOldClusterCredential using for sync old cluster without ClusterCredentialRef, will remove in next release.
func (c *Controller) ensureSyncOldClusterCredential(cluster *platformv1.Cluster) error {
func (c *Controller) ensureSyncOldClusterCredential(ctx context.Context, cluster *platformv1.Cluster) error {
if cluster.Spec.ClusterCredentialRef != nil {
return nil
}

fieldSelector := fields.OneTermEqualSelector("clusterName", cluster.Name).String()
clusterCredentials, err := c.platformClient.ClusterCredentials().List(metav1.ListOptions{FieldSelector: fieldSelector})
clusterCredentials, err := c.platformClient.ClusterCredentials().List(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
if err != nil {
return err
}
Expand All @@ -450,15 +451,15 @@ func (c *Controller) ensureSyncOldClusterCredential(cluster *platformv1.Cluster)
}
credential := &clusterCredentials.Items[0]
cluster.Spec.ClusterCredentialRef = &corev1.LocalObjectReference{Name: credential.Name}
cluster, err = c.platformClient.Clusters().Update(cluster)
cluster, err = c.platformClient.Clusters().Update(ctx, cluster, metav1.UpdateOptions{})
if err != nil {
return err
}

return nil
}

func (c *Controller) ensureClusterCredential(cluster *platformv1.Cluster) error {
func (c *Controller) ensureClusterCredential(ctx context.Context, cluster *platformv1.Cluster) error {
if cluster.Spec.ClusterCredentialRef == nil {
// Deprecated: will remove in next release
if cluster.Spec.Type == "Imported" { // don't precreate ClusterCredential for Imported cluster
Expand All @@ -469,23 +470,23 @@ func (c *Controller) ensureClusterCredential(cluster *platformv1.Cluster) error
TenantID: cluster.Spec.TenantID,
ClusterName: cluster.Name,
}
credential, err := c.platformClient.ClusterCredentials().Create(credential)
credential, err := c.platformClient.ClusterCredentials().Create(ctx, credential, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
cluster.Spec.ClusterCredentialRef = &corev1.LocalObjectReference{Name: credential.Name}
cluster, err = c.platformClient.Clusters().Update(cluster)
cluster, err = c.platformClient.Clusters().Update(ctx, cluster, metav1.UpdateOptions{})
if err != nil {
return err
}
} else {
credential, err := c.platformClient.ClusterCredentials().Get(cluster.Spec.ClusterCredentialRef.Name, metav1.GetOptions{})
credential, err := c.platformClient.ClusterCredentials().Get(ctx, cluster.Spec.ClusterCredentialRef.Name, metav1.GetOptions{})
if err != nil {
return err
}
if credential.ClusterName != cluster.Name {
credential.ClusterName = cluster.Name
_, err = c.platformClient.ClusterCredentials().Update(credential)
_, err = c.platformClient.ClusterCredentials().Update(ctx, credential, metav1.UpdateOptions{})
if err != nil {
return err
}
Expand Down

0 comments on commit de3e051

Please sign in to comment.