Skip to content

Commit

Permalink
move filter before to prevent unneeded processing (istio#46718)
Browse files Browse the repository at this point in the history
* move filter before

* move filter before

* Fix test

* revert log
  • Loading branch information
hzxuzhonghu committed Sep 14, 2023
1 parent ea5ac9d commit 491d7da
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 29 deletions.
26 changes: 5 additions & 21 deletions pilot/pkg/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,18 +847,12 @@ func (s *Server) initRegistryEventHandlers() {
log.Info("initializing registry event handlers")
// Flush cached discovery responses whenever services configuration change.
serviceHandler := func(prev, curr *model.Service, event model.Event) {
needsPush := true
if event == model.EventUpdate {
needsPush = serviceUpdateNeedsPush(prev, curr)
}
if needsPush {
pushReq := &model.PushRequest{
Full: true,
ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: string(curr.Hostname), Namespace: curr.Attributes.Namespace}),
Reason: model.NewReasonStats(model.ServiceUpdate),
}
s.XDSServer.ConfigUpdate(pushReq)
pushReq := &model.PushRequest{
Full: true,
ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: string(curr.Hostname), Namespace: curr.Attributes.Namespace}),
Reason: model.NewReasonStats(model.ServiceUpdate),
}
s.XDSServer.ConfigUpdate(pushReq)
}
s.ServiceController().AppendServiceHandler(serviceHandler)

Expand Down Expand Up @@ -1335,13 +1329,3 @@ func (s *Server) initReadinessProbes() {
s.addReadinessProbe(name, probe)
}
}

func serviceUpdateNeedsPush(prev, curr *model.Service) bool {
if !features.EnableOptimizedServicePush {
return true
}
if prev == nil {
return true
}
return !prev.Equals(curr)
}
29 changes: 22 additions & 7 deletions pilot/pkg/serviceregistry/kube/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (c *Controller) Cleanup() error {
return nil
}

func (c *Controller) onServiceEvent(_, curr *v1.Service, event model.Event) error {
func (c *Controller) onServiceEvent(pre, curr *v1.Service, event model.Event) error {
log.Debugf("Handle event %s for service %s in namespace %s", event, curr.Name, curr.Namespace)

// Create the standard (cluster.local) service.
Expand Down Expand Up @@ -409,6 +409,12 @@ func (c *Controller) deleteService(svc *model.Service) {
}

func (c *Controller) addOrUpdateService(curr *v1.Service, currConv *model.Service, event model.Event, updateEDSCache bool) {
// instance conversion is only required when service is added/updated.
c.Lock()
prevConv := c.servicesMap[currConv.Hostname]
c.servicesMap[currConv.Hostname] = currConv
c.Unlock()

needsFullPush := false
// First, process nodePort gateway service, whose externalIPs specified
// and loadbalancer gateway service
Expand All @@ -429,12 +435,6 @@ func (c *Controller) addOrUpdateService(curr *v1.Service, currConv *model.Servic
if curr != nil && curr.Spec.Type == v1.ServiceTypeExternalName {
updateEDSCache = true
}
var prevConv *model.Service
// instance conversion is only required when service is added/updated.
c.Lock()
prevConv = c.servicesMap[currConv.Hostname]
c.servicesMap[currConv.Hostname] = currConv
c.Unlock()

// This full push needed to update ALL ends endpoints, even though we do a full push on service add/update
// as that full push is only triggered for the specific service.
Expand All @@ -455,6 +455,11 @@ func (c *Controller) addOrUpdateService(curr *v1.Service, currConv *model.Servic
}
}

// filter out same service event
if event == model.EventUpdate && !serviceUpdateNeedsPush(prevConv, currConv) {
return
}

c.opts.XDSUpdater.SvcUpdate(shard, string(currConv.Hostname), ns, event)
c.handlers.NotifyServiceHandlers(prevConv, currConv, event)
}
Expand Down Expand Up @@ -1131,3 +1136,13 @@ func (c *Controller) servicesForNamespacedName(name types.NamespacedName) []*mod
}
return nil
}

func serviceUpdateNeedsPush(prev, curr *model.Service) bool {
if !features.EnableOptimizedServicePush {
return true
}
if prev == nil {
return true
}
return !prev.Equals(curr)
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (ic *serviceImportCacheImpl) onServiceImportEvent(_, obj controllers.Object

// The service already existed. Treat it as an update.
event = model.EventUpdate

mcsService = mcsService.DeepCopy()
if ic.updateIPs(mcsService, ips) {
needsFullPush = true
}
Expand All @@ -193,6 +193,7 @@ func (ic *serviceImportCacheImpl) onServiceImportEvent(_, obj controllers.Object
// a change to the discoverability policy.
ic.addOrUpdateService(nil, mcsService, event, true)

// TODO: do we really need a full push, we should do it in `addOrUpdateService`.
if needsFullPush {
ic.doFullPush(mcsHost, si.GetNamespace())
}
Expand Down

0 comments on commit 491d7da

Please sign in to comment.