Skip to content

Commit

Permalink
refactor(platform): refactor api validatiion logic (tkestack#1467)
Browse files Browse the repository at this point in the history
* refactor(platform): refactor api validatiion logic

* refactor(platform): refactor api validatiion logic

* refactor(platform): refactor api validatiion logic

* refactor(platform): refactor api validatiion logic

* refactor(platform): refactor api validatiion logic

Co-authored-by: pavleli <[email protected]>
  • Loading branch information
Pavle Lee and pavleli committed Oct 26, 2021
1 parent e030f1b commit 0f02d70
Show file tree
Hide file tree
Showing 13 changed files with 394 additions and 356 deletions.
128 changes: 12 additions & 116 deletions api/platform/validation/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,14 @@ import (
"os"
"path"
"path/filepath"
"reflect"
"strings"

apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
"tkestack.io/tke/api/platform"
clusterutil "tkestack.io/tke/pkg/platform/provider/baremetal/cluster"
clusterprovider "tkestack.io/tke/pkg/platform/provider/cluster"
"tkestack.io/tke/pkg/platform/types"
utilmath "tkestack.io/tke/pkg/util/math"
"tkestack.io/tke/pkg/util/ssh"
utilvalidation "tkestack.io/tke/pkg/util/validation"
)

Expand All @@ -43,144 +39,44 @@ func ValidateCluster(cluster *types.Cluster) field.ErrorList {
allErrs := apimachineryvalidation.ValidateObjectMeta(&cluster.ObjectMeta, false, apimachineryvalidation.NameIsDNSLabel, field.NewPath("metadata"))

allErrs = append(allErrs, ValidatClusterSpec(&cluster.Spec, field.NewPath("spec"), true)...)
allErrs = append(allErrs, ValidateClusterByProvider(cluster)...)
p, err := clusterprovider.GetProvider(cluster.Spec.Type)
if err != nil {
return append(allErrs, field.NotFound(field.NewPath("spec").Child("type"), cluster.Spec.Type))
}
allErrs = append(allErrs, p.Validate(cluster)...)

return allErrs
}

// ValidateClusterUpdate tests if an update to a cluster is valid.
func ValidateClusterUpdate(cluster *types.Cluster, oldCluster *types.Cluster) field.ErrorList {
fldPath := field.NewPath("spec")

allErrs := apimachineryvalidation.ValidateObjectMetaUpdate(&cluster.ObjectMeta, &oldCluster.ObjectMeta, field.NewPath("metadata"))

allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(cluster.Spec.Type, oldCluster.Spec.Type, fldPath.Child("type"))...)
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(cluster.Spec.NetworkDevice, oldCluster.Spec.NetworkDevice, fldPath.Child("networkDevice"))...)
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(cluster.Spec.ClusterCIDR, oldCluster.Spec.ClusterCIDR, fldPath.Child("clusterCIDR"))...)
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(cluster.Spec.DNSDomain, oldCluster.Spec.DNSDomain, fldPath.Child("dnsDomain"))...)
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(cluster.Spec.DockerExtraArgs, oldCluster.Spec.DockerExtraArgs, fldPath.Child("dockerExtraArgs"))...)
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(cluster.Spec.KubeletExtraArgs, oldCluster.Spec.KubeletExtraArgs, fldPath.Child("kubeletExtraArgs"))...)
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(cluster.Spec.APIServerExtraArgs, oldCluster.Spec.APIServerExtraArgs, fldPath.Child("apiServerExtraArgs"))...)
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(cluster.Spec.ControllerManagerExtraArgs, oldCluster.Spec.ControllerManagerExtraArgs, fldPath.Child("controllerManagerExtraArgs"))...)
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(cluster.Spec.SchedulerExtraArgs, oldCluster.Spec.SchedulerExtraArgs, fldPath.Child("schedulerExtraArgs"))...)

allErrs = append(allErrs, ValidatClusterSpec(&cluster.Spec, field.NewPath("spec"), false)...)
allErrs = append(allErrs, ValidateClusterByProvider(cluster)...)
allErrs = append(allErrs, ValidateClusterScale(cluster.Cluster, oldCluster.Cluster, fldPath.Child("machines"))...)
allErrs = append(allErrs, ValidateBootstrapApps(cluster.Cluster, oldCluster.Cluster, fldPath.Child("bootstrapApps"))...)

return allErrs
}

func ValidateBootstrapApps(cluster *platform.Cluster, oldCluster *platform.Cluster, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
if !reflect.DeepEqual(cluster.Spec.BootstrapApps, oldCluster.Spec.BootstrapApps) {
allErrs = append(allErrs, field.Invalid(fldPath, "bootstrapApps", "bootstrap apps are not allowed be edited"))
}
return allErrs
}

// ValidateClusterScale tests if master scale up/down to a cluster is valid.
func ValidateClusterScale(cluster *platform.Cluster, oldCluster *platform.Cluster, fldPath *field.Path) field.ErrorList {

allErrs := field.ErrorList{}
if len(cluster.Spec.Machines) == len(oldCluster.Spec.Machines) {
return allErrs
}
ha := cluster.Spec.Features.HA
if ha == nil {
allErrs = append(allErrs, field.Invalid(fldPath, cluster.Spec.Machines, "HA configuration should enabled for master scale"))
return allErrs
}
if ha.TKEHA == nil && ha.ThirdPartyHA == nil {
allErrs = append(allErrs, field.Invalid(fldPath, cluster.Spec.Machines, "tkestack HA or third party HA should enabled for master scale"))
return allErrs
}
_, err := clusterutil.PrepareClusterScale(cluster, oldCluster)
p, err := clusterprovider.GetProvider(cluster.Spec.Type)
if err != nil {
allErrs = append(allErrs, field.Invalid(fldPath, cluster.Spec.Machines, err.Error()))
return append(allErrs, field.NotFound(field.NewPath("spec").Child("type"), cluster.Spec.Type))
}
allErrs = append(allErrs, p.ValidateUpdate(cluster, oldCluster)...)

return allErrs
}

// ValidateCluster validates a given ClusterSpec.
func ValidatClusterSpec(spec *platform.ClusterSpec, fldPath *field.Path, validateMachine bool) field.ErrorList {
allErrs := field.ErrorList{}
allErrs = append(allErrs, ValidateClusteType(spec.Type, fldPath.Child("type"))...)
if validateMachine {
allErrs = append(allErrs, ValidateClusterMachines(spec.Machines, fldPath.Child("machines"))...)
}
allErrs = append(allErrs, ValidateClusterType(spec.Type, fldPath.Child("type"))...)
allErrs = append(allErrs, ValidateClusterFeature(&spec.Features, fldPath.Child("features"))...)

return allErrs
}

// ValidateClusteType validates a given type.
func ValidateClusteType(clusterType string, fldPath *field.Path) field.ErrorList {
// ValidateClusterType validates a given type.
func ValidateClusterType(clusterType string, fldPath *field.Path) field.ErrorList {
return utilvalidation.ValidateEnum(clusterType, fldPath, clusterprovider.Providers())
}

// ValidateClusterByProvider validates a given cluster by cluster provider.
func ValidateClusterByProvider(cluster *types.Cluster) field.ErrorList {
p, err := clusterprovider.GetProvider(cluster.Spec.Type)
if err != nil {
return nil
}

return p.Validate(cluster)
}

// ValidateClusterMachines validates a given CluterMachines.
func ValidateClusterMachines(machines []platform.ClusterMachine, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}

if machines == nil {
return allErrs
}

var masters []*ssh.SSH
for i, one := range machines {
sshErrors := ValidateSSH(fldPath.Index(i), one.IP, int(one.Port), one.Username, one.Password, one.PrivateKey, one.PassPhrase)
if sshErrors != nil {
allErrs = append(allErrs, sshErrors...)
} else {
master, _ := one.SSH()
masters = append(masters, master)
}
}

if len(masters) == len(machines) {
allErrs = append(allErrs, ValidateMasterTimeOffset(fldPath, masters)...)

}

return allErrs
}

// ValidateMasterTimeOffset validates a given master time offset.
func ValidateMasterTimeOffset(fldPath *field.Path, masters []*ssh.SSH) field.ErrorList {
allErrs := field.ErrorList{}

times := make([]float64, 0, len(masters))
for _, one := range masters {
t, err := ssh.Timestamp(one)
if err != nil {
allErrs = append(allErrs, field.InternalError(fldPath, err))
return allErrs
}
times = append(times, float64(t))
}
maxIndex, maxTime := utilmath.Max(times)
minIndex, minTime := utilmath.Min(times)
offset := int(*maxTime) - int(*minTime)
if offset > MaxTimeOffset {
allErrs = append(allErrs, field.Invalid(fldPath, "",
fmt.Sprintf("the time offset(%v-%v=%v) between node(%v) with node(%v) exceeds %d seconds, please unify machine time between nodes by using ntp or manual", int(*maxTime), int(*minTime), offset, masters[*maxIndex].Host, masters[*minIndex].Host, MaxTimeOffset)))
}

return allErrs
}

// ValidateClusterFeature validates a given ClusterFeature.
func ValidateClusterFeature(feature *platform.ClusterFeature, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
Expand Down
174 changes: 15 additions & 159 deletions api/platform/validation/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,46 +20,40 @@ package validation

import (
"context"
"fmt"
"math"
"net"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/errors"
apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
utilsnet "k8s.io/utils/net"
platforminternalclient "tkestack.io/tke/api/client/clientset/internalversion/typed/platform/internalversion"
"tkestack.io/tke/api/platform"
machineprovider "tkestack.io/tke/pkg/platform/provider/machine"
utilmath "tkestack.io/tke/pkg/util/math"
"tkestack.io/tke/pkg/util/ssh"
utilvalidation "tkestack.io/tke/pkg/util/validation"
)

const MaxTimeOffset = 5 * 300

// ValidateMachine validates a given machine.
func ValidateMachine(ctx context.Context, machine *platform.Machine, platformClient platforminternalclient.PlatformInterface) field.ErrorList {
allErrs := apimachineryvalidation.ValidateObjectMeta(&machine.ObjectMeta, false, apimachineryvalidation.NameIsDNSLabel, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateMachineSpec(ctx, &machine.Spec, field.NewPath("spec"), platformClient)...)
allErrs = append(allErrs, ValidateMachineByProvider(machine)...)
p, err := machineprovider.GetProvider(machine.Spec.Type)
if err != nil {
return append(allErrs, field.NotFound(field.NewPath("spec").Child("type"), machine.Spec.Type))
}
allErrs = append(allErrs, p.Validate(machine)...)

return allErrs
}

// ValidateMachineUpdate tests if an update to a machine is valid.
func ValidateMachineUpdate(ctx context.Context, machine *platform.Machine, oldMachine *platform.Machine) field.ErrorList {
allErrs := apimachineryvalidation.ValidateObjectMetaUpdate(&machine.ObjectMeta, &oldMachine.ObjectMeta, field.NewPath("metadata"))
func ValidateMachineUpdate(ctx context.Context, machine *platform.Machine, oldMachine *platform.Machine, platformClient platforminternalclient.PlatformInterface) field.ErrorList {
fldPath := field.NewPath("spec")
allErrs := apimachineryvalidation.ValidateObjectMetaUpdate(&machine.ObjectMeta, &oldMachine.ObjectMeta, field.NewPath("metadata"))
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(machine.Spec.Type, oldMachine.Spec.Type, fldPath.Child("type"))...)
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(machine.Spec.ClusterName, oldMachine.Spec.ClusterName, fldPath.Child("clusterName"))...)
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(machine.Spec.IP, oldMachine.Spec.IP, fldPath.Child("ip"))...)
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(machine.Spec.Labels, oldMachine.Spec.Labels, fldPath.Child("labels"))...)
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(machine.Spec.Taints, oldMachine.Spec.Taints, fldPath.Child("taints"))...)
allErrs = append(allErrs, ValidateMachineSpec(ctx, &machine.Spec, field.NewPath("spec"), platformClient)...)
p, err := machineprovider.GetProvider(machine.Spec.Type)
if err != nil {
return append(allErrs, field.NotFound(field.NewPath("spec").Child("type"), machine.Spec.Type))
}
allErrs = append(allErrs, p.ValidateUpdate(machine, oldMachine)...)

return allErrs
}
Expand All @@ -69,23 +63,7 @@ func ValidateMachineSpec(ctx context.Context, spec *platform.MachineSpec, fldPat
allErrs := field.ErrorList{}

allErrs = append(allErrs, ValidateMachineSpecType(spec.Type, fldPath.Child("type"))...)
cluster := new(platform.Cluster)
allErrs = append(allErrs, ValidateClusterName(ctx, spec.ClusterName, fldPath.Child("clusterName"), cluster, platformClient)...)
if cluster.Name != "" {
allErrs = append(allErrs, ValidateMachineWithCluster(ctx, spec.IP, fldPath.Child("ip"), cluster, platformClient)...)
}
sshErrors := ValidateSSH(fldPath, spec.IP, int(spec.Port), spec.Username, spec.Password, spec.PrivateKey, spec.PassPhrase)
if sshErrors != nil {
allErrs = append(allErrs, sshErrors...)
} else {
var masters []*ssh.SSH
worker, _ := spec.SSH()
for _, one := range cluster.Spec.Machines {
master, _ := one.SSH()
masters = append(masters, master)
}
allErrs = append(allErrs, ValidateWorkerTimeOffset(fldPath, worker, masters)...)
}
allErrs = append(allErrs, ValidateClusterName(ctx, spec.ClusterName, fldPath.Child("clusterName"))...)

return allErrs
}
Expand All @@ -105,134 +83,12 @@ func ValidateMachineSpecType(machineType string, fldPath *field.Path) field.Erro
return utilvalidation.ValidateEnum(machineType, fldPath, machineprovider.Providers())
}

// ValidateWorkerTimeOffset validates a given worker time offset with masters.
func ValidateWorkerTimeOffset(fldPath *field.Path, worker *ssh.SSH, masters []*ssh.SSH) field.ErrorList {
allErrs := field.ErrorList{}

workerTimestamp, err := ssh.Timestamp(worker)
if err != nil {
allErrs = append(allErrs, field.InternalError(fldPath, err))
return allErrs
}

times := make([]float64, 0, len(masters))
for _, one := range masters {
t, err := ssh.Timestamp(one)
if err != nil {
allErrs = append(allErrs, field.InternalError(fldPath, err))
return allErrs
}
times = append(times, float64(t))
}
minIndex, minTime := utilmath.Min(times)
offset := workerTimestamp - int(*minTime)
if offset > MaxTimeOffset {
allErrs = append(allErrs, field.Invalid(fldPath, worker.Host,
fmt.Sprintf("the time offset(%v-%v=%v) between node(%v) with node(%v) exceeds %d seconds, please unify machine time between nodes by using ntp or manual", workerTimestamp, int(*minTime), offset, worker.Host, masters[*minIndex].Host, MaxTimeOffset)))
}

return allErrs
}

// ValidateSSH validates a given ssh config.
func ValidateSSH(fldPath *field.Path, ip string, port int, user string, password []byte, privateKey []byte, passPhrase []byte) field.ErrorList {
allErrs := field.ErrorList{}

for _, msg := range validation.IsValidIP(ip) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("ip"), ip, msg))

}
for _, msg := range validation.IsValidPortNum(port) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("port"), port, msg))
}
if password == nil && privateKey == nil {
allErrs = append(allErrs, field.Required(fldPath, "must specify password or privateKey"))
}

if len(allErrs) != 0 {
return allErrs
}

sshConfig := &ssh.Config{
User: user,
Host: ip,
Port: port,
Password: string(password),
PrivateKey: privateKey,
PassPhrase: passPhrase,
DialTimeOut: time.Second,
Retry: 0,
}
s, err := ssh.New(sshConfig)
if err != nil {
allErrs = append(allErrs, field.Invalid(fldPath, "", err.Error()))
} else {
output, err := s.CombinedOutput("whoami")
if err != nil {
allErrs = append(allErrs, field.Invalid(fldPath, "", err.Error()))
}
if strings.TrimSpace(string(output)) != "root" {
allErrs = append(allErrs, field.Invalid(fldPath.Child("user"), user, `must be root or set sudo without password`))
}
}

return allErrs
}

// ValidateMachineWithCluster validates a given machine by ip with cluster.
func ValidateMachineWithCluster(ctx context.Context, ip string, fldPath *field.Path, cluster *platform.Cluster, platformClient platforminternalclient.PlatformInterface) field.ErrorList {
allErrs := field.ErrorList{}
for _, machine := range cluster.Spec.Machines {
if machine.IP == ip {
allErrs = append(allErrs, field.Duplicate(fldPath, ip))
}
}
cidrs := strings.Split(cluster.Spec.ClusterCIDR, ",")
for _, cidr := range cidrs {
if utilsnet.IsIPv6CIDRString(cidr) {
return allErrs
}
}

_, cidr, _ := net.ParseCIDR(cluster.Spec.ClusterCIDR)
ones, _ := cidr.Mask.Size()
maxNode := math.Exp2(float64(cluster.Status.NodeCIDRMaskSize - int32(ones)))

fieldSelector := fmt.Sprintf("spec.clusterName=%s", cluster.Name)
machineList, err := platformClient.Machines().List(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
if err != nil {
allErrs = append(allErrs, field.InternalError(fldPath, err))
} else {
machineSize := len(machineList.Items)
if machineSize >= int(maxNode) {
allErrs = append(allErrs, field.Forbidden(fldPath, fmt.Sprintf("the cluster's machine upper limit(%d) has been reached", int(maxNode))))
}
}
for _, machine := range machineList.Items {
if machine.Spec.IP == ip {
allErrs = append(allErrs, field.Duplicate(fldPath, ip))
}
}

return allErrs
}

// ValidateClusterName validates a given clusterName and return cluster if exists.
func ValidateClusterName(ctx context.Context, clusterName string, fldPath *field.Path, cluster *platform.Cluster, platformClient platforminternalclient.PlatformInterface) field.ErrorList {
func ValidateClusterName(ctx context.Context, clusterName string, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}

if clusterName == "" {
allErrs = append(allErrs, field.Required(fldPath, "must specify cluster name"))
} else {
c, err := platformClient.Clusters().Get(ctx, clusterName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
allErrs = append(allErrs, field.NotFound(fldPath, clusterName))
} else {
allErrs = append(allErrs, field.InternalError(fldPath, err))
}
}
*cluster = *c
}

return allErrs
Expand Down
Loading

0 comments on commit 0f02d70

Please sign in to comment.