Skip to content

Commit

Permalink
feat(platform): update anywhere provider process (#2036)
Browse files Browse the repository at this point in the history
  • Loading branch information
wl-chen committed Jul 26, 2022
1 parent 3d8d46d commit f153fe0
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 0 deletions.
4 changes: 4 additions & 0 deletions api/platform/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ const (
AnywhereMachinesAnno = "tkestack.io/anywhere-machines"
// ClusterNameLable contains related cluster's name for no-cluster resources
ClusterNameLable = "tkestack.io/cluster-name"
// HubAPIServerAnno describe hub cluster api server url
HubAPIServerAnno = "tkestack.io/hub-api-server"
// cluster credential token
CredentialTokenAnno = "tkestack.io/credential-token"
)

// KubeVendorType describe the kubernetes provider of the cluster
Expand Down
4 changes: 4 additions & 0 deletions api/platform/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ const (
AnywhereMachinesAnno = "tkestack.io/anywhere-machines"
// ClusterNameLable contains related cluster's name for no-cluster resources
ClusterNameLable = "tkestack.io/cluster-name"
// HubAPIServerAnno describe hub cluster api server url
HubAPIServerAnno = "tkestack.io/hub-api-server"
// cluster credential token
CredentialTokenAnno = "tkestack.io/credential-token"
)

// KubeVendorType describe the kubernetes provider of the cluster
Expand Down
223 changes: 223 additions & 0 deletions pkg/platform/provider/baremetal/cluster/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"net/url"
"os"
"path"
"reflect"
"strconv"
"strings"
"time"

Expand All @@ -40,8 +42,10 @@ import (
"github.com/thoas/go-funk"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
bootstraputil "k8s.io/cluster-bootstrap/token/util"
kubeaggregatorclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
Expand Down Expand Up @@ -389,6 +393,10 @@ func completeServiceIP(cluster *v1.Cluster) error {
}

func completeAddresses(cluster *v1.Cluster) error {
return completePlatformClusterAddresses(cluster.Cluster)
}

func completePlatformClusterAddresses(cluster *platformv1.Cluster) error {
for _, m := range cluster.Spec.Machines {
cluster.AddAddress(platformv1.AddressReal, m.IP, 6443)
}
Expand Down Expand Up @@ -1649,3 +1657,218 @@ func (p *Provider) EnsureCheckAnywhereSubscription(ctx context.Context, c *v1.Cl
return nil

}

// update cluster to connect remote cluster apiserver
func (p *Provider) EnsureModifyCluster(ctx context.Context, c *v1.Cluster) error {
var hubAPIServerURL *url.URL
var err error
if urlValue, ok := c.Annotations[platformv1.HubAPIServerAnno]; ok {
hubAPIServerURL, err = url.Parse(urlValue)
if err != nil {
return err
}
} else {
return fmt.Errorf("cluster %s annotation %s dont exist", c.Name, platformv1.HubAPIServerAnno)
}

config, err := rest.InClusterConfig()
if err != nil {
return err
}
hubClient, err := clusternet.GetHubClient(config)
if err != nil {
return err
}
currentManagerCluster, err := clusternet.GetManagedCluster(hubClient, c.Name)
if err != nil {
return err
}
hubAPIServerPort, err := strconv.ParseInt(hubAPIServerURL.Port(), 10, 32)
if err != nil {
return err
}
address := platformv1.ClusterAddress{
Type: platformv1.AddressReal,
Host: hubAPIServerURL.Hostname(),
Port: int32(hubAPIServerPort),
Path: fmt.Sprintf("/apis/proxies.clusternet.io/v1alpha1/sockets/%s/proxy/direct", currentManagerCluster.Spec.ClusterID),
}
c.Status.Addresses = make([]platformv1.ClusterAddress, 0)
c.Status.Addresses = append(c.Status.Addresses, address)
return nil
}

// update cluster credential to connect remote cluster apiserver
func (p *Provider) EnsureModifyClusterCredential(ctx context.Context, c *v1.Cluster) error {
config, err := rest.InClusterConfig()
if err != nil {
return err
}
hubClient, err := clusternet.GetHubClient(config)
if err != nil {
return err
}
currentManagerCluster, err := clusternet.GetManagedCluster(hubClient, c.Name)
if err != nil {
return err
}

inClusterClient, err := kubernetes.NewForConfig(config)
if err != nil {
return err
}
mclsSecret, err := inClusterClient.CoreV1().Secrets(currentManagerCluster.Namespace).Get(ctx, "child-cluster-deployer", metav1.GetOptions{})
if err != nil {
return err
}
// token is decode data
token, ok := mclsSecret.Data["token"]
if !ok {
return fmt.Errorf("mcls %s dont have token data in child-cluster-deployer secret in %s namespace", currentManagerCluster.Name, currentManagerCluster.Namespace)
}

cc := c.ClusterCredential
if cc.Annotations == nil {
cc.Annotations = make(map[string]string)
}
if _, ok := cc.Annotations[platformv1.CredentialTokenAnno]; !ok {
credentialToken := *cc.Token
cc.Annotations[platformv1.CredentialTokenAnno] = base64.StdEncoding.EncodeToString(([]byte)(credentialToken))
}
cc.Token = nil
cc.Impersonate = "clusternet"
cc.Username = "system:anonymous"
cc.ImpersonateUserExtra = platformv1.ImpersonateUserExtra{
"clusternet-token": string(token),
}
c.RegisterRestConfig(c.ClusterCredential.RESTConfig(c.Cluster))
return nil
}

func (p *Provider) EnsureKubeAPIServerRestart(ctx context.Context, c *v1.Cluster) error {
if c.Spec.Machines == nil || len(c.Spec.Machines) == 0 {
return fmt.Errorf("cluster %s dont have machine info", c.Name)
}

for _, machine := range c.Spec.Machines {
machineSSH, err := machine.SSH()
if err != nil {
return err
}

podName := fmt.Sprintf("kube-apiserver-%s", machine.IP)
cmd := fmt.Sprintf("kubectl delete pod %s -n kube-system", podName)
_, err = machineSSH.CombinedOutput(cmd)
if err != nil {
return err
}
}

clientSet, err := c.ClientsetForBootstrap()
if err != nil {
return err
}

return wait.PollImmediate(5*time.Second, 10*time.Minute, func() (bool, error) {
ok, err := apiclient.CheckPodReadyWithLabel(ctx, clientSet, "kube-system", "component=kube-apiserver")
if err != nil {
return false, nil
}
return ok, nil
})
}

func (p *Provider) EnsureRegisterGlobalCluster(ctx context.Context, c *v1.Cluster) error {
var err error
platformClient, err := c.PlatformClientsetForBootstrap()
if err != nil {
return err
}

// ensure api ready
err = wait.PollImmediate(5*time.Second, 10*time.Minute, func() (bool, error) {
_, err = platformClient.Clusters().List(ctx, metav1.ListOptions{})
if err != nil {
return false, nil
}
_, err = platformClient.ClusterCredentials().List(ctx, metav1.ListOptions{})
if err != nil {
return false, nil
}
return true, nil
})
if err != nil {
return err
}

globalCluster := c.DeepCopy()
globalClusterCredential := c.ClusterCredential.DeepCopy()
globalClusterName := "global"
globalClusterCredentialName := fmt.Sprintf("cc-%s", globalClusterName)

globalCluster.Name = globalClusterName
globalCluster.ResourceVersion = ""
globalCluster.UID = ""
globalCluster.Status.Phase = platformv1.ClusterRunning
if globalCluster.Spec.ClusterCredentialRef == nil {
return fmt.Errorf("cluster %s dont have credential reference", globalCluster.Name)
}
globalCluster.Spec.ClusterCredentialRef.Name = globalClusterCredentialName
globalCluster.Status.Addresses = make([]platformv1.ClusterAddress, 0)
if err = completePlatformClusterAddresses(globalCluster); err != nil {
return err
}

globalClusterCredential.Name = globalClusterCredentialName
globalClusterCredential.ResourceVersion = ""
globalClusterCredential.UID = ""
globalClusterCredential.OwnerReferences = nil
globalClusterCredential.ClusterName = globalClusterName
globalClusterCredential.Username = ""
globalClusterCredential.Impersonate = ""
globalClusterCredential.ImpersonateUserExtra = nil
if token, ok := globalClusterCredential.Annotations[platformv1.CredentialTokenAnno]; ok {
tokenBytes, err := base64.StdEncoding.DecodeString(token)
if err != nil {
return err
}
tokenStr := string(tokenBytes)
globalClusterCredential.Token = &tokenStr
delete(globalClusterCredential.Annotations, platformv1.CredentialTokenAnno)
} else {
return fmt.Errorf("cluster %s credential %s dont have token annotation", c.Name, c.ClusterCredential.Name)
}

globalCluster.SetCondition(platformv1.ClusterCondition{
Type: "EnsureGlobalClusterRegistration",
Status: platformv1.ConditionTrue,
Message: "",
Reason: "",
}, false)

_, err = platformClient.ClusterCredentials().Get(ctx, globalClusterCredential.Name, metav1.GetOptions{})
if err == nil {
err := platformClient.ClusterCredentials().Delete(ctx, globalClusterCredential.Name, metav1.DeleteOptions{})
if err != nil {
return err
}
}
_, err = platformClient.ClusterCredentials().Create(ctx, globalClusterCredential, metav1.CreateOptions{})
if err != nil {
return err
}

_, err = platformClient.Clusters().Get(ctx, globalCluster.Name, metav1.GetOptions{})
if err == nil {
err := platformClient.Clusters().Delete(ctx, globalCluster.Name, metav1.DeleteOptions{})
if err != nil {
return err
}
}
_, err = platformClient.Clusters().Create(ctx, globalCluster, metav1.CreateOptions{})
if err != nil {
return err
}

return nil
}

0 comments on commit f153fe0

Please sign in to comment.