Skip to content

Commit

Permalink
xds/circuit_breaking: counters should be keyed by {cluster, EDS service name} pair (#4560)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jun 22, 2021
1 parent 50328cf commit 14c7ed6
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 49 deletions.
6 changes: 3 additions & 3 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,8 +596,8 @@ func (s) TestCircuitBreaking(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will update
// the service's counter with the new max requests.
var maxRequests uint32 = 1
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName, MaxRequests: &maxRequests}
wantCCS := edsCCS(serviceName, &maxRequests, false)
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: clusterName, MaxRequests: &maxRequests}
wantCCS := edsCCS(clusterName, &maxRequests, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
Expand All @@ -606,7 +606,7 @@ func (s) TestCircuitBreaking(t *testing.T) {

// Since the counter's max requests was set to 1, the first request should
// succeed and the second should fail.
counter := xdsclient.GetServiceRequestsCounter(serviceName)
counter := xdsclient.GetClusterRequestsCounter(clusterName, "")
if err := counter.StartRequest(maxRequests); err != nil {
t.Fatal(err)
}
Expand Down
10 changes: 5 additions & 5 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func init() {
// TestDropByCategory verifies that the balancer correctly drops the picks, and
// that the drops are reported.
func TestDropByCategory(t *testing.T) {
defer xdsclient.ClearCounterForTesting(testClusterName)
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()

Expand Down Expand Up @@ -224,7 +224,7 @@ func TestDropByCategory(t *testing.T) {
// TestDropCircuitBreaking verifies that the balancer correctly drops the picks
// due to circuit breaking, and that the drops are reported.
func TestDropCircuitBreaking(t *testing.T) {
defer xdsclient.ClearCounterForTesting(testClusterName)
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()

Expand Down Expand Up @@ -332,7 +332,7 @@ func TestDropCircuitBreaking(t *testing.T) {
// picker after it's closed. Because picker updates are sent in the run()
// goroutine.
func TestPickerUpdateAfterClose(t *testing.T) {
defer xdsclient.ClearCounterForTesting(testClusterName)
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()

Expand Down Expand Up @@ -373,7 +373,7 @@ func TestPickerUpdateAfterClose(t *testing.T) {
// TestClusterNameInAddressAttributes covers the case that cluster name is
// attached to the subconn address attributes.
func TestClusterNameInAddressAttributes(t *testing.T) {
defer xdsclient.ClearCounterForTesting(testClusterName)
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()

Expand Down Expand Up @@ -458,7 +458,7 @@ func TestClusterNameInAddressAttributes(t *testing.T) {
// TestReResolution verifies that when a SubConn turns transient failure,
// re-resolution is triggered.
func TestReResolution(t *testing.T) {
defer xdsclient.ClearCounterForTesting(testClusterName)
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()

Expand Down
10 changes: 6 additions & 4 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ type clusterImplBalancer struct {
dropCategories []DropConfig // The categories for drops.
drops []*dropper
requestCounterCluster string // The cluster name for the request counter.
requestCounter *xdsclient.ServiceRequestsCounter
requestCounterService string // The service name for the request counter.
requestCounter *xdsclient.ClusterRequestsCounter
requestCountMax uint32
pickerUpdateCh *buffer.Unbounded
}
Expand Down Expand Up @@ -323,7 +324,7 @@ func (b *clusterImplBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resol

type dropConfigs struct {
drops []*dropper
requestCounter *xdsclient.ServiceRequestsCounter
requestCounter *xdsclient.ClusterRequestsCounter
requestCountMax uint32
}

Expand All @@ -344,9 +345,10 @@ func (b *clusterImplBalancer) handleDropAndRequestCount(newConfig *LBConfig) *dr

// Compare cluster name. And update picker if it's changed, because circuit
// breaking's stream counter will be different.
if b.requestCounterCluster != newConfig.Cluster {
if b.requestCounterCluster != newConfig.Cluster || b.requestCounterService != newConfig.EDSServiceName {
b.requestCounterCluster = newConfig.Cluster
b.requestCounter = xdsclient.GetServiceRequestsCounter(newConfig.Cluster)
b.requestCounterService = newConfig.EDSServiceName
b.requestCounter = xdsclient.GetClusterRequestsCounter(newConfig.Cluster, newConfig.EDSServiceName)
updatePicker = true
}
// Compare upper bound of stream count. And update picker if it's changed.
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/clusterimpl/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type dropPicker struct {
drops []*dropper
s balancer.State
loadStore loadReporter
counter *xdsclient.ServiceRequestsCounter
counter *xdsclient.ClusterRequestsCounter
countMax uint32
}

Expand Down
53 changes: 33 additions & 20 deletions xds/internal/xdsclient/requests_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,61 +24,74 @@ import (
"sync/atomic"
)

type servicesRequestsCounter struct {
type clusterNameAndServiceName struct {
clusterName, edsServcieName string
}

type clusterRequestsCounter struct {
mu sync.Mutex
services map[string]*ServiceRequestsCounter
clusters map[clusterNameAndServiceName]*ClusterRequestsCounter
}

var src = &servicesRequestsCounter{
services: make(map[string]*ServiceRequestsCounter),
var src = &clusterRequestsCounter{
clusters: make(map[clusterNameAndServiceName]*ClusterRequestsCounter),
}

// ServiceRequestsCounter is used to track the total inflight requests for a
// ClusterRequestsCounter is used to track the total inflight requests for a
// cluster, keyed by the {cluster name, EDS service name} pair.
type ServiceRequestsCounter struct {
ServiceName string
numRequests uint32
type ClusterRequestsCounter struct {
ClusterName string
EDSServiceName string
numRequests uint32
}

// GetServiceRequestsCounter returns the ServiceRequestsCounter with the
// GetClusterRequestsCounter returns the ClusterRequestsCounter with the
// provided clusterName and edsServiceName. If one does not exist, it creates it.
func GetServiceRequestsCounter(serviceName string) *ServiceRequestsCounter {
func GetClusterRequestsCounter(clusterName, edsServiceName string) *ClusterRequestsCounter {
src.mu.Lock()
defer src.mu.Unlock()
c, ok := src.services[serviceName]
k := clusterNameAndServiceName{
clusterName: clusterName,
edsServcieName: edsServiceName,
}
c, ok := src.clusters[k]
if !ok {
c = &ServiceRequestsCounter{ServiceName: serviceName}
src.services[serviceName] = c
c = &ClusterRequestsCounter{ClusterName: clusterName}
src.clusters[k] = c
}
return c
}

// StartRequest starts a request for a service, incrementing its number of
// StartRequest starts a request for a cluster, incrementing its number of
// requests by 1. Returns an error if the max number of requests is exceeded.
func (c *ServiceRequestsCounter) StartRequest(max uint32) error {
func (c *ClusterRequestsCounter) StartRequest(max uint32) error {
// Note that during race, the limits could be exceeded. This is allowed:
// "Since the implementation is eventually consistent, races between threads
// may allow limits to be potentially exceeded."
// https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/circuit_breaking#arch-overview-circuit-break.
if atomic.LoadUint32(&c.numRequests) >= max {
return fmt.Errorf("max requests %v exceeded on service %v", max, c.ServiceName)
return fmt.Errorf("max requests %v exceeded on service %v", max, c.ClusterName)
}
atomic.AddUint32(&c.numRequests, 1)
return nil
}

// EndRequest ends a request for a cluster, decrementing its number of requests
// by 1.
func (c *ServiceRequestsCounter) EndRequest() {
func (c *ClusterRequestsCounter) EndRequest() {
atomic.AddUint32(&c.numRequests, ^uint32(0))
}

// ClearCounterForTesting clears the counter for the given cluster and EDS
// service name. Should be only
// used in tests.
func ClearCounterForTesting(serviceName string) {
func ClearCounterForTesting(clusterName, edsServiceName string) {
src.mu.Lock()
defer src.mu.Unlock()
c, ok := src.services[serviceName]
k := clusterNameAndServiceName{
clusterName: clusterName,
edsServcieName: edsServiceName,
}
c, ok := src.clusters[k]
if !ok {
return
}
Expand All @@ -90,5 +103,5 @@ func ClearCounterForTesting(serviceName string) {
func ClearAllCountersForTesting() {
src.mu.Lock()
defer src.mu.Unlock()
src.services = make(map[string]*ServiceRequestsCounter)
src.clusters = make(map[clusterNameAndServiceName]*ClusterRequestsCounter)
}
34 changes: 18 additions & 16 deletions xds/internal/xdsclient/requests_counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"testing"
)

const testService = "test-service-name"

type counterTest struct {
name string
maxRequests uint32
Expand All @@ -51,9 +53,9 @@ var tests = []counterTest{
},
}

func resetServiceRequestsCounter() {
src = &servicesRequestsCounter{
services: make(map[string]*ServiceRequestsCounter),
func resetClusterRequestsCounter() {
src = &clusterRequestsCounter{
clusters: make(map[clusterNameAndServiceName]*ClusterRequestsCounter),
}
}

Expand All @@ -67,7 +69,7 @@ func testCounter(t *testing.T, test counterTest) {
var successes, errors uint32
for i := 0; i < int(test.numRequests); i++ {
go func() {
counter := GetServiceRequestsCounter(test.name)
counter := GetClusterRequestsCounter(test.name, testService)
defer requestsDone.Done()
err := counter.StartRequest(test.maxRequests)
if err == nil {
Expand Down Expand Up @@ -103,26 +105,26 @@ func testCounter(t *testing.T, test counterTest) {
}

func (s) TestRequestsCounter(t *testing.T) {
defer resetServiceRequestsCounter()
defer resetClusterRequestsCounter()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testCounter(t, test)
})
}
}

func (s) TestGetServiceRequestsCounter(t *testing.T) {
defer resetServiceRequestsCounter()
func (s) TestGetClusterRequestsCounter(t *testing.T) {
defer resetClusterRequestsCounter()
for _, test := range tests {
counterA := GetServiceRequestsCounter(test.name)
counterB := GetServiceRequestsCounter(test.name)
counterA := GetClusterRequestsCounter(test.name, testService)
counterB := GetClusterRequestsCounter(test.name, testService)
if counterA != counterB {
t.Errorf("counter %v %v != counter %v %v", counterA, *counterA, counterB, *counterB)
}
}
}

func startRequests(t *testing.T, n uint32, max uint32, counter *ServiceRequestsCounter) {
func startRequests(t *testing.T, n uint32, max uint32, counter *ClusterRequestsCounter) {
for i := uint32(0); i < n; i++ {
if err := counter.StartRequest(max); err != nil {
t.Fatalf("error starting initial request: %v", err)
Expand All @@ -131,11 +133,11 @@ func startRequests(t *testing.T, n uint32, max uint32, counter *ServiceRequestsC
}

func (s) TestSetMaxRequestsIncreased(t *testing.T) {
defer resetServiceRequestsCounter()
const serviceName string = "set-max-requests-increased"
defer resetClusterRequestsCounter()
const clusterName string = "set-max-requests-increased"
var initialMax uint32 = 16

counter := GetServiceRequestsCounter(serviceName)
counter := GetClusterRequestsCounter(clusterName, testService)
startRequests(t, initialMax, initialMax, counter)
if err := counter.StartRequest(initialMax); err == nil {
t.Fatal("unexpected success on start request after max met")
Expand All @@ -148,11 +150,11 @@ func (s) TestSetMaxRequestsIncreased(t *testing.T) {
}

func (s) TestSetMaxRequestsDecreased(t *testing.T) {
defer resetServiceRequestsCounter()
const serviceName string = "set-max-requests-decreased"
defer resetClusterRequestsCounter()
const clusterName string = "set-max-requests-decreased"
var initialMax uint32 = 16

counter := GetServiceRequestsCounter(serviceName)
counter := GetClusterRequestsCounter(clusterName, testService)
startRequests(t, initialMax-1, initialMax, counter)

newMax := initialMax - 1
Expand Down

0 comments on commit 14c7ed6

Please sign in to comment.