Skip to content

Commit

Permalink
refactor/reorganize
Browse files Browse the repository at this point in the history
  • Loading branch information
britaniar committed Jul 30, 2024
1 parent 8821203 commit 176ae52
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 96 deletions.
229 changes: 150 additions & 79 deletions pkg/controllers/rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,97 +334,31 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
// Wait for the first applied but not ready binding to be ready.
// return wait time longer if the rollout is stuck on failed apply/available bindings
allReady := true
minWaitTime := time.Duration(*crp.Spec.Strategy.RollingUpdate.UnavailablePeriodSeconds) * time.Second
minWaitTime := time.Duration(*crp.Spec.Strategy.RollingUpdate.UnavailablePeriodSeconds) * time.Second / 5
crpKObj := klog.KObj(crp)
for idx := range allBindings {
binding := allBindings[idx]
for _, binding := range allBindings {
bindingKObj := klog.KObj(binding)
switch binding.Spec.State {
case fleetv1beta1.BindingStateUnscheduled:
appliedCondition := binding.GetCondition(string(fleetv1beta1.ResourceBindingApplied))
availableCondition := binding.GetCondition(string(fleetv1beta1.ResourceBindingAvailable))
if condition.IsConditionStatusFalse(appliedCondition, binding.Generation) || condition.IsConditionStatusFalse(availableCondition, binding.Generation) {
allReady = false
klog.V(3).InfoS("Found a failed to be ready unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
} else {
canBeReadyBindings = append(canBeReadyBindings, binding)
}
waitTime, bindingReady := isBindingReady(binding, readyTimeCutOff)
if bindingReady {
klog.V(3).InfoS("Found a ready unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
readyBindings = append(readyBindings, binding)
} else if !bindingReady {
allReady = false
if waitTime < minWaitTime && waitTime > 0 {
minWaitTime = waitTime
}
}
if binding.DeletionTimestamp.IsZero() {
// it's not been deleted yet, so it is a removal candidate
klog.V(3).InfoS("Found a not yet deleted unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
// The desired binding is nil for the removeCandidates.
removeCandidates = append(removeCandidates, toBeUpdatedBinding{currentBinding: binding})
} else if bindingReady {
// it is being deleted, it can be removed from the cluster at any time, so it can be unavailable at any time
canBeUnavailableBindings = append(canBeUnavailableBindings, binding)
}

r.processUnscheduledBinding(crpKObj, bindingKObj, &minWaitTime, readyTimeCutOff, binding, &readyBindings, &canBeReadyBindings, &canBeUnavailableBindings, &removeCandidates, &allReady)
case fleetv1beta1.BindingStateScheduled:
// the scheduler has picked a cluster for this binding
schedulerTargetedBinds = append(schedulerTargetedBinds, binding)
allReady = false
// this binding has not been bound yet, so it is an update candidate
// pickFromResourceMatchedOverridesForTargetCluster always returns the ordered list of the overrides.
cro, ro, err := r.pickFromResourceMatchedOverridesForTargetCluster(ctx, binding, matchedCROs, matchedROs)
err := r.processScheduledBinding(ctx, binding, crp, latestResourceSnapshot, matchedCROs, matchedROs, &schedulerTargetedBinds, &boundingCandidates, &allReady)
if err != nil {
return nil, nil, false, minWaitTime, err
}
boundingCandidates = append(boundingCandidates, createUpdateInfo(binding, crp, latestResourceSnapshot, cro, ro))

case fleetv1beta1.BindingStateBound:
bindingFailed := false
schedulerTargetedBinds = append(schedulerTargetedBinds, binding)
if waitTime, bindingReady := isBindingReady(binding, readyTimeCutOff); bindingReady {
klog.V(3).InfoS("Found a ready bound binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
readyBindings = append(readyBindings, binding)
} else {
allReady = false
if waitTime < minWaitTime && waitTime > 0 {
minWaitTime = waitTime
}
}
// check if the binding is failed or still on going
appliedCondition := binding.GetCondition(string(fleetv1beta1.ResourceBindingApplied))
availableCondition := binding.GetCondition(string(fleetv1beta1.ResourceBindingAvailable))
if condition.IsConditionStatusFalse(appliedCondition, binding.Generation) || condition.IsConditionStatusFalse(availableCondition, binding.Generation) {
klog.V(3).InfoS("Found a failed to be ready bound binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
bindingFailed = true
} else {
canBeReadyBindings = append(canBeReadyBindings, binding)
}

// pickFromResourceMatchedOverridesForTargetCluster always returns the ordered list of the overrides.
cro, ro, err := r.pickFromResourceMatchedOverridesForTargetCluster(ctx, binding, matchedCROs, matchedROs)
err := r.processBoundBinding(ctx, crpKObj, bindingKObj, binding, crp, latestResourceSnapshot, matchedCROs, matchedROs, &schedulerTargetedBinds, &readyBindings, &canBeReadyBindings, &updateCandidates, &applyFailedUpdateCandidates, &minWaitTime, readyTimeCutOff, &allReady)
if err != nil {
return nil, nil, false, minWaitTime, err
}

// The binding needs update if it's not pointing to the latest resource resourceBinding or the overrides.
if binding.Spec.ResourceSnapshotName != latestResourceSnapshot.Name || !equality.Semantic.DeepEqual(binding.Spec.ClusterResourceOverrideSnapshots, cro) || !equality.Semantic.DeepEqual(binding.Spec.ResourceOverrideSnapshots, ro) {
updateInfo := createUpdateInfo(binding, crp, latestResourceSnapshot, cro, ro)
if bindingFailed {
// the binding has been applied but failed to apply, we can safely update it to latest resources without affecting max unavailable count
applyFailedUpdateCandidates = append(applyFailedUpdateCandidates, updateInfo)
} else {
updateCandidates = append(updateCandidates, updateInfo)
}
}
}
}
if allReady {
minWaitTime = 0
}

// Calculate target number
targetNumber := r.calculateRealTarget(crp, schedulerTargetedBinds)
klog.V(2).InfoS("Calculated the targetNumber", "clusterResourcePlacement", crpKObj,
"targetNumber", targetNumber, "readyBindingNumber", len(readyBindings), "canBeUnavailableBindingNumber", len(canBeUnavailableBindings),
Expand All @@ -437,9 +371,143 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
return toBeUpdatedBindingList, nil, false, minWaitTime, nil
}

// calculate the max number of bindings that can be unavailable according to user specified maxUnavailable
maxNumberToRemove := calculateMaxToRemove(*crp, targetNumber, readyBindings, canBeUnavailableBindings, crpKObj)
toBeUpdatedBindingList, staleUnselectedBinding := determineBindingsToUpdate(crp, removeCandidates, updateCandidates, boundingCandidates, applyFailedUpdateCandidates, targetNumber,
readyBindings, canBeReadyBindings, canBeUnavailableBindings, crpKObj)

return toBeUpdatedBindingList, staleUnselectedBinding, true, minWaitTime, nil
}

// processUnscheduledBinding classifies one binding that is in the Unscheduled state and
// records the outcome in the caller-owned accumulators passed in by pointer.
//
// The binding is examined in three independent steps:
//  1. If its Applied or Available condition is false for the current generation, allReady is
//     cleared; otherwise the binding is appended to canBeReadyBindings.
//  2. If isBindingReady reports it ready, it is appended to readyBindings; otherwise allReady is
//     cleared and minWaitTime is lowered to the reported wait time when that wait time is
//     positive and smaller than the current value.
//  3. If it has no deletion timestamp it is appended to removeCandidates (with only the current
//     binding set); if it is already being deleted and is ready, it is appended to
//     canBeUnavailableBindings.
func (r *Reconciler) processUnscheduledBinding(
	crpKObj, bindingKObj klog.ObjectRef,
	minWaitTime *time.Duration,
	readyTimeCutOff time.Time,
	binding *fleetv1beta1.ClusterResourceBinding,
	readyBindings, canBeReadyBindings, canBeUnavailableBindings *[]*fleetv1beta1.ClusterResourceBinding,
	removeCandidates *[]toBeUpdatedBinding,
	allReady *bool,
) {
	// Step 1: has the binding failed to apply or failed to become available?
	applied := binding.GetCondition(string(fleetv1beta1.ResourceBindingApplied))
	available := binding.GetCondition(string(fleetv1beta1.ResourceBindingAvailable))
	failed := condition.IsConditionStatusFalse(applied, binding.Generation) ||
		condition.IsConditionStatusFalse(available, binding.Generation)
	if !failed {
		*canBeReadyBindings = append(*canBeReadyBindings, binding)
	} else {
		*allReady = false
		klog.V(3).InfoS("Found a failed to be ready unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
	}

	// Step 2: readiness, and how long the caller should wait if it is not ready yet.
	remaining, ready := isBindingReady(binding, readyTimeCutOff)
	if !ready {
		*allReady = false
		// Only a strictly positive wait time is allowed to shorten the current minimum.
		if remaining > 0 && remaining < *minWaitTime {
			*minWaitTime = remaining
		}
	} else {
		klog.V(3).InfoS("Found a ready unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
		*readyBindings = append(*readyBindings, binding)
	}

	// Step 3: removal bookkeeping.
	switch {
	case binding.DeletionTimestamp.IsZero():
		// Not deleted yet, so it is a removal candidate.
		klog.V(3).InfoS("Found a not yet deleted unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
		// Remove candidates carry no desired binding; only the current one is set.
		*removeCandidates = append(*removeCandidates, toBeUpdatedBinding{currentBinding: binding})
	case ready:
		// Already being deleted: it can disappear from the cluster at any moment, so a ready
		// one is counted as a binding that can become unavailable.
		*canBeUnavailableBindings = append(*canBeUnavailableBindings, binding)
	}
}

// processScheduledBinding handles one binding that is in the Scheduled state, i.e. the scheduler
// has picked a cluster for it but it has not been bound yet.
//
// It always appends the binding to schedulerTargetedBinds and clears allReady. It then resolves
// the overrides for the target cluster and appends the update info built by createUpdateInfo to
// boundingCandidates. If resolving the overrides fails, that error is returned as-is and
// boundingCandidates is left untouched; the other two out-parameters have already been updated.
func (r *Reconciler) processScheduledBinding(
	ctx context.Context,
	binding *fleetv1beta1.ClusterResourceBinding,
	crp *fleetv1beta1.ClusterResourcePlacement,
	latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot,
	matchedCROs []*fleetv1alpha1.ClusterResourceOverrideSnapshot,
	matchedROs []*fleetv1alpha1.ResourceOverrideSnapshot,
	schedulerTargetedBinds *[]*fleetv1beta1.ClusterResourceBinding,
	boundingCandidates *[]toBeUpdatedBinding,
	allReady *bool,
) error {
	// A scheduled-but-unbound binding means the rollout cannot be fully ready.
	*allReady = false
	*schedulerTargetedBinds = append(*schedulerTargetedBinds, binding)

	// The binding is not bound yet, which makes it an update candidate.
	// pickFromResourceMatchedOverridesForTargetCluster always returns the overrides as ordered lists.
	cro, ro, err := r.pickFromResourceMatchedOverridesForTargetCluster(ctx, binding, matchedCROs, matchedROs)
	if err != nil {
		return err
	}

	candidate := createUpdateInfo(binding, crp, latestResourceSnapshot, cro, ro)
	*boundingCandidates = append(*boundingCandidates, candidate)
	return nil
}

// processBoundBinding handles one binding that is in the Bound state and records the outcome in
// the caller-owned accumulators passed in by pointer.
//
// The binding is always appended to schedulerTargetedBinds. After that:
//   - If isBindingReady reports it ready it is appended to readyBindings; otherwise allReady is
//     cleared and minWaitTime is lowered to the reported wait time when that wait time is
//     positive and smaller than the current value.
//   - If its Applied or Available condition is false for the current generation it is treated as
//     failed; otherwise it is appended to canBeReadyBindings.
//   - If its spec does not reference the latest resource snapshot, or its override snapshot lists
//     differ from the ones resolved for the target cluster, update info is appended to
//     applyFailedUpdateCandidates (failed binding) or updateCandidates (any other binding).
//
// An error from resolving the overrides is returned as-is; the readiness and failure bookkeeping
// above has already been applied by then.
func (r *Reconciler) processBoundBinding(
	ctx context.Context,
	crpKObj, bindingKObj klog.ObjectRef,
	binding *fleetv1beta1.ClusterResourceBinding,
	crp *fleetv1beta1.ClusterResourcePlacement,
	latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot,
	matchedCROs []*fleetv1alpha1.ClusterResourceOverrideSnapshot,
	matchedROs []*fleetv1alpha1.ResourceOverrideSnapshot,
	schedulerTargetedBinds, readyBindings, canBeReadyBindings *[]*fleetv1beta1.ClusterResourceBinding,
	updateCandidates, applyFailedUpdateCandidates *[]toBeUpdatedBinding,
	minWaitTime *time.Duration,
	readyTimeCutOff time.Time,
	allReady *bool,
) error {
	*schedulerTargetedBinds = append(*schedulerTargetedBinds, binding)

	// Readiness, and how long the caller should wait if the binding is not ready yet.
	remaining, ready := isBindingReady(binding, readyTimeCutOff)
	if !ready {
		*allReady = false
		// Only a strictly positive wait time is allowed to shorten the current minimum.
		if remaining > 0 && remaining < *minWaitTime {
			*minWaitTime = remaining
		}
	} else {
		klog.V(3).InfoS("Found a ready bound binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
		*readyBindings = append(*readyBindings, binding)
	}

	// Tell a failed binding apart from one that is still in progress.
	applied := binding.GetCondition(string(fleetv1beta1.ResourceBindingApplied))
	available := binding.GetCondition(string(fleetv1beta1.ResourceBindingAvailable))
	failed := condition.IsConditionStatusFalse(applied, binding.Generation) ||
		condition.IsConditionStatusFalse(available, binding.Generation)
	if !failed {
		*canBeReadyBindings = append(*canBeReadyBindings, binding)
	} else {
		klog.V(3).InfoS("Found a failed to be ready bound binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
	}

	// pickFromResourceMatchedOverridesForTargetCluster always returns the overrides as ordered lists.
	cro, ro, err := r.pickFromResourceMatchedOverridesForTargetCluster(ctx, binding, matchedCROs, matchedROs)
	if err != nil {
		return err
	}

	// Nothing to roll when the binding already references the latest resource snapshot and
	// exactly the overrides resolved above.
	upToDate := binding.Spec.ResourceSnapshotName == latestResourceSnapshot.Name &&
		equality.Semantic.DeepEqual(binding.Spec.ClusterResourceOverrideSnapshots, cro) &&
		equality.Semantic.DeepEqual(binding.Spec.ResourceOverrideSnapshots, ro)
	if upToDate {
		return nil
	}

	info := createUpdateInfo(binding, crp, latestResourceSnapshot, cro, ro)
	// A binding that already failed can be moved to the latest resources without affecting the
	// max unavailable count, so it is tracked in its own list.
	destination := updateCandidates
	if failed {
		destination = applyFailedUpdateCandidates
	}
	*destination = append(*destination, info)
	return nil
}

// Determine bindings to update
func determineBindingsToUpdate(
crp *fleetv1beta1.ClusterResourcePlacement,
removeCandidates, updateCandidates, boundingCandidates, applyFailedUpdateCandidates []toBeUpdatedBinding,
targetNumber int,
readyBindings, canBeReadyBindings, canBeUnavailableBindings []*fleetv1beta1.ClusterResourceBinding,
crpKObj klog.ObjectRef,
) (
toBeUpdatedBindingList []toBeUpdatedBinding,
staleUnselectedBinding []toBeUpdatedBinding,
) {
toBeUpdatedBindingList = []toBeUpdatedBinding{}
// calculate the max number of bindings that can be unavailable according to user specified maxUnavailable
maxNumberToRemove := calculateMaxToRemove(crp, targetNumber, readyBindings, canBeUnavailableBindings, crpKObj)
// we can still update the bindings that are failed to apply already regardless of the maxNumberToRemove
toBeUpdatedBindingList = append(toBeUpdatedBindingList, applyFailedUpdateCandidates...)

Expand All @@ -460,7 +528,8 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
}

// calculate the max number of bindings that can be added according to user specified MaxSurge
maxNumberToAdd := calculateMaxToAdd(*crp, targetNumber, canBeReadyBindings, crpKObj)
maxNumberToAdd := calculateMaxToAdd(crp, targetNumber, canBeReadyBindings, crpKObj)

// boundingCandidatesUnselectedIndex stores the last index of the boundingCandidates which are not selected to be updated.
// The rolloutStarted condition of these elements from this index should be updated.
boundingCandidatesUnselectedIndex := 0
Expand All @@ -469,16 +538,17 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
toBeUpdatedBindingList = append(toBeUpdatedBindingList, boundingCandidates[boundingCandidatesUnselectedIndex])
}

staleUnselectedBinding := make([]toBeUpdatedBinding, 0)
staleUnselectedBinding = []toBeUpdatedBinding{}
if updateCandidateUnselectedIndex < len(updateCandidates) {
staleUnselectedBinding = append(staleUnselectedBinding, updateCandidates[updateCandidateUnselectedIndex:]...)
}
if boundingCandidatesUnselectedIndex < len(boundingCandidates) {
staleUnselectedBinding = append(staleUnselectedBinding, boundingCandidates[boundingCandidatesUnselectedIndex:]...)
}
return toBeUpdatedBindingList, staleUnselectedBinding, true, minWaitTime, nil
return toBeUpdatedBindingList, staleUnselectedBinding
}
func calculateMaxToRemove(crp fleetv1beta1.ClusterResourcePlacement, targetNumber int, readyBindings, canBeUnavailableBindings []*fleetv1beta1.ClusterResourceBinding, crpKObj klog.ObjectRef) int {

func calculateMaxToRemove(crp *fleetv1beta1.ClusterResourcePlacement, targetNumber int, readyBindings, canBeUnavailableBindings []*fleetv1beta1.ClusterResourceBinding, crpKObj klog.ObjectRef) int {
maxUnavailableNumber, _ := intstr.GetScaledValueFromIntOrPercent(crp.Spec.Strategy.RollingUpdate.MaxUnavailable, targetNumber, true)
minAvailableNumber := targetNumber - maxUnavailableNumber
// This is the lower bound of the number of bindings that can be available during the rolling update
Expand All @@ -490,7 +560,8 @@ func calculateMaxToRemove(crp fleetv1beta1.ClusterResourcePlacement, targetNumbe
"lowerBoundAvailableBindings", lowerBoundAvailableNumber, "maxNumberOfBindingsToRemove", maxNumberToRemove)
return maxNumberToRemove
}
func calculateMaxToAdd(crp fleetv1beta1.ClusterResourcePlacement, targetNumber int, canBeReadyBindings []*fleetv1beta1.ClusterResourceBinding, crpKObj klog.ObjectRef) int {

func calculateMaxToAdd(crp *fleetv1beta1.ClusterResourcePlacement, targetNumber int, canBeReadyBindings []*fleetv1beta1.ClusterResourceBinding, crpKObj klog.ObjectRef) int {
maxSurgeNumber, _ := intstr.GetScaledValueFromIntOrPercent(crp.Spec.Strategy.RollingUpdate.MaxSurge, targetNumber, true)
maxReadyNumber := targetNumber + maxSurgeNumber
// This is the upper bound of the number of bindings that can be ready during the rolling update
Expand Down
Loading

0 comments on commit 176ae52

Please sign in to comment.