diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
index 206918a37d9..8b103143ff7 100644
--- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
+++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
@@ -596,8 +596,8 @@ func (s) TestCircuitBreaking(t *testing.T) {
 	// will trigger the watch handler on the CDS balancer, which will update
 	// the service's counter with the new max requests.
 	var maxRequests uint32 = 1
-	cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName, MaxRequests: &maxRequests}
-	wantCCS := edsCCS(serviceName, &maxRequests, false)
+	cdsUpdate := xdsclient.ClusterUpdate{ClusterName: clusterName, MaxRequests: &maxRequests}
+	wantCCS := edsCCS(clusterName, &maxRequests, false)
 	ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer ctxCancel()
 	if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
@@ -606,7 +606,7 @@ func (s) TestCircuitBreaking(t *testing.T) {
 
 	// Since the counter's max requests was set to 1, the first request should
 	// succeed and the second should fail.
-	counter := xdsclient.GetServiceRequestsCounter(serviceName)
+	counter := xdsclient.GetClusterRequestsCounter(clusterName, "")
 	if err := counter.StartRequest(maxRequests); err != nil {
 		t.Fatal(err)
 	}
diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go
index ab3613bec31..3cb34200b62 100644
--- a/xds/internal/balancer/clusterimpl/balancer_test.go
+++ b/xds/internal/balancer/clusterimpl/balancer_test.go
@@ -72,7 +72,7 @@ func init() {
 // TestDropByCategory verifies that the balancer correctly drops the picks, and
 // that the drops are reported.
 func TestDropByCategory(t *testing.T) {
-	defer xdsclient.ClearCounterForTesting(testClusterName)
+	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
 	xdsC := fakeclient.NewClient()
 	defer xdsC.Close()
 
@@ -224,7 +224,7 @@ func TestDropByCategory(t *testing.T) {
 // TestDropCircuitBreaking verifies that the balancer correctly drops the picks
 // due to circuit breaking, and that the drops are reported.
 func TestDropCircuitBreaking(t *testing.T) {
-	defer xdsclient.ClearCounterForTesting(testClusterName)
+	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
 	xdsC := fakeclient.NewClient()
 	defer xdsC.Close()
 
@@ -332,7 +332,7 @@ func TestDropCircuitBreaking(t *testing.T) {
 // picker after it's closed. Because picker updates are sent in the run()
 // goroutine.
 func TestPickerUpdateAfterClose(t *testing.T) {
-	defer xdsclient.ClearCounterForTesting(testClusterName)
+	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
 	xdsC := fakeclient.NewClient()
 	defer xdsC.Close()
 
@@ -373,7 +373,7 @@ func TestPickerUpdateAfterClose(t *testing.T) {
 // TestClusterNameInAddressAttributes covers the case that cluster name is
 // attached to the subconn address attributes.
 func TestClusterNameInAddressAttributes(t *testing.T) {
-	defer xdsclient.ClearCounterForTesting(testClusterName)
+	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
 	xdsC := fakeclient.NewClient()
 	defer xdsC.Close()
 
@@ -458,7 +458,7 @@ func TestClusterNameInAddressAttributes(t *testing.T) {
 // TestReResolution verifies that when a SubConn turns transient failure,
 // re-resolution is triggered.
 func TestReResolution(t *testing.T) {
-	defer xdsclient.ClearCounterForTesting(testClusterName)
+	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
 	xdsC := fakeclient.NewClient()
 	defer xdsC.Close()
 
diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go
index 34e7171d2c7..64b175d3c95 100644
--- a/xds/internal/balancer/clusterimpl/clusterimpl.go
+++ b/xds/internal/balancer/clusterimpl/clusterimpl.go
@@ -115,7 +115,8 @@ type clusterImplBalancer struct {
 	dropCategories        []DropConfig // The categories for drops.
 	drops                 []*dropper
 	requestCounterCluster string // The cluster name for the request counter.
-	requestCounter        *xdsclient.ServiceRequestsCounter
+	requestCounterService string // The service name for the request counter.
+	requestCounter        *xdsclient.ClusterRequestsCounter
 	requestCountMax       uint32
 	pickerUpdateCh        *buffer.Unbounded
 }
@@ -323,7 +324,7 @@ func (b *clusterImplBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resol
 
 type dropConfigs struct {
 	drops           []*dropper
-	requestCounter  *xdsclient.ServiceRequestsCounter
+	requestCounter  *xdsclient.ClusterRequestsCounter
 	requestCountMax uint32
 }
 
@@ -344,9 +345,10 @@ func (b *clusterImplBalancer) handleDropAndRequestCount(newConfig *LBConfig) *dr
 
 	// Compare cluster name. And update picker if it's changed, because circuit
 	// breaking's stream counter will be different.
-	if b.requestCounterCluster != newConfig.Cluster {
+	if b.requestCounterCluster != newConfig.Cluster || b.requestCounterService != newConfig.EDSServiceName {
 		b.requestCounterCluster = newConfig.Cluster
-		b.requestCounter = xdsclient.GetServiceRequestsCounter(newConfig.Cluster)
+		b.requestCounterService = newConfig.EDSServiceName
+		b.requestCounter = xdsclient.GetClusterRequestsCounter(newConfig.Cluster, newConfig.EDSServiceName)
 		updatePicker = true
 	}
 	// Compare upper bound of stream count. And update picker if it's changed.
diff --git a/xds/internal/balancer/clusterimpl/picker.go b/xds/internal/balancer/clusterimpl/picker.go
index a03b89179ee..c2693258e12 100644
--- a/xds/internal/balancer/clusterimpl/picker.go
+++ b/xds/internal/balancer/clusterimpl/picker.go
@@ -75,7 +75,7 @@ type dropPicker struct {
 	drops     []*dropper
 	s         balancer.State
 	loadStore loadReporter
-	counter   *xdsclient.ServiceRequestsCounter
+	counter   *xdsclient.ClusterRequestsCounter
 	countMax  uint32
 }
 
diff --git a/xds/internal/xdsclient/requests_counter.go b/xds/internal/xdsclient/requests_counter.go
index b7f94332da2..beed2e9d0ad 100644
--- a/xds/internal/xdsclient/requests_counter.go
+++ b/xds/internal/xdsclient/requests_counter.go
@@ -24,44 +24,53 @@ import (
 	"sync/atomic"
)
 
-type servicesRequestsCounter struct {
+type clusterNameAndServiceName struct {
+	clusterName, edsServiceName string
+}
+
+type clusterRequestsCounter struct {
 	mu       sync.Mutex
-	services map[string]*ServiceRequestsCounter
+	clusters map[clusterNameAndServiceName]*ClusterRequestsCounter
 }
 
-var src = &servicesRequestsCounter{
-	services: make(map[string]*ServiceRequestsCounter),
+var src = &clusterRequestsCounter{
+	clusters: make(map[clusterNameAndServiceName]*ClusterRequestsCounter),
 }
 
-// ServiceRequestsCounter is used to track the total inflight requests for a
+// ClusterRequestsCounter is used to track the total inflight requests for a
 // service with the provided name.
-type ServiceRequestsCounter struct {
-	ServiceName string
-	numRequests uint32
+type ClusterRequestsCounter struct {
+	ClusterName    string
+	EDSServiceName string
+	numRequests    uint32
 }
 
-// GetServiceRequestsCounter returns the ServiceRequestsCounter with the
+// GetClusterRequestsCounter returns the ClusterRequestsCounter with the
 // provided serviceName. If one does not exist, it creates it.
-func GetServiceRequestsCounter(serviceName string) *ServiceRequestsCounter {
+func GetClusterRequestsCounter(clusterName, edsServiceName string) *ClusterRequestsCounter {
 	src.mu.Lock()
 	defer src.mu.Unlock()
-	c, ok := src.services[serviceName]
+	k := clusterNameAndServiceName{
+		clusterName:    clusterName,
+		edsServiceName: edsServiceName,
+	}
+	c, ok := src.clusters[k]
 	if !ok {
-		c = &ServiceRequestsCounter{ServiceName: serviceName}
-		src.services[serviceName] = c
+		c = &ClusterRequestsCounter{ClusterName: clusterName, EDSServiceName: edsServiceName}
+		src.clusters[k] = c
 	}
 	return c
 }
 
-// StartRequest starts a request for a service, incrementing its number of
+// StartRequest starts a request for a cluster, incrementing its number of
 // requests by 1. Returns an error if the max number of requests is exceeded.
-func (c *ServiceRequestsCounter) StartRequest(max uint32) error {
+func (c *ClusterRequestsCounter) StartRequest(max uint32) error {
 	// Note that during race, the limits could be exceeded. This is allowed:
 	// "Since the implementation is eventually consistent, races between threads
 	// may allow limits to be potentially exceeded."
 	// https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/circuit_breaking#arch-overview-circuit-break.
 	if atomic.LoadUint32(&c.numRequests) >= max {
-		return fmt.Errorf("max requests %v exceeded on service %v", max, c.ServiceName)
+		return fmt.Errorf("max requests %v exceeded on cluster %v", max, c.ClusterName)
 	}
 	atomic.AddUint32(&c.numRequests, 1)
 	return nil
@@ -69,16 +78,20 @@ func (c *ServiceRequestsCounter) StartRequest(max uint32) error {
 
 // EndRequest ends a request for a service, decrementing its number of requests
 // by 1.
-func (c *ServiceRequestsCounter) EndRequest() {
+func (c *ClusterRequestsCounter) EndRequest() {
 	atomic.AddUint32(&c.numRequests, ^uint32(0))
 }
 
 // ClearCounterForTesting clears the counter for the service. Should be only
 // used in tests.
-func ClearCounterForTesting(serviceName string) {
+func ClearCounterForTesting(clusterName, edsServiceName string) {
 	src.mu.Lock()
 	defer src.mu.Unlock()
-	c, ok := src.services[serviceName]
+	k := clusterNameAndServiceName{
+		clusterName:    clusterName,
+		edsServiceName: edsServiceName,
+	}
+	c, ok := src.clusters[k]
 	if !ok {
 		return
 	}
@@ -90,5 +103,5 @@ func ClearCounterForTesting(serviceName string) {
 func ClearAllCountersForTesting() {
 	src.mu.Lock()
 	defer src.mu.Unlock()
-	src.services = make(map[string]*ServiceRequestsCounter)
+	src.clusters = make(map[clusterNameAndServiceName]*ClusterRequestsCounter)
 }
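Not part of the patch: a minimal usage sketch of the renamed counter API, written as if it sat next to requests_counter_test.go inside package xdsclient. The cluster name, EDS service name, limit, and helper function are illustrative placeholders; the real callers are the cluster_impl picker and the CDS balancer changed in the hunks above.

package xdsclient

import "fmt"

// exampleClusterCircuitBreaking is a hypothetical helper (not part of this
// change) showing the intended call sequence for circuit breaking.
func exampleClusterCircuitBreaking() {
	// Counters are now keyed by (cluster name, EDS service name), so two
	// clusters pointing at the same EDS service no longer share a budget.
	counter := GetClusterRequestsCounter("example-cluster", "example-eds-service")

	const maxRequests uint32 = 1
	if err := counter.StartRequest(maxRequests); err != nil {
		return // the picker would report this as a drop
	}
	// One request is now in flight; with a limit of 1, the next attempt fails.
	if err := counter.StartRequest(maxRequests); err != nil {
		fmt.Println("dropped:", err)
	}
	// Release the slot when the admitted request finishes.
	counter.EndRequest()
}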
diff --git a/xds/internal/xdsclient/requests_counter_test.go b/xds/internal/xdsclient/requests_counter_test.go
index f444e8f163e..cd95aeaf82e 100644
--- a/xds/internal/xdsclient/requests_counter_test.go
+++ b/xds/internal/xdsclient/requests_counter_test.go
@@ -26,6 +26,8 @@ import (
 	"testing"
 )
 
+const testService = "test-service-name"
+
 type counterTest struct {
 	name        string
 	maxRequests uint32
@@ -51,9 +53,9 @@ var tests = []counterTest{
 	},
 }
 
-func resetServiceRequestsCounter() {
-	src = &servicesRequestsCounter{
-		services: make(map[string]*ServiceRequestsCounter),
+func resetClusterRequestsCounter() {
+	src = &clusterRequestsCounter{
+		clusters: make(map[clusterNameAndServiceName]*ClusterRequestsCounter),
 	}
 }
 
@@ -67,7 +69,7 @@ func testCounter(t *testing.T, test counterTest) {
 	var successes, errors uint32
 	for i := 0; i < int(test.numRequests); i++ {
 		go func() {
-			counter := GetServiceRequestsCounter(test.name)
+			counter := GetClusterRequestsCounter(test.name, testService)
 			defer requestsDone.Done()
 			err := counter.StartRequest(test.maxRequests)
 			if err == nil {
@@ -103,7 +105,7 @@ func testCounter(t *testing.T, test counterTest) {
 }
 
 func (s) TestRequestsCounter(t *testing.T) {
-	defer resetServiceRequestsCounter()
+	defer resetClusterRequestsCounter()
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			testCounter(t, test)
@@ -111,18 +113,18 @@ func (s) TestRequestsCounter(t *testing.T) {
 	}
 }
 
-func (s) TestGetServiceRequestsCounter(t *testing.T) {
-	defer resetServiceRequestsCounter()
+func (s) TestGetClusterRequestsCounter(t *testing.T) {
+	defer resetClusterRequestsCounter()
 	for _, test := range tests {
-		counterA := GetServiceRequestsCounter(test.name)
-		counterB := GetServiceRequestsCounter(test.name)
+		counterA := GetClusterRequestsCounter(test.name, testService)
+		counterB := GetClusterRequestsCounter(test.name, testService)
 		if counterA != counterB {
 			t.Errorf("counter %v %v != counter %v %v", counterA, *counterA, counterB, *counterB)
 		}
 	}
 }
 
-func startRequests(t *testing.T, n uint32, max uint32, counter *ServiceRequestsCounter) {
+func startRequests(t *testing.T, n uint32, max uint32, counter *ClusterRequestsCounter) {
 	for i := uint32(0); i < n; i++ {
 		if err := counter.StartRequest(max); err != nil {
 			t.Fatalf("error starting initial request: %v", err)
@@ -131,11 +133,11 @@ func startRequests(t *testing.T, n uint32, max uint32, counter *ServiceRequestsC
 }
 
 func (s) TestSetMaxRequestsIncreased(t *testing.T) {
-	defer resetServiceRequestsCounter()
-	const serviceName string = "set-max-requests-increased"
+	defer resetClusterRequestsCounter()
+	const clusterName string = "set-max-requests-increased"
 	var initialMax uint32 = 16
 
-	counter := GetServiceRequestsCounter(serviceName)
+	counter := GetClusterRequestsCounter(clusterName, testService)
 	startRequests(t, initialMax, initialMax, counter)
 	if err := counter.StartRequest(initialMax); err == nil {
 		t.Fatal("unexpected success on start request after max met")
@@ -148,11 +150,11 @@ func (s) TestSetMaxRequestsDecreased(t *testing.T) {
-	defer resetServiceRequestsCounter()
-	const serviceName string = "set-max-requests-decreased"
+	defer resetClusterRequestsCounter()
+	const clusterName string = "set-max-requests-decreased"
 	var initialMax uint32 = 16
 
-	counter := GetServiceRequestsCounter(serviceName)
+	counter := GetClusterRequestsCounter(clusterName, testService)
 	startRequests(t, initialMax-1, initialMax, counter)
 
 	newMax := initialMax - 1