Skip to content

Commit

Permalink
rls: Update rls protos and code/test to reflect those changes. (#3832)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Aug 21, 2020
1 parent d16bb4c commit 6c0171f
Show file tree
Hide file tree
Showing 6 changed files with 430 additions and 549 deletions.
22 changes: 1 addition & 21 deletions balancer/rls/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ type lbConfig struct {
maxAge time.Duration
staleAge time.Duration
cacheSizeBytes int64
rpStrategy rlspb.RouteLookupConfig_RequestProcessingStrategy
defaultTarget string
cpName string
cpTargetField string
Expand All @@ -75,7 +74,6 @@ func (lbCfg *lbConfig) Equal(other *lbConfig) bool {
lbCfg.maxAge == other.maxAge &&
lbCfg.staleAge == other.staleAge &&
lbCfg.cacheSizeBytes == other.cacheSizeBytes &&
lbCfg.rpStrategy == other.rpStrategy &&
lbCfg.defaultTarget == other.defaultTarget &&
lbCfg.cpName == other.cpName &&
lbCfg.cpTargetField == other.cpTargetField &&
Expand Down Expand Up @@ -167,11 +165,6 @@ func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error {
// - must be greater than zero
// - TODO(easwars): Define a minimum value for this field, to be used when
// left unspecified
// ** request_processing_strategy field:
// - must have a value other than STRATEGY_UNSPECIFIED
// - if set to SYNC_LOOKUP_DEFAULT_TARGET_ON_ERROR or
// ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS, the default_target field must be
// set to a non-empty value
// * childPolicy field:
// - must find a valid child policy with a valid config (the child policy must
// be able to parse the provided config successfully when we pass it a dummy
Expand Down Expand Up @@ -242,22 +235,10 @@ func (*rlsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig,
logger.Infof("rls: max_age in service config is %v, using %v", maxAge, maxMaxAge)
maxAge = maxMaxAge
}

cacheSizeBytes := rlsProto.GetCacheSizeBytes()
if cacheSizeBytes <= 0 {
return nil, fmt.Errorf("rls: cache_size_bytes must be greater than 0 in service config {%+v}", string(c))
}

rpStrategy := rlsProto.GetRequestProcessingStrategy()
if rpStrategy == rlspb.RouteLookupConfig_STRATEGY_UNSPECIFIED {
return nil, fmt.Errorf("rls: request_processing_strategy cannot be left unspecified in service config {%+v}", string(c))
}
defaultTarget := rlsProto.GetDefaultTarget()
if (rpStrategy == rlspb.RouteLookupConfig_SYNC_LOOKUP_DEFAULT_TARGET_ON_ERROR ||
rpStrategy == rlspb.RouteLookupConfig_ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS) && defaultTarget == "" {
return nil, fmt.Errorf("rls: request_processing_strategy is %s, but default_target is not set", rpStrategy.String())
}

if childPolicy == nil {
return nil, fmt.Errorf("rls: childPolicy is invalid in service config {%+v}", string(c))
}
Expand All @@ -283,8 +264,7 @@ func (*rlsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig,
maxAge: maxAge,
staleAge: staleAge,
cacheSizeBytes: cacheSizeBytes,
rpStrategy: rpStrategy,
defaultTarget: defaultTarget,
defaultTarget: rlsProto.GetDefaultTarget(),
// TODO(easwars): Once we refactor validateChildPolicyConfig and make
// it a method on the lbConfig object, we could directly store the
// balancer.Builder and/or balancer.ConfigParser here instead of the
Expand Down
47 changes: 1 addition & 46 deletions balancer/rls/internal/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ import (
"github.com/google/go-cmp/cmp"

"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/balancer/grpclb" // grpclb for config parsing.
rlspb "google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1"
_ "google.golang.org/grpc/balancer/grpclb" // grpclb for config parsing.
_ "google.golang.org/grpc/internal/resolver/passthrough" // passthrough resolver.
)

Expand Down Expand Up @@ -58,7 +57,6 @@ func testEqual(a, b *lbConfig) bool {
a.maxAge == b.maxAge &&
a.staleAge == b.staleAge &&
a.cacheSizeBytes == b.cacheSizeBytes &&
a.rpStrategy == b.rpStrategy &&
a.defaultTarget == b.defaultTarget &&
a.cpName == b.cpName &&
a.cpTargetField == b.cpTargetField &&
Expand Down Expand Up @@ -91,7 +89,6 @@ func TestParseConfig(t *testing.T) {
"maxAge" : "500s",
"staleAge": "600s",
"cacheSizeBytes": 1000,
"request_processing_strategy": "ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS",
"defaultTarget": "passthrough:https:///default"
},
"childPolicy": [
Expand All @@ -107,7 +104,6 @@ func TestParseConfig(t *testing.T) {
maxAge: 5 * time.Minute, // This is max maxAge.
staleAge: time.Duration(0), // StaleAge is ignore because it was higher than maxAge.
cacheSizeBytes: 1000,
rpStrategy: rlspb.RouteLookupConfig_ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS,
defaultTarget: "passthrough:https:///default",
cpName: "grpclb",
cpTargetField: "service_name",
Expand All @@ -127,7 +123,6 @@ func TestParseConfig(t *testing.T) {
"maxAge": "60s",
"staleAge" : "50s",
"cacheSizeBytes": 1000,
"request_processing_strategy": "ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS",
"defaultTarget": "passthrough:https:///default"
},
"childPolicy": [{"grpclb": {"childPolicy": [{"pickfirst": {}}]}}],
Expand All @@ -139,7 +134,6 @@ func TestParseConfig(t *testing.T) {
maxAge: 60 * time.Second,
staleAge: 50 * time.Second,
cacheSizeBytes: 1000,
rpStrategy: rlspb.RouteLookupConfig_ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS,
defaultTarget: "passthrough:https:///default",
cpName: "grpclb",
cpTargetField: "service_name",
Expand Down Expand Up @@ -288,41 +282,6 @@ func TestParseConfigErrors(t *testing.T) {
}`),
wantErr: "rls: cache_size_bytes must be greater than 0 in service config",
},
{
desc: "invalid request processing strategy",
input: []byte(`{
"routeLookupConfig": {
"grpcKeybuilders": [{
"names": [{"service": "service", "method": "method"}],
"headers": [{"key": "k1", "names": ["v1"]}]
}],
"lookupService": "passthrough:https:///target",
"lookupServiceTimeout" : "10s",
"maxAge": "30s",
"staleAge" : "25s",
"cacheSizeBytes": 1000
}
}`),
wantErr: "rls: request_processing_strategy cannot be left unspecified in service config",
},
{
desc: "request processing strategy without default target",
input: []byte(`{
"routeLookupConfig": {
"grpcKeybuilders": [{
"names": [{"service": "service", "method": "method"}],
"headers": [{"key": "k1", "names": ["v1"]}]
}],
"lookupService": "passthrough:https:///target",
"lookupServiceTimeout" : "10s",
"maxAge": "30s",
"staleAge" : "25s",
"cacheSizeBytes": 1000,
"request_processing_strategy": "ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS"
}
}`),
wantErr: "default_target is not set",
},
{
desc: "no child policy",
input: []byte(`{
Expand All @@ -336,7 +295,6 @@ func TestParseConfigErrors(t *testing.T) {
"maxAge": "30s",
"staleAge" : "25s",
"cacheSizeBytes": 1000,
"request_processing_strategy": "ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS",
"defaultTarget": "passthrough:https:///default"
}
}`),
Expand All @@ -355,7 +313,6 @@ func TestParseConfigErrors(t *testing.T) {
"maxAge": "30s",
"staleAge" : "25s",
"cacheSizeBytes": 1000,
"request_processing_strategy": "ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS",
"defaultTarget": "passthrough:https:///default"
},
"childPolicy": [
Expand All @@ -378,7 +335,6 @@ func TestParseConfigErrors(t *testing.T) {
"maxAge": "30s",
"staleAge" : "25s",
"cacheSizeBytes": 1000,
"request_processing_strategy": "ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS",
"defaultTarget": "passthrough:https:///default"
},
"childPolicy": [
Expand All @@ -402,7 +358,6 @@ func TestParseConfigErrors(t *testing.T) {
"maxAge": "30s",
"staleAge" : "25s",
"cacheSizeBytes": 1000,
"request_processing_strategy": "ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS",
"defaultTarget": "passthrough:https:///default"
},
"childPolicy": [
Expand Down
43 changes: 12 additions & 31 deletions balancer/rls/internal/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/rls/internal/cache"
"google.golang.org/grpc/balancer/rls/internal/keys"
rlspb "google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1"
"google.golang.org/grpc/metadata"
)

Expand All @@ -42,10 +41,6 @@ type rlsPicker struct {
// The keyBuilder map used to generate RLS keys for the RPC. This is built
// by the LB policy based on the received ServiceConfig.
kbm keys.BuilderMap
// This is the request processing strategy as indicated by the LB policy's
// ServiceConfig. This controls how to process a RPC when the data required
// to make the pick decision is not in the cache.
strategy rlspb.RouteLookupConfig_RequestProcessingStrategy

// The following hooks are setup by the LB policy to enable the rlsPicker to
// access state stored in the policy. This approach has the following
Expand Down Expand Up @@ -79,15 +74,6 @@ type rlsPicker struct {
defaultPick func(balancer.PickInfo) (balancer.PickResult, error)
}

// This helper function decides if the pick should delegate to the default
// rlsPicker based on the request processing strategy. This is used when the
// data cache does not have a valid entry for the current RPC and the RLS
// request is throttled, or if the current data cache entry is in backoff.
func (p *rlsPicker) shouldDelegateToDefault() bool {
return p.strategy == rlspb.RouteLookupConfig_SYNC_LOOKUP_DEFAULT_TARGET_ON_ERROR ||
p.strategy == rlspb.RouteLookupConfig_ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS
}

// Pick makes the routing decision for every outbound RPC.
func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// For every incoming request, we first build the RLS keys using the
Expand Down Expand Up @@ -123,9 +109,9 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
if p.shouldThrottle() {
// The entry doesn't exist or has expired and the new RLS request
// has been throttled. Treat it as an error and delegate to default
// pick or fail the pick, based on the request processing strategy.
// pick, if one exists, or fail the pick.
if entry == nil || entry.ExpiryTime.Before(now) {
if p.shouldDelegateToDefault() {
if p.defaultPick != nil {
return p.defaultPick(info)
}
return balancer.PickResult{}, errRLSThrottled
Expand All @@ -144,25 +130,20 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// this cache entry.
return entry.ChildPicker.Pick(info)
} else if entry.BackoffTime.After(now) {
// The entry has expired, but is in backoff. We either delegate to
// the default rlsPicker or return the error from the last failed
// RLS request for this entry.
if p.shouldDelegateToDefault() {
// The entry has expired, but is in backoff. We delegate to the
// default pick, if one exists, or return the error from the last
// failed RLS request for this entry.
if p.defaultPick != nil {
return p.defaultPick(info)
}
return balancer.PickResult{}, entry.CallStatus
}
}

// Either we didn't find an entry or found an entry which had expired and
// was not in backoff (which is also essentially equivalent to not finding
// an entry), and we started an RLS request in the background. We either
// queue the pick or delegate to the default pick. In the former case, upon
// receipt of an RLS response, the LB policy will send a new rlsPicker to
// the channel, and the pick will be retried.
if p.strategy == rlspb.RouteLookupConfig_SYNC_LOOKUP_DEFAULT_TARGET_ON_ERROR ||
p.strategy == rlspb.RouteLookupConfig_SYNC_LOOKUP_CLIENT_SEES_ERROR {
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
return p.defaultPick(info)
// We get here only in the following cases:
// * No data cache entry or expired entry, RLS request sent out
// * No valid data cache entry and Pending cache entry exists
// We need to queue to pick which will be handled once the RLS response is
// received.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
Loading

0 comments on commit 6c0171f

Please sign in to comment.