Skip to content

Commit

Permalink
Don't reload when use-cluster-ip endpoints update, and change the i…
Browse files Browse the repository at this point in the history
…ngress `use-cluster-ip` implementation to use the cluster ip instead of the fqdn (#5318)

* change use-cluster-ip implementation to use the cluster ip instead of fqdn, don't relaod when use-cluster-ip endpoints update

* Update controller.go

* Fix vs use cluster ip test

* move assert function to custom_assertions.py
  • Loading branch information
j1m-ryan authored Apr 5, 2024
1 parent 2a2c782 commit 0095bc3
Show file tree
Hide file tree
Showing 10 changed files with 366 additions and 41 deletions.
27 changes: 7 additions & 20 deletions internal/configs/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,34 +534,21 @@ func createUpstream(ingEx *IngressEx, name string, backend *networking.IngressBa
endps = []string{}
}

if cfg.UseClusterIP {
fqdn := fmt.Sprintf("%s.%s.svc.cluster.local:%d", backend.Service.Name, ingEx.Ingress.Namespace, backend.Service.Port.Number)
for _, endp := range endps {
upsServers = append(upsServers, version1.UpstreamServer{
Address: fqdn,
Address: endp,
MaxFails: cfg.MaxFails,
MaxConns: cfg.MaxConns,
FailTimeout: cfg.FailTimeout,
SlowStart: cfg.SlowStart,
Resolve: isExternalNameSvc,
})
}
if len(upsServers) > 0 {
sort.Slice(upsServers, func(i, j int) bool {
return upsServers[i].Address < upsServers[j].Address
})
ups.UpstreamServers = upsServers
} else {
for _, endp := range endps {
upsServers = append(upsServers, version1.UpstreamServer{
Address: endp,
MaxFails: cfg.MaxFails,
MaxConns: cfg.MaxConns,
FailTimeout: cfg.FailTimeout,
SlowStart: cfg.SlowStart,
Resolve: isExternalNameSvc,
})
}
if len(upsServers) > 0 {
sort.Slice(upsServers, func(i, j int) bool {
return upsServers[i].Address < upsServers[j].Address
})
ups.UpstreamServers = upsServers
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions internal/configs/ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func createExpectedConfigForMergeableCafeIngressWithUseClusterIP() version1.Ingr
UpstreamZoneSize: upstreamZoneSize,
UpstreamServers: []version1.UpstreamServer{
{
Address: "coffee-svc.default.svc.cluster.local:80",
Address: "10.0.0.1:80",
MaxFails: 1,
MaxConns: 0,
FailTimeout: "10s",
Expand Down Expand Up @@ -803,7 +803,7 @@ func createExpectedConfigForCafeIngressWithUseClusterIP() version1.IngressNginxC
UpstreamZoneSize: upstreamZoneSize,
UpstreamServers: []version1.UpstreamServer{
{
Address: "coffee-svc.default.svc.cluster.local:80",
Address: "10.0.0.1:80",
MaxFails: 1,
MaxConns: 0,
FailTimeout: "10s",
Expand All @@ -817,7 +817,7 @@ func createExpectedConfigForCafeIngressWithUseClusterIP() version1.IngressNginxC
UpstreamZoneSize: upstreamZoneSize,
UpstreamServers: []version1.UpstreamServer{
{
Address: "tea-svc.default.svc.cluster.local:80",
Address: "10.0.0.2:80",
MaxFails: 1,
MaxConns: 0,
FailTimeout: "10s",
Expand Down
119 changes: 101 additions & 18 deletions internal/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,35 +842,51 @@ func (lbc *LoadBalancerController) syncEndpointSlices(task task) bool {
}

endpointSlice := obj.(*discovery_v1.EndpointSlice)
svcResource := lbc.configuration.FindResourcesForService(endpointSlice.Namespace, endpointSlice.Labels["kubernetes.io/service-name"])
svcName := endpointSlice.Labels["kubernetes.io/service-name"]
svcResource := lbc.configuration.FindResourcesForService(endpointSlice.Namespace, svcName)

resourceExes := lbc.createExtendedResources(svcResource)

if len(resourceExes.IngressExes) > 0 {
resourcesFound = true
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.IngressExes)
err = lbc.configurator.UpdateEndpoints(resourceExes.IngressExes)
if err != nil {
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.IngressExes, err)
for _, ingEx := range resourceExes.IngressExes {
if lbc.ingressRequiresEndpointsUpdate(ingEx, svcName) {
resourcesFound = true
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.IngressExes)
err = lbc.configurator.UpdateEndpoints(resourceExes.IngressExes)
if err != nil {
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.IngressExes, err)
}
break
}
}
}

if len(resourceExes.MergeableIngresses) > 0 {
resourcesFound = true
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.MergeableIngresses)
err = lbc.configurator.UpdateEndpointsMergeableIngress(resourceExes.MergeableIngresses)
if err != nil {
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.MergeableIngresses, err)
for _, mergeableIngresses := range resourceExes.MergeableIngresses {
if lbc.mergeableIngressRequiresEndpointsUpdate(mergeableIngresses, svcName) {
resourcesFound = true
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.MergeableIngresses)
err = lbc.configurator.UpdateEndpointsMergeableIngress(resourceExes.MergeableIngresses)
if err != nil {
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.MergeableIngresses, err)
}
break
}
}
}

if lbc.areCustomResourcesEnabled {
if len(resourceExes.VirtualServerExes) > 0 {
resourcesFound = true
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.VirtualServerExes)
err := lbc.configurator.UpdateEndpointsForVirtualServers(resourceExes.VirtualServerExes)
if err != nil {
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.VirtualServerExes, err)
for _, vsEx := range resourceExes.VirtualServerExes {
if lbc.virtualServerRequiresEndpointsUpdate(vsEx, svcName) {
resourcesFound = true
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.VirtualServerExes)
err := lbc.configurator.UpdateEndpointsForVirtualServers(resourceExes.VirtualServerExes)
if err != nil {
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.VirtualServerExes, err)
}
break
}
}
}

Expand All @@ -886,6 +902,63 @@ func (lbc *LoadBalancerController) syncEndpointSlices(task task) bool {
return resourcesFound
}

func (lbc *LoadBalancerController) virtualServerRequiresEndpointsUpdate(vsEx *configs.VirtualServerEx, serviceName string) bool {
for _, upstream := range vsEx.VirtualServer.Spec.Upstreams {
if upstream.Service == serviceName && !upstream.UseClusterIP {
return true
}
}

for _, vsr := range vsEx.VirtualServerRoutes {
for _, upstream := range vsr.Spec.Upstreams {
if upstream.Service == serviceName && !upstream.UseClusterIP {
return true
}
}
}

return false
}

func (lbc *LoadBalancerController) ingressRequiresEndpointsUpdate(ingressEx *configs.IngressEx, serviceName string) bool {
hasUseClusterIPAnnotation := ingressEx.Ingress.Annotations[useClusterIPAnnotation] == "true"

for _, rule := range ingressEx.Ingress.Spec.Rules {
if http := rule.HTTP; http != nil {
for _, path := range http.Paths {
if path.Backend.Service != nil && path.Backend.Service.Name == serviceName {
if !hasUseClusterIPAnnotation {
return true
}
}
}
}
}

if http := ingressEx.Ingress.Spec.DefaultBackend; http != nil {
if http.Service != nil && http.Service.Name == serviceName {
if !hasUseClusterIPAnnotation {
return true
}
}
}

return false
}

func (lbc *LoadBalancerController) mergeableIngressRequiresEndpointsUpdate(mergeableIngresses *configs.MergeableIngresses, serviceName string) bool {
masterIngress := mergeableIngresses.Master
minions := mergeableIngresses.Minions

for _, minion := range minions {
if lbc.ingressRequiresEndpointsUpdate(minion, serviceName) {
return true
}
}

return lbc.ingressRequiresEndpointsUpdate(masterIngress, serviceName)
}

func (lbc *LoadBalancerController) createExtendedResources(resources []Resource) configs.ExtendedResources {
var result configs.ExtendedResources

Expand Down Expand Up @@ -2793,6 +2866,7 @@ func (lbc *LoadBalancerController) createMergeableIngresses(ingConfig *IngressCo
}

func (lbc *LoadBalancerController) createIngressEx(ing *networking.Ingress, validHosts map[string]bool, validMinionPaths map[string]bool) *configs.IngressEx {
var endps []string
ingEx := &configs.IngressEx{
Ingress: ing,
ValidHosts: validHosts,
Expand Down Expand Up @@ -2874,6 +2948,7 @@ func (lbc *LoadBalancerController) createIngressEx(ing *networking.Ingress, vali
ingEx.HealthChecks = make(map[string]*api_v1.Probe)
ingEx.ExternalNameSvcs = make(map[string]bool)
ingEx.PodsByIP = make(map[string]configs.PodInfo)
hasUseClusterIP := ingEx.Ingress.Annotations[configs.UseClusterIPAnnotation] == "true"

if ing.Spec.DefaultBackend != nil {
podEndps := []podEndpoint{}
Expand All @@ -2892,7 +2967,11 @@ func (lbc *LoadBalancerController) createIngressEx(ing *networking.Ingress, vali
glog.Warningf("Error retrieving endpoints for the service %v: %v", ing.Spec.DefaultBackend.Service.Name, err)
}

endps := getIPAddressesFromEndpoints(podEndps)
if svc != nil && !external && hasUseClusterIP {
endps = []string{ipv6SafeAddrPort(svc.Spec.ClusterIP, ing.Spec.DefaultBackend.Service.Port.Number)}
} else {
endps = getIPAddressesFromEndpoints(podEndps)
}

// endps is empty if there was any error before this point
ingEx.Endpoints[ing.Spec.DefaultBackend.Service.Name+configs.GetBackendPortAsString(ing.Spec.DefaultBackend.Service.Port)] = endps
Expand Down Expand Up @@ -2948,7 +3027,11 @@ func (lbc *LoadBalancerController) createIngressEx(ing *networking.Ingress, vali
glog.Warningf("Error retrieving endpoints for the service %v: %v", path.Backend.Service.Name, err)
}

endps := getIPAddressesFromEndpoints(podEndps)
if svc != nil && !external && hasUseClusterIP {
endps = []string{ipv6SafeAddrPort(svc.Spec.ClusterIP, path.Backend.Service.Port.Number)}
} else {
endps = getIPAddressesFromEndpoints(podEndps)
}

// endps is empty if there was any error before this point
ingEx.Endpoints[path.Backend.Service.Name+configs.GetBackendPortAsString(path.Backend.Service.Port)] = endps
Expand Down
20 changes: 20 additions & 0 deletions tests/data/use-cluster-ip/ingress/mergeable/minion-ingress.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: use-cluster-ip-ingress-minion
annotations:
nginx.org/use-cluster-ip: "true"
nginx.org/mergeable-ingress-type: "minion"
spec:
ingressClassName: nginx
rules:
- host: use-cluster-ip.example.com
http:
paths:
- path: /backend1
pathType: Prefix
backend:
service:
name: backend1-svc
port:
number: 80
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
annotations:
nginx.org/mergeable-ingress-type: "master"
name: use-cluster-ip-ingress-master
spec:
ingressClassName: nginx
rules:
- host: use-cluster-ip.example.com
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
annotations:
nginx.org/use-cluster-ip: "true"
name: use-cluster-ip-ingress
spec:
ingressClassName: nginx
rules:
- host: use-cluster-ip.example.com
http:
paths:
- path: /backend1
pathType: Prefix
backend:
service:
name: backend1-svc
port:
number: 80
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apiVersion: k8s.nginx.org/v1
kind: VirtualServer
metadata:
name: virtual-server
spec:
host: virtual-server.example.com
upstreams:
- name: backend1
service: backend1-svc
port: 80
use-cluster-ip: true
- name: backend2
service: backend2-svc
port: 80
routes:
- path: "/backend1"
action:
pass: backend1
- path: "/backend2"
action:
pass: backend2
Loading

0 comments on commit 0095bc3

Please sign in to comment.