feat: calculate the time we need to wait for the first applied but not ready binding to be ready. #888

Merged · 4 commits · Aug 22, 2024
122 changes: 80 additions & 42 deletions pkg/controllers/rollout/controller.go
@@ -133,7 +133,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim

// pick the bindings to be updated according to the rollout plan
// staleBoundBindings is a list of "Bound" bindings that are not selected in this round because of the rollout strategy.
toBeUpdatedBindings, staleBoundBindings, needRoll, err := r.pickBindingsToRoll(ctx, allBindings, latestResourceSnapshot, &crp, matchedCRO, matchedRO)
toBeUpdatedBindings, staleBoundBindings, needRoll, waitTime, err := r.pickBindingsToRoll(ctx, allBindings, latestResourceSnapshot, &crp, matchedCRO, matchedRO)
if err != nil {
klog.ErrorS(err, "Failed to pick the bindings to roll", "clusterResourcePlacement", crpName)
return runtime.Result{}, err
@@ -159,10 +159,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
// Update all the bindings in parallel according to the rollout plan.
// We need to requeue the request regardless of whether the binding updates succeed or not,
// to avoid the case where the rollout process stalls because the time-based binding readiness does not trigger any event.
// We wait for 1/5 of the UnavailablePeriodSeconds so we can catch the next ready one early.
// TODO: only wait the time we need to wait for the first applied but not ready binding to be ready
return runtime.Result{RequeueAfter: time.Duration(*crp.Spec.Strategy.RollingUpdate.UnavailablePeriodSeconds) * time.Second / 5},
r.updateBindings(ctx, toBeUpdatedBindings)
// Wait the time we need to wait for the first applied but not ready binding to be ready
return runtime.Result{Requeue: true, RequeueAfter: waitTime}, r.updateBindings(ctx, toBeUpdatedBindings)
}
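
In effect, the controller now requeues exactly when the earliest applied-but-not-ready binding is expected to become ready, instead of polling at a fixed fifth of UnavailablePeriodSeconds. The updated isBindingReady is not shown in this diff; the sketch below only illustrates the remaining-wait idea implied by readyTimeCutOff further down, and the helper name and shape are assumptions rather than part of the PR.

package main

import (
	"fmt"
	"time"
)

// remainingWait is a sketch: how long until an applied binding counts as ready,
// assuming readiness means its Available condition has held for the full
// UnavailablePeriodSeconds window.
func remainingWait(availableSince time.Time, unavailablePeriod time.Duration) time.Duration {
	return time.Until(availableSince.Add(unavailablePeriod)) // <= 0 means already ready
}

func main() {
	// Available for 45s with a 60s unavailable period -> requeue in roughly 15s.
	fmt.Println(remainingWait(time.Now().Add(-45*time.Second), 60*time.Second))
}
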

func (r *Reconciler) checkAndUpdateStaleBindingsStatus(ctx context.Context, bindings []*fleetv1beta1.ClusterResourceBinding) error {
@@ -299,7 +297,7 @@ func createUpdateInfo(binding *fleetv1beta1.ClusterResourceBinding, crp *fleetv1
// Thus, it also returns a bool indicating whether there are out of sync bindings to be rolled to differentiate those
// two cases.
func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*fleetv1beta1.ClusterResourceBinding, latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot, crp *fleetv1beta1.ClusterResourcePlacement,
matchedCROs []*fleetv1alpha1.ClusterResourceOverrideSnapshot, matchedROs []*fleetv1alpha1.ResourceOverrideSnapshot) ([]toBeUpdatedBinding, []toBeUpdatedBinding, bool, error) {
matchedCROs []*fleetv1alpha1.ClusterResourceOverrideSnapshot, matchedROs []*fleetv1alpha1.ResourceOverrideSnapshot) ([]toBeUpdatedBinding, []toBeUpdatedBinding, bool, time.Duration, error) {
// Those are the bindings that are chosen by the scheduler to be applied to selected clusters.
// They include the bindings that are already applied to the clusters and the bindings that are newly selected by the scheduler.
schedulerTargetedBinds := make([]*fleetv1beta1.ClusterResourceBinding, 0)
@@ -333,25 +331,30 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
readyTimeCutOff := time.Now().Add(-time.Duration(*crp.Spec.Strategy.RollingUpdate.UnavailablePeriodSeconds) * time.Second)

// classify the bindings into different categories
// TODO: calculate the time we need to wait for the first applied but not ready binding to be ready.
// Wait for the first applied but not ready binding to be ready.
// Return a longer wait time if the rollout is stuck on bindings that failed to apply or to become available.
minWaitTime := time.Duration(*crp.Spec.Strategy.RollingUpdate.UnavailablePeriodSeconds) * time.Second
allReady := true
crpKObj := klog.KObj(crp)
for idx := range allBindings {
binding := allBindings[idx]
bindingKObj := klog.KObj(binding)
switch binding.Spec.State {
case fleetv1beta1.BindingStateUnscheduled:
appliedCondition := binding.GetCondition(string(fleetv1beta1.ResourceBindingApplied))
availableCondition := binding.GetCondition(string(fleetv1beta1.ResourceBindingAvailable))
if condition.IsConditionStatusFalse(appliedCondition, binding.Generation) || condition.IsConditionStatusFalse(availableCondition, binding.Generation) {
if hasBindingFailed(binding) {
klog.V(3).InfoS("Found a failed to be ready unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
} else {
canBeReadyBindings = append(canBeReadyBindings, binding)
}
_, bindingReady := isBindingReady(binding, readyTimeCutOff)
waitTime, bindingReady := isBindingReady(binding, readyTimeCutOff)
if bindingReady {
klog.V(3).InfoS("Found a ready unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
readyBindings = append(readyBindings, binding)
} else {
allReady = false
if waitTime >= 0 && waitTime < minWaitTime {
minWaitTime = waitTime
}
}
if binding.DeletionTimestamp.IsZero() {
// it's not been deleted yet, so it is a removal candidate
@@ -362,29 +365,30 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
// it is being deleted, it can be removed from the cluster at any time, so it can be unavailable at any time
canBeUnavailableBindings = append(canBeUnavailableBindings, binding)
}

case fleetv1beta1.BindingStateScheduled:
// the scheduler has picked a cluster for this binding
schedulerTargetedBinds = append(schedulerTargetedBinds, binding)
// this binding has not been bound yet, so it is an update candidate
// pickFromResourceMatchedOverridesForTargetCluster always returns the ordered list of the overrides.
cro, ro, err := r.pickFromResourceMatchedOverridesForTargetCluster(ctx, binding, matchedCROs, matchedROs)
if err != nil {
return nil, nil, false, err
return nil, nil, false, minWaitTime, err
}
boundingCandidates = append(boundingCandidates, createUpdateInfo(binding, crp, latestResourceSnapshot, cro, ro))

case fleetv1beta1.BindingStateBound:
bindingFailed := false
schedulerTargetedBinds = append(schedulerTargetedBinds, binding)
if _, bindingReady := isBindingReady(binding, readyTimeCutOff); bindingReady {
if waitTime, bindingReady := isBindingReady(binding, readyTimeCutOff); bindingReady {
klog.V(3).InfoS("Found a ready bound binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
readyBindings = append(readyBindings, binding)
} else {
allReady = false
if waitTime >= 0 && waitTime < minWaitTime {
minWaitTime = waitTime
}
}
// check if the binding is failed or still on going
appliedCondition := binding.GetCondition(string(fleetv1beta1.ResourceBindingApplied))
availableCondition := binding.GetCondition(string(fleetv1beta1.ResourceBindingAvailable))
if condition.IsConditionStatusFalse(appliedCondition, binding.Generation) || condition.IsConditionStatusFalse(availableCondition, binding.Generation) {
if hasBindingFailed(binding) {
klog.V(3).InfoS("Found a failed to be ready bound binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
bindingFailed = true
} else {
@@ -393,9 +397,8 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
// pickFromResourceMatchedOverridesForTargetCluster always returns the ordered list of the overrides.
cro, ro, err := r.pickFromResourceMatchedOverridesForTargetCluster(ctx, binding, matchedCROs, matchedROs)
if err != nil {
return nil, nil, false, err
return nil, nil, false, 0, err
}

// The binding needs an update if it's not pointing to the latest resource snapshot or the latest overrides.
if binding.Spec.ResourceSnapshotName != latestResourceSnapshot.Name || !equality.Semantic.DeepEqual(binding.Spec.ClusterResourceOverrideSnapshots, cro) || !equality.Semantic.DeepEqual(binding.Spec.ResourceOverrideSnapshots, ro) {
updateInfo := createUpdateInfo(binding, crp, latestResourceSnapshot, cro, ro)
@@ -408,7 +411,11 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
}
}
}
if allReady {
minWaitTime = 0
}

// Calculate target number
targetNumber := r.calculateRealTarget(crp, schedulerTargetedBinds)
klog.V(2).InfoS("Calculated the targetNumber", "clusterResourcePlacement", crpKObj,
"targetNumber", targetNumber, "readyBindingNumber", len(readyBindings), "canBeUnavailableBindingNumber", len(canBeUnavailableBindings),
@@ -418,20 +425,35 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
// the list of bindings that are to be updated by this rolling phase
toBeUpdatedBindingList := make([]toBeUpdatedBinding, 0)
if len(removeCandidates)+len(updateCandidates)+len(boundingCandidates)+len(applyFailedUpdateCandidates) == 0 {
return toBeUpdatedBindingList, nil, false, nil
return toBeUpdatedBindingList, nil, false, minWaitTime, nil
}

// calculate the max number of bindings that can be unavailable according to user specified maxUnavailable
maxUnavailableNumber, _ := intstr.GetScaledValueFromIntOrPercent(crp.Spec.Strategy.RollingUpdate.MaxUnavailable, targetNumber, true)
minAvailableNumber := targetNumber - maxUnavailableNumber
// This is the lower bound of the number of bindings that can be available during the rolling update
// Since we can't predict the number of bindings that can be unavailable after they are applied, we don't take them into account
lowerBoundAvailableNumber := len(readyBindings) - len(canBeUnavailableBindings)
maxNumberToRemove := lowerBoundAvailableNumber - minAvailableNumber
klog.V(2).InfoS("Calculated the max number of bindings to remove", "clusterResourcePlacement", crpKObj,
"maxUnavailableNumber", maxUnavailableNumber, "minAvailableNumber", minAvailableNumber,
"lowerBoundAvailableBindings", lowerBoundAvailableNumber, "maxNumberOfBindingsToRemove", maxNumberToRemove)
toBeUpdatedBindingList, staleUnselectedBinding := determineBindingsToUpdate(crp, removeCandidates, updateCandidates, boundingCandidates, applyFailedUpdateCandidates, targetNumber,
readyBindings, canBeReadyBindings, canBeUnavailableBindings)

return toBeUpdatedBindingList, staleUnselectedBinding, true, minWaitTime, nil
}

// hasBindingFailed checks if ClusterResourceBinding has failed based on its applied and available conditions.
func hasBindingFailed(binding *fleetv1beta1.ClusterResourceBinding) bool {
appliedCondition := binding.GetCondition(string(fleetv1beta1.ResourceBindingApplied))
availableCondition := binding.GetCondition(string(fleetv1beta1.ResourceBindingAvailable))
if condition.IsConditionStatusFalse(appliedCondition, binding.Generation) || condition.IsConditionStatusFalse(availableCondition, binding.Generation) {
return true
}
return false
}

// determineBindingsToUpdate determines which bindings to update
func determineBindingsToUpdate(
crp *fleetv1beta1.ClusterResourcePlacement,
removeCandidates, updateCandidates, boundingCandidates, applyFailedUpdateCandidates []toBeUpdatedBinding,
targetNumber int,
readyBindings, canBeReadyBindings, canBeUnavailableBindings []*fleetv1beta1.ClusterResourceBinding,
) ([]toBeUpdatedBinding, []toBeUpdatedBinding) {
toBeUpdatedBindingList := make([]toBeUpdatedBinding, 0)
// calculate the max number of bindings that can be unavailable according to user specified maxUnavailable
maxNumberToRemove := calculateMaxToRemove(crp, targetNumber, readyBindings, canBeUnavailableBindings)
// we can still update the bindings that are failed to apply already regardless of the maxNumberToRemove
toBeUpdatedBindingList = append(toBeUpdatedBindingList, applyFailedUpdateCandidates...)

@@ -452,16 +474,7 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
}

// calculate the max number of bindings that can be added according to user specified MaxSurge
maxSurgeNumber, _ := intstr.GetScaledValueFromIntOrPercent(crp.Spec.Strategy.RollingUpdate.MaxSurge, targetNumber, true)
maxReadyNumber := targetNumber + maxSurgeNumber
// This is the upper bound of the number of bindings that can be ready during the rolling update
// We count anything that still has work object on the hub cluster as can be ready since the member agent may have connection issue with the hub cluster
upperBoundReadyNumber := len(canBeReadyBindings)
maxNumberToAdd := maxReadyNumber - upperBoundReadyNumber

klog.V(2).InfoS("Calculated the max number of bindings to add", "clusterResourcePlacement", crpKObj,
"maxSurgeNumber", maxSurgeNumber, "maxReadyNumber", maxReadyNumber, "upperBoundReadyBindings",
upperBoundReadyNumber, "maxNumberOfBindingsToAdd", maxNumberToAdd)
maxNumberToAdd := calculateMaxToAdd(crp, targetNumber, canBeReadyBindings)

// boundingCandidatesUnselectedIndex stores the last index of the boundingCandidates which are not selected to be updated.
// The rolloutStarted condition of these elements from this index should be updated.
@@ -478,10 +491,35 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
if boundingCandidatesUnselectedIndex < len(boundingCandidates) {
staleUnselectedBinding = append(staleUnselectedBinding, boundingCandidates[boundingCandidatesUnselectedIndex:]...)
}
return toBeUpdatedBindingList, staleUnselectedBinding
}

return toBeUpdatedBindingList, staleUnselectedBinding, true, nil
func calculateMaxToRemove(crp *fleetv1beta1.ClusterResourcePlacement, targetNumber int, readyBindings, canBeUnavailableBindings []*fleetv1beta1.ClusterResourceBinding) int {
maxUnavailableNumber, _ := intstr.GetScaledValueFromIntOrPercent(crp.Spec.Strategy.RollingUpdate.MaxUnavailable, targetNumber, true)
minAvailableNumber := targetNumber - maxUnavailableNumber
// This is the lower bound of the number of bindings that can be available during the rolling update
// Since we can't predict the number of bindings that can be unavailable after they are applied, we don't take them into account
lowerBoundAvailableNumber := len(readyBindings) - len(canBeUnavailableBindings)
maxNumberToRemove := lowerBoundAvailableNumber - minAvailableNumber
klog.V(2).InfoS("Calculated the max number of bindings to remove", "clusterResourcePlacement", klog.KObj(crp),
"maxUnavailableNumber", maxUnavailableNumber, "minAvailableNumber", minAvailableNumber,
"lowerBoundAvailableBindings", lowerBoundAvailableNumber, "maxNumberOfBindingsToRemove", maxNumberToRemove)
return maxNumberToRemove
}

func calculateMaxToAdd(crp *fleetv1beta1.ClusterResourcePlacement, targetNumber int, canBeReadyBindings []*fleetv1beta1.ClusterResourceBinding) int {
maxSurgeNumber, _ := intstr.GetScaledValueFromIntOrPercent(crp.Spec.Strategy.RollingUpdate.MaxSurge, targetNumber, true)
maxReadyNumber := targetNumber + maxSurgeNumber
// This is the upper bound of the number of bindings that can be ready during the rolling update
// We count anything that still has work object on the hub cluster as can be ready since the member agent may have connection issue with the hub cluster
upperBoundReadyNumber := len(canBeReadyBindings)
maxNumberToAdd := maxReadyNumber - upperBoundReadyNumber

klog.V(2).InfoS("Calculated the max number of bindings to add", "clusterResourcePlacement", klog.KObj(crp),
"maxSurgeNumber", maxSurgeNumber, "maxReadyNumber", maxReadyNumber, "upperBoundReadyBindings",
upperBoundReadyNumber, "maxNumberOfBindingsToAdd", maxNumberToAdd)
return maxNumberToAdd
}
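
For intuition, here is a small self-contained sketch of the maxUnavailable/maxSurge scaling that these two helpers rely on. The target number and strategy values below are made up for illustration; only GetScaledValueFromIntOrPercent itself comes from the code above.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	targetNumber := 10
	maxUnavailable := intstr.FromString("25%") // hypothetical strategy values
	maxSurge := intstr.FromInt(2)

	// 25% of 10 rounds up to 3, so at most 3 bindings may be unavailable
	// and at least 7 must stay available during the rollout.
	unavailable, _ := intstr.GetScaledValueFromIntOrPercent(&maxUnavailable, targetNumber, true)
	fmt.Println(unavailable, targetNumber-unavailable) // 3 7

	// A surge of 2 means up to 12 bindings may count as "can be ready" at once.
	surge, _ := intstr.GetScaledValueFromIntOrPercent(&maxSurge, targetNumber, true)
	fmt.Println(targetNumber + surge) // 12
}
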
func (r *Reconciler) calculateRealTarget(crp *fleetv1beta1.ClusterResourcePlacement, schedulerTargetedBinds []*fleetv1beta1.ClusterResourceBinding) int {
crpKObj := klog.KObj(crp)
// calculate the target number of bindings
85 changes: 85 additions & 0 deletions pkg/controllers/rollout/controller_integration_test.go
@@ -12,6 +12,7 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/utils/ptr"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -522,6 +523,90 @@ var _ = Describe("Test the rollout Controller", func() {
}, timeout, interval).Should(BeTrue(), "rollout controller should roll all the bindings to use the latest resource snapshot")
})

It("Should wait designated time before rolling out ", func() {
// create CRP
var targetCluster int32 = 11
rolloutCRP = clusterResourcePlacementForTest(testCRPName, createPlacementPolicyForTest(fleetv1beta1.PickNPlacementType, targetCluster))
// replace the strategy so that only UnavailablePeriodSeconds is set
rolloutCRP.Spec.Strategy = fleetv1beta1.RolloutStrategy{RollingUpdate: &fleetv1beta1.RollingUpdateConfig{UnavailablePeriodSeconds: ptr.To(60)}}
Expect(k8sClient.Create(ctx, rolloutCRP)).Should(Succeed())
// create master resource snapshot that is latest
masterSnapshot := generateResourceSnapshot(rolloutCRP.Name, 0, true)
Expect(k8sClient.Create(ctx, masterSnapshot)).Should(Succeed())
By(fmt.Sprintf("master resource snapshot %s created", masterSnapshot.Name))
// create scheduled bindings for master snapshot on target clusters
clusters := make([]string, targetCluster)
for i := 0; i < int(targetCluster); i++ {
clusters[i] = "cluster-" + utils.RandStr()
binding := generateClusterResourceBinding(fleetv1beta1.BindingStateScheduled, masterSnapshot.Name, clusters[i])
Expect(k8sClient.Create(ctx, binding)).Should(Succeed())
By(fmt.Sprintf("resource binding %s created", binding.Name))
bindings = append(bindings, binding)
}
// check that all bindings are scheduled
Eventually(func() bool {
for _, binding := range bindings {
err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.GetName()}, binding)
if err != nil {
return false
}
if binding.Spec.State != fleetv1beta1.BindingStateBound || binding.Spec.ResourceSnapshotName != masterSnapshot.Name {
return false
}
}
return true
}, timeout, interval).Should(BeTrue(), "rollout controller should roll all the bindings to Bound state")

// simulate that some of the bindings are available successfully
applySuccessfully := 3
for i := 0; i < applySuccessfully; i++ {
markBindingAvailable(bindings[i], true)
}
// simulate that some of the bindings fail to apply
for i := applySuccessfully; i < int(targetCluster); i++ {
markBindingApplied(bindings[i], false)
}
// mark the master snapshot as not latest
masterSnapshot.SetLabels(map[string]string{
fleetv1beta1.CRPTrackingLabel: testCRPName,
fleetv1beta1.IsLatestSnapshotLabel: "false"},
)
Expect(k8sClient.Update(ctx, masterSnapshot)).Should(Succeed())
// create a new master resource snapshot
newMasterSnapshot := generateResourceSnapshot(rolloutCRP.Name, 1, true)
Expect(k8sClient.Create(ctx, newMasterSnapshot)).Should(Succeed())
Consistently(func() bool {
allMatch := true
for _, binding := range bindings {
err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.GetName()}, binding)
if err != nil {
allMatch = false
}
if binding.Spec.ResourceSnapshotName != newMasterSnapshot.Name {
return true
}
}
return allMatch
}, consistentTimeout, consistentInterval).Should(BeTrue(), "rollout controller should not roll all the bindings to use the latest resource snapshot")

Eventually(func() bool {
allMatch := true
for _, binding := range bindings {
err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.GetName()}, binding)
if err != nil {
allMatch = false
}
if binding.Spec.ResourceSnapshotName == newMasterSnapshot.Name {
// simulate the work generator to make the newly updated bindings to be available
markBindingAvailable(binding, true)
} else {
allMatch = false
}
}
return allMatch
}, 5*time.Minute, interval).Should(BeTrue(), "rollout controller should roll all the bindings to use the latest resource snapshot")
})

// TODO: should update scheduled bindings to the latest snapshot when it is updated to bound state.

// TODO: should count the deleting bindings as can be Unavailable.