Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: use locality from the connected address for load reporting #7378

Merged
merged 9 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address review comments #1
  • Loading branch information
townba committed Jul 2, 2024
commit 32d869b88f6c794a173da7207666907e11426474
13 changes: 10 additions & 3 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func unregisterForTesting(name string) {
delete(m, name)
}

// getConnectedAddress returns the connected address for a SubConnState.
// getConnectedAddress returns the connected address for a SubConnState and
easwars marked this conversation as resolved.
Show resolved Hide resolved
// whether or not it is valid.
func getConnectedAddress(scs SubConnState) (resolver.Address, bool) {
return scs.connectedAddress, scs.ConnectivityState == connectivity.Ready
}
Expand Down Expand Up @@ -422,11 +423,17 @@ type SubConnState struct {
// ConnectionError is set if the ConnectivityState is TransientFailure,
// describing the reason the SubConn failed. Otherwise, it is nil.
ConnectionError error
// connectedAddr contains the connected address when ConnectivityState is Ready. Otherwise, it is
// indeterminate.
// connectedAddr contains the connected address when ConnectivityState is
// Ready. Otherwise, it is indeterminate.
connectedAddress resolver.Address
}

func (lhs SubConnState) Equal(rhs SubConnState) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we delete this entirely when connectedAddress is removed? If so, ignore. If not, then should this accept pointers instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed entirely.

This was only added to get this single check in xds/internal/balancer/outlierdetection/balancer_test.go to pass, but that doesn't work if it uses pointers. I've removed this in favor of adding balancer.SubConnState{} to the cmp.AllowUnexported.

return lhs.ConnectivityState == rhs.ConnectivityState &&
lhs.ConnectionError == rhs.ConnectionError &&
lhs.connectedAddress.Addr == rhs.connectedAddress.Addr
easwars marked this conversation as resolved.
Show resolved Hide resolved
}

// ClientConnState describes the state of a ClientConn relevant to the
// balancer.
type ClientConnState struct {
Expand Down
6 changes: 3 additions & 3 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
grpcinternal "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
Expand Down Expand Up @@ -263,8 +263,8 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve
// TODO: delete this comment when UpdateSubConnState is removed.
scs := balancer.SubConnState{ConnectivityState: s, ConnectionError: err}
if s == connectivity.Ready {
if SetConnectedAddress, ok := grpcinternal.SetConnectedAddress.(func(state *balancer.SubConnState, addr resolver.Address)); ok {
SetConnectedAddress(&scs, curAddr)
if sca, ok := internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)); ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please define this as a global instead.

var setConnectedAddress = internal.SetConnectedAddress.(func(*.........))

That way we don't have a conditional here and a runtime unknown. Instead it's an init time assertion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

sca(&scs, curAddr)
}
}
acbw.stateListener(scs)
Expand Down
44 changes: 16 additions & 28 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@

const (
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
withBalancerAttributes = true
withoutBalancerAttributes = false
minConnectTimeout = 20 * time.Second
)

var (
Expand Down Expand Up @@ -816,25 +814,16 @@

// addressWithoutBalancerAttributes returns a copy of the input address with
// the BalancerAttributes field cleared.
func addressWithoutBalancerAttributes(a resolver.Address) resolver.Address {

Check failure on line 817 in clientconn.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

func addressWithoutBalancerAttributes is unused (U1000)
a.BalancerAttributes = nil
return a

Check warning on line 819 in clientconn.go

View check run for this annotation

Codecov / codecov/patch

clientconn.go#L817-L819

Added lines #L817 - L819 were not covered by tests
}

// Makes a copy of the input addresses slice and optionally clears out the
// balancer attributes field. Addresses are passed during subconn creation and
// address update operations. In both cases, we may clear the balancer
// attributes by calling this function, which would therefore allow us to use
// the Equal method provided by the resolver.Address type for comparison.
func copyAddresses(in []resolver.Address, includeBalancerAttributes bool) []resolver.Address {
// Makes a copy of the input addresses slice. Addresses are passed during
// subconn creation and address update operations.
func copyAddresses(in []resolver.Address) []resolver.Address {
out := make([]resolver.Address, len(in))
for i := range in {
if includeBalancerAttributes {
out[i] = in[i]
} else {
out[i] = addressWithoutBalancerAttributes(in[i])
}
}
copy(out, in)
return out
}

Expand All @@ -849,7 +838,7 @@
ac := &addrConn{
state: connectivity.Idle,
cc: cc,
addrs: copyAddresses(addrs, withBalancerAttributes),
addrs: copyAddresses(addrs),
scopts: opts,
dopts: cc.dopts,
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
Expand Down Expand Up @@ -936,18 +925,18 @@
return nil
}

func equalAddressIgnoreBalancerAttributes(a, b resolver.Address) bool {
func equalAddress(a, b resolver.Address) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably makes sense to add a function level comment here saying why we are not using the resolver.Address.Equal and instead defining our own. Something as simple as:

// equalAddress returns true is a and b are considered equal.
// This is different from the Equal method on the resolver.Address type 
// which considers all fields to determine equality. Here, we only consider
// fields that are meaningful to the subConn.
func equalAddress(a, b resolver.Address) bool { ... }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming the function might be an even nicer improvement. equalIgnoringBalAttributes?

Copy link
Contributor Author

@townba townba Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Used equalAddressIgnoringBalAttributes so I could also have equalAddressesIgnoringBalAttributes below.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should probably be pointers to avoid the copy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return a.Addr == b.Addr && a.ServerName == b.ServerName &&
a.Attributes.Equal(b.Attributes) &&
a.Metadata == b.Metadata
}

func equalAddressesIgnoreBalancerAttributes(a, b []resolver.Address) bool {
func equalAddresses(a, b []resolver.Address) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if !equalAddressIgnoreBalancerAttributes(v, b[i]) {
if !equalAddress(v, b[i]) {
return false
}
}
Expand All @@ -957,15 +946,15 @@
// updateAddrs updates ac.addrs with the new addresses list and handles active
// connections or connection attempts.
func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
addrs = copyAddresses(addrs, withBalancerAttributes)
addrs = copyAddresses(addrs)
limit := len(addrs)
if limit > 5 {
limit = 5
}
channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), copyAddresses(addrs[:limit], withoutBalancerAttributes))
channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), addrs[:limit])

ac.mu.Lock()
if equalAddressesIgnoreBalancerAttributes(ac.addrs, addrs) {
if equalAddresses(ac.addrs, addrs) {
ac.mu.Unlock()
return
}
Expand All @@ -984,7 +973,7 @@
// Try to find the connected address.
for _, a := range addrs {
a.ServerName = ac.cc.getServerName(a)
if equalAddressIgnoreBalancerAttributes(a, ac.curAddr) {
if equalAddress(a, ac.curAddr) {
// We are connected to a valid address, so do nothing but
// update the addresses.
ac.mu.Unlock()
Expand Down Expand Up @@ -1365,7 +1354,6 @@
// new transport.
func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
addr.ServerName = ac.cc.getServerName(addr)
addrWithoutBalancerAttributes := addressWithoutBalancerAttributes(addr)
hctx, hcancel := context.WithCancel(ctx)

onClose := func(r transport.GoAwayReason) {
Expand Down Expand Up @@ -1400,14 +1388,14 @@
defer cancel()
copts.ChannelzParent = ac.channelz

newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addrWithoutBalancerAttributes, copts, onClose)
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onClose)
if err != nil {
if logger.V(2) {
logger.Infof("Creating new client transport to %q: %v", addrWithoutBalancerAttributes, err)
logger.Infof("Creating new client transport to %q: %v", addr, err)
}
// newTr is either nil, or closed.
hcancel()
channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addrWithoutBalancerAttributes, err)
channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
return err
}

Expand Down
22 changes: 12 additions & 10 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
grpcinternal "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
Expand Down Expand Up @@ -361,24 +361,26 @@
func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
clusterName := b.getClusterName()
newAddrs := make([]resolver.Address, len(addrs))
// TODO: Don't set the locality here. Let the StateListener handle it all.
var lID xdsinternal.LocalityID
for i, addr := range addrs {
newAddrs[i] = xds.SetXDSHandshakeClusterName(addr, clusterName)
lID = xdsinternal.GetLocalityID(newAddrs[i])
}
var sc balancer.SubConn
ret := &scWrapper{}
scw := &scWrapper{}
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) {
b.updateSubConnState(sc, state, oldListener)
// Read connected address and call updateLocalityID() based on the connected address's locality.
// https://github.com/grpc/grpc-go/issues/7339
if GetConnectedAddress, ok := grpcinternal.GetConnectedAddress.(func(state balancer.SubConnState) (resolver.Address, bool)); ok {
if addr, ok := GetConnectedAddress(state); ok {
// TODO: Why is lID empty when running the test? The locality info is being lost somehow.
if gca, ok := internal.GetConnectedAddress.(func(balancer.SubConnState) (resolver.Address, bool)); ok {
if addr, ok := gca(state); ok {
lID := xdsinternal.GetLocalityID(addr)
if !lID.Equal(xdsinternal.LocalityID{}) {
ret.updateLocalityID(lID)
if !lID.Empty() {
scw.updateLocalityID(lID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go style: let's try to avoid all this nesting:

StateListener = func... {
	b.updateSubConnState(...)
	if state != Ready {
		return
	}
	locality := xdsinternal.GetLocalityID(getConnectedAddress(state))
	if locality.Empty() {
		if logger.V(2) { log }
		return
	}
	scw.updateLocalityID(locality)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

} else if b.logger.V(2) {
b.logger.Infof("Locality ID for %v unexpectedly empty", addr)

Check warning on line 383 in xds/internal/balancer/clusterimpl/clusterimpl.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/clusterimpl/clusterimpl.go#L383

Added line #L383 was not covered by tests
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand All @@ -387,9 +389,9 @@
if err != nil {
return nil, err
}
ret.SubConn = sc
ret.updateLocalityID(lID)
return ret, nil
scw.SubConn = sc
scw.updateLocalityID(lID)
return scw, nil
}

func (b *clusterImplBalancer) RemoveSubConn(sc balancer.SubConn) {
Expand Down
5 changes: 5 additions & 0 deletions xds/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func (l LocalityID) Equal(o any) bool {
return l.Region == ol.Region && l.Zone == ol.Zone && l.SubZone == ol.SubZone
}

// Empty returns whether or not the locality ID is empty.
func (l LocalityID) Empty() bool {
return len(l.Region) == 0 && len(l.Zone) == 0 && len(l.SubZone) == 0
easwars marked this conversation as resolved.
Show resolved Hide resolved
}

// LocalityIDFromString converts a json representation of locality, into a
// LocalityID struct.
func LocalityIDFromString(s string) (ret LocalityID, _ error) {
Expand Down
Loading