Skip to content

Commit

Permalink
grpclb, dns: pass balancer addresses via resolver.State (#3614)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed May 26, 2020
1 parent d071d56 commit eb827fb
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 45 deletions.
22 changes: 15 additions & 7 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@ import (
"sync"
"time"

durationpb "github.com/golang/protobuf/ptypes/duration"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/resolver/dns"
"google.golang.org/grpc/resolver"

durationpb "github.com/golang/protobuf/ptypes/duration"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
)

const (
Expand Down Expand Up @@ -410,11 +412,6 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
lb.handleServiceConfig(gc)

addrs := ccs.ResolverState.Addresses
if len(addrs) == 0 {
// There should be at least one address, either grpclb server or
// fallback. Empty address is not valid.
return balancer.ErrBadResolverState
}

var remoteBalancerAddrs, backendAddrs []resolver.Address
for _, a := range addrs {
Expand All @@ -425,6 +422,17 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
backendAddrs = append(backendAddrs, a)
}
}
if sd := grpclbstate.Get(ccs.ResolverState); sd != nil {
// Override any balancer addresses provided via
// ccs.ResolverState.Addresses.
remoteBalancerAddrs = sd.BalancerAddresses
}

if len(backendAddrs)+len(remoteBalancerAddrs) == 0 {
// There should be at least one address, either grpclb server or
// fallback. Empty address is not valid.
return balancer.ErrBadResolverState
}

if len(remoteBalancerAddrs) == 0 {
if lb.ccRemoteLB != nil {
Expand Down
26 changes: 17 additions & 9 deletions balancer/grpclb/grpclb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@ import (
"testing"
"time"

durationpb "github.com/golang/protobuf/ptypes/duration"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
lbgrpc "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/grpctest"
Expand All @@ -44,6 +42,10 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"

durationpb "github.com/golang/protobuf/ptypes/duration"
lbgrpc "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
testpb "google.golang.org/grpc/test/grpc_testing"
)

Expand Down Expand Up @@ -390,6 +392,8 @@ func newLoadBalancer(numberOfBackends int, statsChan chan *lbpb.ClientStats) (ts
return
}

var grpclbConfig = `{"loadBalancingConfig": [{"grpclb": {}}]}`

func (s) TestGRPCLB(t *testing.T) {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
Expand Down Expand Up @@ -422,13 +426,17 @@ func (s) TestGRPCLB(t *testing.T) {
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)

r.UpdateState(resolver.State{Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}}})
rs := grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)},
&grpclbstate.State{BalancerAddresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.Backend,
ServerName: lbServerName,
}}})
r.UpdateState(rs)

if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
}
Expand Down
51 changes: 51 additions & 0 deletions balancer/grpclb/state/state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package state declares grpclb types to be set by resolvers wishing to pass
// information to grpclb via resolver.State Attributes.
package state

import (
"google.golang.org/grpc/resolver"
)

// keyType is the key to use for storing State in Attributes.
type keyType string

const key = keyType("grpc.grpclb.state")

// State contains gRPCLB-relevant data passed from the name resolver.
type State struct {
// BalancerAddresses contains the remote load balancer address(es). If
// set, overrides any resolver-provided addresses with Type of GRPCLB.
BalancerAddresses []resolver.Address
}

// Set returns a copy of the provided state with attributes containing s. s's
// data should not be mutated after calling Set.
func Set(state resolver.State, s *State) resolver.State {
state.Attributes = state.Attributes.WithValues(key, s)
return state
}

// Get returns the grpclb State in the resolver.State, or nil if not present.
// The returned data should not be mutated.
func Get(state resolver.State) *State {
s, _ := state.Attributes.Value(key).(*State)
return s
}
11 changes: 7 additions & 4 deletions internal/resolver/dns/dns_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sync"
"time"

grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
Expand Down Expand Up @@ -251,7 +252,7 @@ func (d *dnsResolver) lookupSRV() ([]resolver.Address, error) {
return nil, fmt.Errorf("dns: error parsing A record IP address %v", a)
}
addr := ip + ":" + strconv.Itoa(int(s.Port))
newAddrs = append(newAddrs, resolver.Address{Addr: addr, Type: resolver.GRPCLB, ServerName: s.Target})
newAddrs = append(newAddrs, resolver.Address{Addr: addr, ServerName: s.Target})
}
}
return newAddrs, nil
Expand Down Expand Up @@ -326,13 +327,15 @@ func (d *dnsResolver) lookup() (*resolver.State, error) {
if hostErr != nil && (srvErr != nil || len(srv) == 0) {
return nil, hostErr
}
state := &resolver.State{
Addresses: append(addrs, srv...),

state := resolver.State{Addresses: addrs}
if len(srv) > 0 {
state = grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: srv})
}
if !d.disableServiceConfig {
state.ServiceConfig = d.lookupTXT()
}
return state, nil
return &state, nil
}

// formatIP returns ok = false if addr is not a valid textual representation of an IP address.
Expand Down
61 changes: 37 additions & 24 deletions internal/resolver/dns/dns_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"testing"
"time"

grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -725,11 +726,11 @@ func testDNSResolver(t *testing.T) {
t.Fatalf("UpdateState not called after 2s; aborting")
}
if !reflect.DeepEqual(a.addrWant, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrWant)
t.Errorf("Resolved addresses of target: %q = %+v, want %+v", a.target, state.Addresses, a.addrWant)
}
sc := scFromState(state)
if a.scWant != sc {
t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant)
t.Errorf("Resolved service config of target: %q = %+v, want %+v", a.target, sc, a.scWant)
}
r.Close()
}
Expand All @@ -742,45 +743,52 @@ func testDNSResolverWithSRV(t *testing.T) {
}()
defer leakcheck.Check(t)
tests := []struct {
target string
addrWant []resolver.Address
scWant string
target string
addrWant []resolver.Address
grpclbAddrs []resolver.Address
scWant string
}{
{
"foo.bar.com",
[]resolver.Address{{Addr: "1.2.3.4" + colonDefaultPort}, {Addr: "5.6.7.8" + colonDefaultPort}},
nil,
generateSC("foo.bar.com"),
},
{
"foo.bar.com:1234",
[]resolver.Address{{Addr: "1.2.3.4:1234"}, {Addr: "5.6.7.8:1234"}},
nil,
generateSC("foo.bar.com"),
},
{
"srv.ipv4.single.fake",
[]resolver.Address{{Addr: "2.4.6.8" + colonDefaultPort}, {Addr: "1.2.3.4:1234", Type: resolver.GRPCLB, ServerName: "ipv4.single.fake"}},
[]resolver.Address{{Addr: "2.4.6.8" + colonDefaultPort}},
[]resolver.Address{{Addr: "1.2.3.4:1234", ServerName: "ipv4.single.fake"}},
generateSC("srv.ipv4.single.fake"),
},
{
"srv.ipv4.multi.fake",
nil,
[]resolver.Address{
{Addr: "1.2.3.4:1234", Type: resolver.GRPCLB, ServerName: "ipv4.multi.fake"},
{Addr: "5.6.7.8:1234", Type: resolver.GRPCLB, ServerName: "ipv4.multi.fake"},
{Addr: "9.10.11.12:1234", Type: resolver.GRPCLB, ServerName: "ipv4.multi.fake"},
{Addr: "1.2.3.4:1234", ServerName: "ipv4.multi.fake"},
{Addr: "5.6.7.8:1234", ServerName: "ipv4.multi.fake"},
{Addr: "9.10.11.12:1234", ServerName: "ipv4.multi.fake"},
},
generateSC("srv.ipv4.multi.fake"),
},
{
"srv.ipv6.single.fake",
[]resolver.Address{{Addr: "[2607:f8b0:400a:801::1001]:1234", Type: resolver.GRPCLB, ServerName: "ipv6.single.fake"}},
nil,
[]resolver.Address{{Addr: "[2607:f8b0:400a:801::1001]:1234", ServerName: "ipv6.single.fake"}},
generateSC("srv.ipv6.single.fake"),
},
{
"srv.ipv6.multi.fake",
nil,
[]resolver.Address{
{Addr: "[2607:f8b0:400a:801::1001]:1234", Type: resolver.GRPCLB, ServerName: "ipv6.multi.fake"},
{Addr: "[2607:f8b0:400a:801::1002]:1234", Type: resolver.GRPCLB, ServerName: "ipv6.multi.fake"},
{Addr: "[2607:f8b0:400a:801::1003]:1234", Type: resolver.GRPCLB, ServerName: "ipv6.multi.fake"},
{Addr: "[2607:f8b0:400a:801::1001]:1234", ServerName: "ipv6.multi.fake"},
{Addr: "[2607:f8b0:400a:801::1002]:1234", ServerName: "ipv6.multi.fake"},
{Addr: "[2607:f8b0:400a:801::1003]:1234", ServerName: "ipv6.multi.fake"},
},
generateSC("srv.ipv6.multi.fake"),
},
Expand All @@ -807,11 +815,16 @@ func testDNSResolverWithSRV(t *testing.T) {
t.Fatalf("UpdateState not called after 2s; aborting")
}
if !reflect.DeepEqual(a.addrWant, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrWant)
t.Errorf("Resolved addresses of target: %q = %+v, want %+v", a.target, state.Addresses, a.addrWant)
}
gs := grpclbstate.Get(state)
if (gs == nil && len(a.grpclbAddrs) > 0) ||
(gs != nil && !reflect.DeepEqual(a.grpclbAddrs, gs.BalancerAddresses)) {
t.Errorf("Resolved state of target: %q = %+v (State=%+v), want state.Attributes.State=%+v", a.target, state, gs, a.grpclbAddrs)
}
sc := scFromState(state)
if a.scWant != sc {
t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant)
t.Errorf("Resolved service config of target: %q = %+v, want %+v", a.target, sc, a.scWant)
}
}
}
Expand Down Expand Up @@ -879,11 +892,11 @@ func testDNSResolveNow(t *testing.T) {
t.Fatalf("UpdateState not called after 2s; aborting. state=%v", state)
}
if !reflect.DeepEqual(a.addrWant, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrWant)
t.Errorf("Resolved addresses of target: %q = %+v, want %+v", a.target, state.Addresses, a.addrWant)
}
sc := scFromState(state)
if a.scWant != sc {
t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant)
t.Errorf("Resolved service config of target: %q = %+v, want %+v", a.target, sc, a.scWant)
}

revertTbl := mutateTbl(a.target)
Expand All @@ -900,10 +913,10 @@ func testDNSResolveNow(t *testing.T) {
}
sc = scFromState(state)
if !reflect.DeepEqual(a.addrNext, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrNext)
t.Errorf("Resolved addresses of target: %q = %+v, want %+v", a.target, state.Addresses, a.addrNext)
}
if a.scNext != sc {
t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scNext)
t.Errorf("Resolved service config of target: %q = %+v, want %+v", a.target, sc, a.scNext)
}
revertTbl()
}
Expand Down Expand Up @@ -946,7 +959,7 @@ func testIPResolver(t *testing.T) {
time.Sleep(time.Millisecond)
}
if !reflect.DeepEqual(v.want, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", v.target, state.Addresses, v.want)
t.Errorf("Resolved addresses of target: %q = %+v, want %+v", v.target, state.Addresses, v.want)
}
r.ResolveNow(resolver.ResolveNowOptions{})
for i := 0; i < 50; i++ {
Expand Down Expand Up @@ -1039,7 +1052,7 @@ func TestDisableServiceConfig(t *testing.T) {
}
sc := scFromState(state)
if a.scWant != sc {
t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant)
t.Errorf("Resolved service config of target: %q = %+v, want %+v", a.target, sc, a.scWant)
}
}
}
Expand Down Expand Up @@ -1098,7 +1111,7 @@ func TestDNSResolverRetry(t *testing.T) {
}
want := []resolver.Address{{Addr: "1.2.3.4" + colonDefaultPort}}
if !reflect.DeepEqual(want, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, state.Addresses, want)
t.Errorf("Resolved addresses of target: %q = %+v, want %+v", target, state.Addresses, want)
}
// mutate the host lookup table so the target has 0 address returned.
revertTbl := mutateTbl(target)
Expand All @@ -1125,7 +1138,7 @@ func TestDNSResolverRetry(t *testing.T) {
time.Sleep(time.Millisecond)
}
if !reflect.DeepEqual(want, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, state.Addresses, want)
t.Errorf("Resolved addresses of target: %q = %+v, want %+v", target, state.Addresses, want)
}
}

Expand Down Expand Up @@ -1330,7 +1343,7 @@ func TestRateLimitedResolve(t *testing.T) {
time.Sleep(time.Millisecond)
}
if !reflect.DeepEqual(state.Addresses, wantAddrs) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, state.Addresses, wantAddrs)
t.Errorf("Resolved addresses of target: %q = %+v, want %+v", target, state.Addresses, wantAddrs)
}
}

Expand Down
5 changes: 4 additions & 1 deletion resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ const (
Backend AddressType = iota
// GRPCLB indicates the address is for a grpclb load balancer.
//
// Deprecated: use Attributes in Address instead.
// Deprecated: to select the GRPCLB load balancing policy, use a service
// config with a corresponding loadBalancingConfig. To supply balancer
// addresses to the GRPCLB load balancing policy, set State.Attributes
// using balancer/grpclb/state.Set.
GRPCLB
)

Expand Down

0 comments on commit eb827fb

Please sign in to comment.