Skip to content

Commit

Permalink
xds/client: move watchers from xdsclient to a separate struct (#4963)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Nov 9, 2021
1 parent bac0a7e commit 59e024e
Show file tree
Hide file tree
Showing 36 changed files with 1,204 additions and 979 deletions.
2 changes: 1 addition & 1 deletion xds/csds/csds.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func nodeProtoToV3(n proto.Message) *v3corepb.Node {
return node
}

func dumpToGenericXdsConfig(typeURL string, dumpF func() (string, map[string]xdsclient.UpdateWithMD)) []*v3statuspb.ClientConfig_GenericXdsConfig {
func dumpToGenericXdsConfig(typeURL string, dumpF func() (string, map[string]xdsresource.UpdateWithMD)) []*v3statuspb.ClientConfig_GenericXdsConfig {
_, dump := dumpF()
ret := make([]*v3statuspb.ClientConfig_GenericXdsConfig, 0, len(dump))
for name, d := range dump {
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,11 @@ func (b *cdsBalancer) run() {
func (b *cdsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
// This is not necessary today, because xds client never sends connection
// errors.
if fromParent && xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
if fromParent && xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
b.clusterHandler.close()
}
if b.childLB != nil {
if xdsclient.ErrType(err) != xdsclient.ErrorTypeConnection {
if xdsresource.ErrType(err) != xdsresource.ErrorTypeConnection {
// Connection errors will be sent to the child balancers directly.
// There's no need to forward them.
b.childLB.ResolverError(err)
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {
}

// Push a resource-not-found-error this time around.
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "cdsBalancer resource not found error")
resourceErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "cdsBalancer resource not found error")
xdsC.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{}, resourceErr)
// Make sure that the watch is not cancelled. This error indicates that the
// request cluster resource is not found. We should continue to watch it.
Expand Down Expand Up @@ -557,7 +557,7 @@ func (s) TestResolverError(t *testing.T) {
}

// Push a resource-not-found-error this time around.
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "cdsBalancer resource not found error")
resourceErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "cdsBalancer resource not found error")
cdsB.ResolverError(resourceErr)
// Make sure the registered watch is cancelled.
if _, err := xdsC.WaitForCancelClusterWatch(ctx); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion xds/internal/balancer/clusterresolver/clusterresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

// Name is the name of the cluster_resolver balancer.
Expand Down Expand Up @@ -244,7 +245,7 @@ func (b *clusterResolverBalancer) updateChildConfig() error {
// In both cases, the sub-balancers will be receive the error.
func (b *clusterResolverBalancer) handleErrorFromUpdate(err error, fromParent bool) {
b.logger.Warningf("Received error: %v", err)
if fromParent && xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
if fromParent && xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
// This is an error from the parent ClientConn (can be the parent CDS
// balancer), and is a resource-not-found error. This means the resource
// (can be either LDS or CDS) was removed. Stop the EDS watch.
Expand Down
8 changes: 4 additions & 4 deletions xds/internal/balancer/clusterresolver/clusterresolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
t.Fatalf("EDS impl got unexpected update: %v", err)
}

connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
connectionErr := xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "connection error")
xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, connectionErr)

sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
Expand All @@ -298,7 +298,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
t.Fatalf("want resolver error, got %v", err)
}

resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error")
resourceErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error")
xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, resourceErr)
// Even if error is resource not found, watch shouldn't be canceled, because
// this is an EDS resource removed (and xds client actually never sends this
Expand Down Expand Up @@ -369,7 +369,7 @@ func (s) TestErrorFromResolver(t *testing.T) {
t.Fatalf("EDS impl got unexpected update: %v", err)
}

connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
connectionErr := xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "connection error")
edsB.ResolverError(connectionErr)

sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
Expand All @@ -387,7 +387,7 @@ func (s) TestErrorFromResolver(t *testing.T) {
t.Fatalf("want resolver error, got %v", err)
}

resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error")
resourceErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error")
edsB.ResolverError(resourceErr)
if _, err := xdsC.WaitForCancelEDSWatch(ctx); err != nil {
t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate,
// type we check is ResourceNotFound, which indicates the LDS resource
// was removed, and besides sending the error to callback, we also
// cancel the RDS watch.
if xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound && w.rdsCancel != nil {
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound && w.rdsCancel != nil {
w.rdsCancel()
w.rdsName = ""
w.rdsCancel = nil
Expand Down
3 changes: 2 additions & 1 deletion xds/internal/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

const xdsScheme = "xds"
Expand Down Expand Up @@ -234,7 +235,7 @@ func (r *xdsResolver) run() {
case update := <-r.updateCh:
if update.err != nil {
r.logger.Warningf("Watch error on resource %v from xds-client %p, %v", r.target.Endpoint, r.client, update.err)
if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound {
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
// If error is resource-not-found, it means the LDS
// resource was removed. Ultimately send an empty service
// config, which picks pick-first, with no address, and
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/resolver/xds_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) {
}

// Delete the resource
suErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "resource removed error")
suErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource removed error")
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{}, suErr)

if _, err = tcc.stateCh.Receive(ctx); err != nil {
Expand Down Expand Up @@ -764,7 +764,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {

// Delete the resource. The channel should receive a service config with the
// original cluster but with an erroring config selector.
suErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "resource removed error")
suErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource removed error")
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{}, suErr)

if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
Expand Down Expand Up @@ -1164,7 +1164,7 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) {

// Invoke the watchAPI callback with a bad service update and wait for the
// ReportError method to be called on the ClientConn.
suErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "resource removed error")
suErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource removed error")
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{}, suErr)

if gotErrVal, gotErr := tcc.errorCh.Receive(ctx); gotErr != context.DeadlineExceeded {
Expand Down
5 changes: 2 additions & 3 deletions xds/internal/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
Expand Down Expand Up @@ -365,7 +364,7 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
}
if update.err != nil {
l.logger.Warningf("Received error for rds names specified in resource %q: %+v", l.name, update.err)
if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound {
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
}
// For errors which are anything other than "resource-not-found", we
Expand All @@ -381,7 +380,7 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
if update.err != nil {
l.logger.Warningf("Received error for resource %q: %+v", l.name, update.err)
if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound {
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
}
// For errors which are anything other than "resource-not-found", we
Expand Down
8 changes: 4 additions & 4 deletions xds/internal/xdsclient/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ type XDSClient interface {
WatchEndpoints(clusterName string, edsCb func(xdsresource.EndpointsUpdate, error)) (cancel func())
ReportLoad(server string) (*load.Store, func())

DumpLDS() (string, map[string]UpdateWithMD)
DumpRDS() (string, map[string]UpdateWithMD)
DumpCDS() (string, map[string]UpdateWithMD)
DumpEDS() (string, map[string]UpdateWithMD)
DumpLDS() (string, map[string]xdsresource.UpdateWithMD)
DumpRDS() (string, map[string]xdsresource.UpdateWithMD)
DumpCDS() (string, map[string]xdsresource.UpdateWithMD)
DumpEDS() (string, map[string]xdsresource.UpdateWithMD)

BootstrapConfig() *bootstrap.Config
Close()
Expand Down
Loading

0 comments on commit 59e024e

Please sign in to comment.