Skip to content

Commit

Permalink
xds/client: move unmarshal functions and types to a separate package (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Nov 8, 2021
1 parent 3fa1988 commit 79e9c95
Show file tree
Hide file tree
Showing 68 changed files with 3,218 additions and 2,913 deletions.
13 changes: 7 additions & 6 deletions xds/csds/csds.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/types/known/timestamppb"

_ "google.golang.org/grpc/xds/internal/xdsclient/v2" // Register v2 xds_client.
Expand Down Expand Up @@ -197,17 +198,17 @@ func dumpToGenericXdsConfig(typeURL string, dumpF func() (string, map[string]xds
return ret
}

func serviceStatusToProto(serviceStatus xdsclient.ServiceStatus) v3adminpb.ClientResourceStatus {
func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus {
switch serviceStatus {
case xdsclient.ServiceStatusUnknown:
case xdsresource.ServiceStatusUnknown:
return v3adminpb.ClientResourceStatus_UNKNOWN
case xdsclient.ServiceStatusRequested:
case xdsresource.ServiceStatusRequested:
return v3adminpb.ClientResourceStatus_REQUESTED
case xdsclient.ServiceStatusNotExist:
case xdsresource.ServiceStatusNotExist:
return v3adminpb.ClientResourceStatus_DOES_NOT_EXIST
case xdsclient.ServiceStatusACKed:
case xdsresource.ServiceStatusACKed:
return v3adminpb.ClientResourceStatus_ACKED
case xdsclient.ServiceStatusNACKed:
case xdsresource.ServiceStatusNACKed:
return v3adminpb.ClientResourceStatus_NACKED
default:
return v3adminpb.ClientResourceStatus_UNKNOWN
Expand Down
9 changes: 5 additions & 4 deletions xds/csds/csds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
_ "google.golang.org/grpc/xds/internal/httpfilter/router"
"google.golang.org/grpc/xds/internal/testutils/e2e"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/anypb"

Expand Down Expand Up @@ -140,16 +141,16 @@ func TestCSDS(t *testing.T) {
defer cleanup()

for _, target := range ldsTargets {
xdsC.WatchListener(target, func(xdsclient.ListenerUpdate, error) {})
xdsC.WatchListener(target, func(xdsresource.ListenerUpdate, error) {})
}
for _, target := range rdsTargets {
xdsC.WatchRouteConfig(target, func(xdsclient.RouteConfigUpdate, error) {})
xdsC.WatchRouteConfig(target, func(xdsresource.RouteConfigUpdate, error) {})
}
for _, target := range cdsTargets {
xdsC.WatchCluster(target, func(xdsclient.ClusterUpdate, error) {})
xdsC.WatchCluster(target, func(xdsresource.ClusterUpdate, error) {})
}
for _, target := range edsTargets {
xdsC.WatchEndpoints(target, func(xdsclient.EndpointsUpdate, error) {})
xdsC.WatchEndpoints(target, func(xdsresource.EndpointsUpdate, error) {})
}

for i := 0; i < retryCount; i++ {
Expand Down
7 changes: 4 additions & 3 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

const (
Expand Down Expand Up @@ -185,7 +186,7 @@ func (b *cdsBalancer) handleClientConnUpdate(update *ccUpdate) {
// management server, creates appropriate certificate provider plugins, and
// updates the HandhakeInfo which is added as an address attribute in
// NewSubConn() calls.
func (b *cdsBalancer) handleSecurityConfig(config *xdsclient.SecurityConfig) error {
func (b *cdsBalancer) handleSecurityConfig(config *xdsresource.SecurityConfig) error {
// If xdsCredentials are not in use, i.e, the user did not want to get
// security configuration from an xDS server, we should not be acting on the
// received security config here. Doing so poses a security threat.
Expand Down Expand Up @@ -310,7 +311,7 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
dms := make([]clusterresolver.DiscoveryMechanism, len(update.updates))
for i, cu := range update.updates {
switch cu.ClusterType {
case xdsclient.ClusterTypeEDS:
case xdsresource.ClusterTypeEDS:
dms[i] = clusterresolver.DiscoveryMechanism{
Type: clusterresolver.DiscoveryMechanismTypeEDS,
Cluster: cu.ClusterName,
Expand All @@ -324,7 +325,7 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
dms[i].LoadReportingServerName = new(string)

}
case xdsclient.ClusterTypeLogicalDNS:
case xdsresource.ClusterTypeLogicalDNS:
dms[i] = clusterresolver.DiscoveryMechanism{
Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: cu.DNSHostName,
Expand Down
24 changes: 12 additions & 12 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
"google.golang.org/grpc/resolver"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

const (
Expand All @@ -57,17 +57,17 @@ var (
}
fpb1, fpb2 *fakeProviderBuilder
bootstrapConfig *bootstrap.Config
cdsUpdateWithGoodSecurityCfg = xdsclient.ClusterUpdate{
cdsUpdateWithGoodSecurityCfg = xdsresource.ClusterUpdate{
ClusterName: serviceName,
SecurityCfg: &xdsclient.SecurityConfig{
SecurityCfg: &xdsresource.SecurityConfig{
RootInstanceName: "default1",
IdentityInstanceName: "default2",
SubjectAltNameMatchers: testSANMatchers,
},
}
cdsUpdateWithMissingSecurityCfg = xdsclient.ClusterUpdate{
cdsUpdateWithMissingSecurityCfg = xdsresource.ClusterUpdate{
ClusterName: serviceName,
SecurityCfg: &xdsclient.SecurityConfig{
SecurityCfg: &xdsresource.SecurityConfig{
RootInstanceName: "not-default",
},
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
Expand Down Expand Up @@ -306,7 +306,7 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) {
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup. No security config is
// passed to the CDS balancer as part of this update.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
Expand Down Expand Up @@ -566,7 +566,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
// an update which contains bad security config. So, we expect the CDS
// balancer to forward this error to the EDS balancer and eventually the
// channel needs to be put in a bad state.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -671,9 +671,9 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{
cdsUpdate := xdsresource.ClusterUpdate{
ClusterName: serviceName,
SecurityCfg: &xdsclient.SecurityConfig{
SecurityCfg: &xdsresource.SecurityConfig{
RootInstanceName: "default1",
SubjectAltNameMatchers: testSANMatchers,
},
Expand All @@ -696,9 +696,9 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) {
}

// Push another update with a new security configuration.
cdsUpdate = xdsclient.ClusterUpdate{
cdsUpdate = xdsresource.ClusterUpdate{
ClusterName: serviceName,
SecurityCfg: &xdsclient.SecurityConfig{
SecurityCfg: &xdsresource.SecurityConfig{
RootInstanceName: "default2",
SubjectAltNameMatchers: testSANMatchers,
},
Expand Down
31 changes: 16 additions & 15 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

const (
Expand All @@ -58,7 +59,7 @@ func Test(t *testing.T) {

// cdsWatchInfo wraps the update and the error sent in a CDS watch callback.
type cdsWatchInfo struct {
update xdsclient.ClusterUpdate
update xdsresource.ClusterUpdate
err error
}

Expand Down Expand Up @@ -361,25 +362,25 @@ func (s) TestHandleClusterUpdate(t *testing.T) {

tests := []struct {
name string
cdsUpdate xdsclient.ClusterUpdate
cdsUpdate xdsresource.ClusterUpdate
updateErr error
wantCCS balancer.ClientConnState
}{
{
name: "happy-case-with-lrs",
cdsUpdate: xdsclient.ClusterUpdate{ClusterName: serviceName, EnableLRS: true},
cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName, EnableLRS: true},
wantCCS: edsCCS(serviceName, nil, true, nil),
},
{
name: "happy-case-without-lrs",
cdsUpdate: xdsclient.ClusterUpdate{ClusterName: serviceName},
cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName},
wantCCS: edsCCS(serviceName, nil, false, nil),
},
{
name: "happy-case-with-ring-hash-lb-policy",
cdsUpdate: xdsclient.ClusterUpdate{
cdsUpdate: xdsresource.ClusterUpdate{
ClusterName: serviceName,
LBPolicy: &xdsclient.ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100},
LBPolicy: &xdsresource.ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100},
},
wantCCS: edsCCS(serviceName, nil, false, &internalserviceconfig.BalancerConfig{
Name: ringhash.Name,
Expand Down Expand Up @@ -417,7 +418,7 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {
// resolver error at this point should result in the CDS balancer returning
// an error picker.
watcherErr := errors.New("cdsBalancer watcher error")
xdsC.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{}, watcherErr)
xdsC.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{}, watcherErr)

// Since the error being pushed here is not a resource-not-found-error, the
// registered watch should not be cancelled.
Expand Down Expand Up @@ -451,14 +452,14 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
}

// Again push a non-resource-not-found-error through the watcher callback.
xdsC.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{}, watcherErr)
xdsC.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{}, watcherErr)
// Make sure the registered watch is not cancelled.
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
Expand All @@ -472,7 +473,7 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {

// Push a resource-not-found-error this time around.
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "cdsBalancer resource not found error")
xdsC.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{}, resourceErr)
xdsC.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{}, resourceErr)
// Make sure that the watch is not cancelled. This error indicates that the
// request cluster resource is not found. We should continue to watch it.
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
Expand Down Expand Up @@ -536,7 +537,7 @@ func (s) TestResolverError(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -585,7 +586,7 @@ func (s) TestUpdateSubConnState(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
Expand Down Expand Up @@ -620,7 +621,7 @@ func (s) TestCircuitBreaking(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will update
// the service's counter with the new max requests.
var maxRequests uint32 = 1
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: clusterName, MaxRequests: &maxRequests}
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: clusterName, MaxRequests: &maxRequests}
wantCCS := edsCCS(clusterName, &maxRequests, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
Expand Down Expand Up @@ -653,7 +654,7 @@ func (s) TestClose(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
Expand Down Expand Up @@ -724,7 +725,7 @@ func (s) TestExitIdle(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
Expand Down
21 changes: 11 additions & 10 deletions xds/internal/balancer/cdsbalancer/cluster_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"

"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

var errNotReceivedUpdate = errors.New("tried to construct a cluster update on a cluster that has not received an update")
Expand All @@ -31,17 +32,17 @@ var errNotReceivedUpdate = errors.New("tried to construct a cluster update on a
// (if one doesn't already exist) and pushing the update to it.
type clusterHandlerUpdate struct {
// securityCfg is the Security Config from the top (root) cluster.
securityCfg *xdsclient.SecurityConfig
securityCfg *xdsresource.SecurityConfig
// lbPolicy is the lb policy from the top (root) cluster.
//
// Currently, we only support roundrobin or ringhash, and since roundrobin
// does need configs, this is only set to the ringhash config, if the policy
// is ringhash. In the future, if we support more policies, we can make this
// an interface, and set it to config of the other policies.
lbPolicy *xdsclient.ClusterLBPolicyRingHash
lbPolicy *xdsresource.ClusterLBPolicyRingHash

// updates is a list of ClusterUpdates from all the leaf clusters.
updates []xdsclient.ClusterUpdate
updates []xdsresource.ClusterUpdate
err error
}

Expand Down Expand Up @@ -139,7 +140,7 @@ type clusterNode struct {

// A ClusterUpdate in order to build a list of cluster updates for CDS to
// send down to child XdsClusterResolverLoadBalancingPolicy.
clusterUpdate xdsclient.ClusterUpdate
clusterUpdate xdsresource.ClusterUpdate

// This boolean determines whether this Node has received an update or not.
// This isn't the best practice, but this will protect a list of Cluster
Expand Down Expand Up @@ -176,7 +177,7 @@ func (c *clusterNode) delete() {
}

// Construct cluster update (potentially a list of ClusterUpdates) for a node.
func (c *clusterNode) constructClusterUpdate() ([]xdsclient.ClusterUpdate, error) {
func (c *clusterNode) constructClusterUpdate() ([]xdsresource.ClusterUpdate, error) {
// If the cluster has not yet received an update, the cluster update is not
// yet ready.
if !c.receivedUpdate {
Expand All @@ -185,13 +186,13 @@ func (c *clusterNode) constructClusterUpdate() ([]xdsclient.ClusterUpdate, error

// Base case - LogicalDNS or EDS. Both of these cluster types will be tied
// to a single ClusterUpdate.
if c.clusterUpdate.ClusterType != xdsclient.ClusterTypeAggregate {
return []xdsclient.ClusterUpdate{c.clusterUpdate}, nil
if c.clusterUpdate.ClusterType != xdsresource.ClusterTypeAggregate {
return []xdsresource.ClusterUpdate{c.clusterUpdate}, nil
}

// If an aggregate construct a list by recursively calling down to all of
// it's children.
var childrenUpdates []xdsclient.ClusterUpdate
var childrenUpdates []xdsresource.ClusterUpdate
for _, child := range c.children {
childUpdateList, err := child.constructClusterUpdate()
if err != nil {
Expand All @@ -206,7 +207,7 @@ func (c *clusterNode) constructClusterUpdate() ([]xdsclient.ClusterUpdate, error
// also handles any logic with regards to any child state that may have changed.
// At the end of the handleResp(), the clusterUpdate will be pinged in certain
// situations to try and construct an update to send back to CDS.
func (c *clusterNode) handleResp(clusterUpdate xdsclient.ClusterUpdate, err error) {
func (c *clusterNode) handleResp(clusterUpdate xdsresource.ClusterUpdate, err error) {
c.clusterHandler.clusterMutex.Lock()
defer c.clusterHandler.clusterMutex.Unlock()
if err != nil { // Write this error for run() to pick up in CDS LB policy.
Expand All @@ -230,7 +231,7 @@ func (c *clusterNode) handleResp(clusterUpdate xdsclient.ClusterUpdate, err erro
// handler to return. Also, if there was any children from previously,
// delete the children, as the cluster type is no longer an aggregate
// cluster.
if clusterUpdate.ClusterType != xdsclient.ClusterTypeAggregate {
if clusterUpdate.ClusterType != xdsresource.ClusterTypeAggregate {
for _, child := range c.children {
child.delete()
}
Expand Down
Loading

0 comments on commit 79e9c95

Please sign in to comment.