From b586e9215896c69206b29af00f30bc34d483b6fc Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 14 Jul 2021 13:10:19 -0700 Subject: [PATCH] xds/client: notify the resource watchers of xDS errors (#4564) --- .../balancer/cdsbalancer/cdsbalancer.go | 11 ++-- xds/internal/xdsclient/callback.go | 37 ++++++++--- xds/internal/xdsclient/client.go | 3 + xds/internal/xdsclient/client_test.go | 38 ++++++++--- xds/internal/xdsclient/v2/client.go | 2 +- xds/internal/xdsclient/v2/client_test.go | 2 + xds/internal/xdsclient/v3/client.go | 2 +- xds/internal/xdsclient/watchers.go | 11 ++++ .../xdsclient/watchers_cluster_test.go | 63 +++++++++++++++---- .../xdsclient/watchers_endpoints_test.go | 51 ++++++++++++--- .../xdsclient/watchers_listener_test.go | 61 ++++++++++++++---- xds/internal/xdsclient/watchers_route_test.go | 49 +++++++++++++-- 12 files changed, 269 insertions(+), 61 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index b04d150a311..c9014247a76 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -392,18 +392,17 @@ func (b *cdsBalancer) run() { // In both cases, the error will be forwarded to EDS balancer. And if error is // resource-not-found, the child EDS balancer will stop watching EDS. func (b *cdsBalancer) handleErrorFromUpdate(err error, fromParent bool) { - // TODO: connection errors will be sent to the eds balancers directly, and - // also forwarded by the parent balancers/resolvers. So the eds balancer may - // see the same error multiple times. We way want to only forward the error - // to eds if it's not a connection error. - // // This is not necessary today, because xds client never sends connection // errors. if fromParent && xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound { b.clusterHandler.close() } if b.edsLB != nil { - b.edsLB.ResolverError(err) + if xdsclient.ErrType(err) != xdsclient.ErrorTypeConnection { + // Connection errors will be sent to the child balancers directly. + // There's no need to forward them. + b.edsLB.ResolverError(err) + } } else { // If eds balancer was never created, fail the RPCs with // errors. diff --git a/xds/internal/xdsclient/callback.go b/xds/internal/xdsclient/callback.go index 64b7d6794c4..b8ad0ec7636 100644 --- a/xds/internal/xdsclient/callback.go +++ b/xds/internal/xdsclient/callback.go @@ -84,14 +84,16 @@ func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata Up // On NACK, update overall version to the NACKed resp. c.ldsVersion = metadata.ErrState.Version for name := range updates { - if _, ok := c.ldsWatchers[name]; ok { + if s, ok := c.ldsWatchers[name]; ok { // On error, keep previous version for each resource. But update // status and error. mdCopy := c.ldsMD[name] mdCopy.ErrState = metadata.ErrState mdCopy.Status = metadata.Status c.ldsMD[name] = mdCopy - // TODO: send the NACK error to the watcher. + for wi := range s { + wi.newError(metadata.ErrState.Err) + } } } return @@ -143,14 +145,16 @@ func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metad // On NACK, update overall version to the NACKed resp. c.rdsVersion = metadata.ErrState.Version for name := range updates { - if _, ok := c.rdsWatchers[name]; ok { + if s, ok := c.rdsWatchers[name]; ok { // On error, keep previous version for each resource. But update // status and error. mdCopy := c.rdsMD[name] mdCopy.ErrState = metadata.ErrState mdCopy.Status = metadata.Status c.rdsMD[name] = mdCopy - // TODO: send the NACK error to the watcher. + for wi := range s { + wi.newError(metadata.ErrState.Err) + } } } return @@ -185,14 +189,16 @@ func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata Upda // On NACK, update overall version to the NACKed resp. c.cdsVersion = metadata.ErrState.Version for name := range updates { - if _, ok := c.cdsWatchers[name]; ok { + if s, ok := c.cdsWatchers[name]; ok { // On error, keep previous version for each resource. But update // status and error. mdCopy := c.cdsMD[name] mdCopy.ErrState = metadata.ErrState mdCopy.Status = metadata.Status c.cdsMD[name] = mdCopy - // TODO: send the NACK error to the watcher. + for wi := range s { + wi.newError(metadata.ErrState.Err) + } } } return @@ -244,14 +250,16 @@ func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata U // On NACK, update overall version to the NACKed resp. c.edsVersion = metadata.ErrState.Version for name := range updates { - if _, ok := c.edsWatchers[name]; ok { + if s, ok := c.edsWatchers[name]; ok { // On error, keep previous version for each resource. But update // status and error. mdCopy := c.edsMD[name] mdCopy.ErrState = metadata.ErrState mdCopy.Status = metadata.Status c.edsMD[name] = mdCopy - // TODO: send the NACK error to the watcher. + for wi := range s { + wi.newError(metadata.ErrState.Err) + } } } return @@ -272,3 +280,16 @@ func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata U } } } + +// NewConnectionError is called by the underlying xdsAPIClient when it receives +// a connection error. The error will be forwarded to all the resource watchers. +func (c *clientImpl) NewConnectionError(err error) { + c.mu.Lock() + defer c.mu.Unlock() + + for _, s := range c.edsWatchers { + for wi := range s { + wi.newError(NewErrorf(ErrorTypeConnection, "xds: error received from xDS stream: %v", err)) + } + } +} diff --git a/xds/internal/xdsclient/client.go b/xds/internal/xdsclient/client.go index cb0d93bd98e..a7de226cd29 100644 --- a/xds/internal/xdsclient/client.go +++ b/xds/internal/xdsclient/client.go @@ -141,6 +141,9 @@ type UpdateHandler interface { // NewEndpoints handles updates to xDS ClusterLoadAssignment (or tersely // referred to as Endpoints) resources. NewEndpoints(map[string]EndpointsUpdate, UpdateMetadata) + // NewConnectionError handles connection errors from the xDS stream. The + // error will be reported to all the resource watchers. + NewConnectionError(err error) } // ServiceStatus is the status of the update. diff --git a/xds/internal/xdsclient/client_test.go b/xds/internal/xdsclient/client_test.go index 12590408e6c..abf51c45b83 100644 --- a/xds/internal/xdsclient/client_test.go +++ b/xds/internal/xdsclient/client_test.go @@ -187,59 +187,83 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) { wantUpdate := ClusterUpdate{ClusterName: testEDSName} client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate}, UpdateMetadata{}) - if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate, nil); err != nil { t.Fatal(err) } wantUpdate2 := ClusterUpdate{ClusterName: testEDSName + "2"} client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate2}, UpdateMetadata{}) - if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate2); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate2, nil); err != nil { t.Fatal(err) } } -func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ListenerUpdate) error { +func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ListenerUpdate, wantErr error) error { u, err := updateCh.Receive(ctx) if err != nil { return fmt.Errorf("timeout when waiting for listener update: %v", err) } gotUpdate := u.(ldsUpdateErr) + if wantErr != nil { + if gotUpdate.err != wantErr { + return fmt.Errorf("unexpected error: %v, want %v", gotUpdate.err, wantErr) + } + return nil + } if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate) { return fmt.Errorf("unexpected endpointsUpdate: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate) } return nil } -func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate RouteConfigUpdate) error { +func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate RouteConfigUpdate, wantErr error) error { u, err := updateCh.Receive(ctx) if err != nil { return fmt.Errorf("timeout when waiting for route configuration update: %v", err) } gotUpdate := u.(rdsUpdateErr) + if wantErr != nil { + if gotUpdate.err != wantErr { + return fmt.Errorf("unexpected error: %v, want %v", gotUpdate.err, wantErr) + } + return nil + } if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate) { return fmt.Errorf("unexpected route config update: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate) } return nil } -func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ClusterUpdate) error { +func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ClusterUpdate, wantErr error) error { u, err := updateCh.Receive(ctx) if err != nil { return fmt.Errorf("timeout when waiting for cluster update: %v", err) } gotUpdate := u.(clusterUpdateErr) - if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate) { + if wantErr != nil { + if gotUpdate.err != wantErr { + return fmt.Errorf("unexpected error: %v, want %v", gotUpdate.err, wantErr) + } + return nil + } + if !cmp.Equal(gotUpdate.u, wantUpdate) { return fmt.Errorf("unexpected clusterUpdate: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate) } return nil } -func verifyEndpointsUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate EndpointsUpdate) error { +func verifyEndpointsUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate EndpointsUpdate, wantErr error) error { u, err := updateCh.Receive(ctx) if err != nil { return fmt.Errorf("timeout when waiting for endpoints update: %v", err) } gotUpdate := u.(endpointsUpdateErr) + if wantErr != nil { + if gotUpdate.err != wantErr { + return fmt.Errorf("unexpected error: %v, want %v", gotUpdate.err, wantErr) + } + return nil + } if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty()) { return fmt.Errorf("unexpected endpointsUpdate: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate) } diff --git a/xds/internal/xdsclient/v2/client.go b/xds/internal/xdsclient/v2/client.go index 311621f0405..766db2564b8 100644 --- a/xds/internal/xdsclient/v2/client.go +++ b/xds/internal/xdsclient/v2/client.go @@ -140,7 +140,7 @@ func (v2c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) { resp, err := stream.Recv() if err != nil { - // TODO: call watch callbacks with error when stream is broken. + v2c.parent.NewConnectionError(err) return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err) } v2c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl()) diff --git a/xds/internal/xdsclient/v2/client_test.go b/xds/internal/xdsclient/v2/client_test.go index efa228a8c6e..138c9a16169 100644 --- a/xds/internal/xdsclient/v2/client_test.go +++ b/xds/internal/xdsclient/v2/client_test.go @@ -339,6 +339,8 @@ func (t *testUpdateReceiver) NewEndpoints(d map[string]xdsclient.EndpointsUpdate t.newUpdate(xdsclient.EndpointsResource, dd, metadata) } +func (t *testUpdateReceiver) NewConnectionError(error) {} + func (t *testUpdateReceiver) newUpdate(rType xdsclient.ResourceType, d map[string]interface{}, metadata xdsclient.UpdateMetadata) { t.f(rType, d, metadata) } diff --git a/xds/internal/xdsclient/v3/client.go b/xds/internal/xdsclient/v3/client.go index be8ff7720d8..6088189f97f 100644 --- a/xds/internal/xdsclient/v3/client.go +++ b/xds/internal/xdsclient/v3/client.go @@ -140,7 +140,7 @@ func (v3c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) { resp, err := stream.Recv() if err != nil { - // TODO: call watch callbacks with error when stream is broken. + v3c.parent.NewConnectionError(err) return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err) } v3c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl()) diff --git a/xds/internal/xdsclient/watchers.go b/xds/internal/xdsclient/watchers.go index 249db4de91e..e26ed360308 100644 --- a/xds/internal/xdsclient/watchers.go +++ b/xds/internal/xdsclient/watchers.go @@ -66,6 +66,17 @@ func (wi *watchInfo) newUpdate(update interface{}) { wi.c.scheduleCallback(wi, update, nil) } +func (wi *watchInfo) newError(err error) { + wi.mu.Lock() + defer wi.mu.Unlock() + if wi.state == watchInfoStateCanceled { + return + } + wi.state = watchInfoStateRespReceived + wi.expiryTimer.Stop() + wi.sendErrorLocked(err) +} + func (wi *watchInfo) resourceNotFound() { wi.mu.Lock() defer wi.mu.Unlock() diff --git a/xds/internal/xdsclient/watchers_cluster_test.go b/xds/internal/xdsclient/watchers_cluster_test.go index 8c33486fa01..0ded3644599 100644 --- a/xds/internal/xdsclient/watchers_cluster_test.go +++ b/xds/internal/xdsclient/watchers_cluster_test.go @@ -22,6 +22,7 @@ package xdsclient import ( "context" + "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -66,7 +67,7 @@ func (s) TestClusterWatch(t *testing.T) { wantUpdate := ClusterUpdate{ClusterName: testEDSName} client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate}, UpdateMetadata{}) - if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -75,7 +76,7 @@ func (s) TestClusterWatch(t *testing.T) { testCDSName: wantUpdate, "randomName": {}, }, UpdateMetadata{}) - if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -131,7 +132,7 @@ func (s) TestClusterTwoWatchSameResourceName(t *testing.T) { wantUpdate := ClusterUpdate{ClusterName: testEDSName} client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate}, UpdateMetadata{}) for i := 0; i < count; i++ { - if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate, nil); err != nil { t.Fatal(err) } } @@ -140,7 +141,7 @@ func (s) TestClusterTwoWatchSameResourceName(t *testing.T) { cancelLastWatch() client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate}, UpdateMetadata{}) for i := 0; i < count-1; i++ { - if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate, nil); err != nil { t.Fatal(err) } } @@ -208,11 +209,11 @@ func (s) TestClusterThreeWatchDifferentResourceName(t *testing.T) { }, UpdateMetadata{}) for i := 0; i < count; i++ { - if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate1); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate1, nil); err != nil { t.Fatal(err) } } - if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2, nil); err != nil { t.Fatal(err) } } @@ -249,7 +250,7 @@ func (s) TestClusterWatchAfterCache(t *testing.T) { client.NewClusters(map[string]ClusterUpdate{ testCDSName: wantUpdate, }, UpdateMetadata{}) - if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -265,7 +266,7 @@ func (s) TestClusterWatchAfterCache(t *testing.T) { } // New watch should receives the update. - if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -349,7 +350,7 @@ func (s) TestClusterWatchExpiryTimerStop(t *testing.T) { client.NewClusters(map[string]ClusterUpdate{ testCDSName: wantUpdate, }, UpdateMetadata{}) - if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -408,10 +409,10 @@ func (s) TestClusterResourceRemoved(t *testing.T) { testCDSName + "1": wantUpdate1, testCDSName + "2": wantUpdate2, }, UpdateMetadata{}) - if err := verifyClusterUpdate(ctx, clusterUpdateCh1, wantUpdate1); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateCh1, wantUpdate1, nil); err != nil { t.Fatal(err) } - if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2, nil); err != nil { t.Fatal(err) } @@ -424,7 +425,7 @@ func (s) TestClusterResourceRemoved(t *testing.T) { } // Watcher 2 should get the same update again. - if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2, nil); err != nil { t.Fatal(err) } @@ -439,7 +440,43 @@ func (s) TestClusterResourceRemoved(t *testing.T) { } // Watcher 2 should get the same update again. - if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil { + if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2, nil); err != nil { + t.Fatal(err) + } +} + +// TestClusterWatchNACKError covers the case that an update is NACK'ed, and the +// watcher should also receive the error. +func (s) TestClusterWatchNACKError(t *testing.T) { + apiClientCh, cleanup := overrideNewAPIClient() + defer cleanup() + + client, err := newWithConfig(clientOpts(testXDSServer, false)) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + c, err := apiClientCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for API client to be created: %v", err) + } + apiClient := c.(*testAPIClient) + + clusterUpdateCh := testutils.NewChannel() + cancelWatch := client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) { + clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) + }) + defer cancelWatch() + if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil { + t.Fatalf("want new watch to start, got error %v", err) + } + + wantError := fmt.Errorf("testing error") + client.NewClusters(map[string]ClusterUpdate{testCDSName: {}}, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) + if err := verifyClusterUpdate(ctx, clusterUpdateCh, ClusterUpdate{}, nil); err != nil { t.Fatal(err) } } diff --git a/xds/internal/xdsclient/watchers_endpoints_test.go b/xds/internal/xdsclient/watchers_endpoints_test.go index 70f06514d9f..9b5000b8bb9 100644 --- a/xds/internal/xdsclient/watchers_endpoints_test.go +++ b/xds/internal/xdsclient/watchers_endpoints_test.go @@ -22,6 +22,7 @@ package xdsclient import ( "context" + "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -84,7 +85,7 @@ func (s) TestEndpointsWatch(t *testing.T) { wantUpdate := EndpointsUpdate{Localities: []Locality{testLocalities[0]}} client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate}, UpdateMetadata{}) - if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh, wantUpdate); err != nil { + if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -150,7 +151,7 @@ func (s) TestEndpointsTwoWatchSameResourceName(t *testing.T) { wantUpdate := EndpointsUpdate{Localities: []Locality{testLocalities[0]}} client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate}, UpdateMetadata{}) for i := 0; i < count; i++ { - if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[i], wantUpdate); err != nil { + if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[i], wantUpdate, nil); err != nil { t.Fatal(err) } } @@ -159,7 +160,7 @@ func (s) TestEndpointsTwoWatchSameResourceName(t *testing.T) { cancelLastWatch() client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate}, UpdateMetadata{}) for i := 0; i < count-1; i++ { - if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[i], wantUpdate); err != nil { + if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[i], wantUpdate, nil); err != nil { t.Fatal(err) } } @@ -227,11 +228,11 @@ func (s) TestEndpointsThreeWatchDifferentResourceName(t *testing.T) { }, UpdateMetadata{}) for i := 0; i < count; i++ { - if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[i], wantUpdate1); err != nil { + if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[i], wantUpdate1, nil); err != nil { t.Fatal(err) } } - if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh2, wantUpdate2); err != nil { + if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh2, wantUpdate2, nil); err != nil { t.Fatal(err) } } @@ -266,7 +267,7 @@ func (s) TestEndpointsWatchAfterCache(t *testing.T) { wantUpdate := EndpointsUpdate{Localities: []Locality{testLocalities[0]}} client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate}, UpdateMetadata{}) - if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh, wantUpdate); err != nil { + if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -282,7 +283,7 @@ func (s) TestEndpointsWatchAfterCache(t *testing.T) { } // New watch should receives the update. - if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh2, wantUpdate); err != nil { + if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh2, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -332,3 +333,39 @@ func (s) TestEndpointsWatchExpiryTimer(t *testing.T) { t.Fatalf("unexpected endpointsUpdate: (%v, %v), want: (EndpointsUpdate{}, nil)", gotUpdate.u, gotUpdate.err) } } + +// TestEndpointsWatchNACKError covers the case that an update is NACK'ed, and +// the watcher should also receive the error. +func (s) TestEndpointsWatchNACKError(t *testing.T) { + apiClientCh, cleanup := overrideNewAPIClient() + defer cleanup() + + client, err := newWithConfig(clientOpts(testXDSServer, false)) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + c, err := apiClientCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for API client to be created: %v", err) + } + apiClient := c.(*testAPIClient) + + endpointsUpdateCh := testutils.NewChannel() + cancelWatch := client.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) { + endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err}) + }) + defer cancelWatch() + if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil { + t.Fatalf("want new watch to start, got error %v", err) + } + + wantError := fmt.Errorf("testing error") + client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: {}}, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) + if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh, EndpointsUpdate{}, wantError); err != nil { + t.Fatal(err) + } +} diff --git a/xds/internal/xdsclient/watchers_listener_test.go b/xds/internal/xdsclient/watchers_listener_test.go index 79ef997a72d..f7a1169bee3 100644 --- a/xds/internal/xdsclient/watchers_listener_test.go +++ b/xds/internal/xdsclient/watchers_listener_test.go @@ -22,6 +22,7 @@ package xdsclient import ( "context" + "fmt" "testing" "google.golang.org/grpc/internal/testutils" @@ -64,7 +65,7 @@ func (s) TestLDSWatch(t *testing.T) { wantUpdate := ListenerUpdate{RouteConfigName: testRDSName} client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate}, UpdateMetadata{}) - if err := verifyListenerUpdate(ctx, ldsUpdateCh, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, ldsUpdateCh, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -73,7 +74,7 @@ func (s) TestLDSWatch(t *testing.T) { testLDSName: wantUpdate, "randomName": {}, }, UpdateMetadata{}) - if err := verifyListenerUpdate(ctx, ldsUpdateCh, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, ldsUpdateCh, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -132,7 +133,7 @@ func (s) TestLDSTwoWatchSameResourceName(t *testing.T) { wantUpdate := ListenerUpdate{RouteConfigName: testRDSName} client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate}, UpdateMetadata{}) for i := 0; i < count; i++ { - if err := verifyListenerUpdate(ctx, ldsUpdateChs[i], wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, ldsUpdateChs[i], wantUpdate, nil); err != nil { t.Fatal(err) } } @@ -141,7 +142,7 @@ func (s) TestLDSTwoWatchSameResourceName(t *testing.T) { cancelLastWatch() client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate}, UpdateMetadata{}) for i := 0; i < count-1; i++ { - if err := verifyListenerUpdate(ctx, ldsUpdateChs[i], wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, ldsUpdateChs[i], wantUpdate, nil); err != nil { t.Fatal(err) } } @@ -210,11 +211,11 @@ func (s) TestLDSThreeWatchDifferentResourceName(t *testing.T) { }, UpdateMetadata{}) for i := 0; i < count; i++ { - if err := verifyListenerUpdate(ctx, ldsUpdateChs[i], wantUpdate1); err != nil { + if err := verifyListenerUpdate(ctx, ldsUpdateChs[i], wantUpdate1, nil); err != nil { t.Fatal(err) } } - if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2); err != nil { + if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2, nil); err != nil { t.Fatal(err) } } @@ -249,7 +250,7 @@ func (s) TestLDSWatchAfterCache(t *testing.T) { wantUpdate := ListenerUpdate{RouteConfigName: testRDSName} client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate}, UpdateMetadata{}) - if err := verifyListenerUpdate(ctx, ldsUpdateCh, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, ldsUpdateCh, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -265,7 +266,7 @@ func (s) TestLDSWatchAfterCache(t *testing.T) { } // New watch should receive the update. - if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -323,10 +324,10 @@ func (s) TestLDSResourceRemoved(t *testing.T) { testLDSName + "1": wantUpdate1, testLDSName + "2": wantUpdate2, }, UpdateMetadata{}) - if err := verifyListenerUpdate(ctx, ldsUpdateCh1, wantUpdate1); err != nil { + if err := verifyListenerUpdate(ctx, ldsUpdateCh1, wantUpdate1, nil); err != nil { t.Fatal(err) } - if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2); err != nil { + if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2, nil); err != nil { t.Fatal(err) } @@ -339,7 +340,7 @@ func (s) TestLDSResourceRemoved(t *testing.T) { } // Watcher 2 should get the same update again. - if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2); err != nil { + if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2, nil); err != nil { t.Fatal(err) } @@ -354,7 +355,43 @@ func (s) TestLDSResourceRemoved(t *testing.T) { } // Watcher 2 should get the same update again. - if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2); err != nil { + if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2, nil); err != nil { + t.Fatal(err) + } +} + +// TestListenerWatchNACKError covers the case that an update is NACK'ed, and the +// watcher should also receive the error. +func (s) TestListenerWatchNACKError(t *testing.T) { + apiClientCh, cleanup := overrideNewAPIClient() + defer cleanup() + + client, err := newWithConfig(clientOpts(testXDSServer, false)) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + c, err := apiClientCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for API client to be created: %v", err) + } + apiClient := c.(*testAPIClient) + + ldsUpdateCh := testutils.NewChannel() + cancelWatch := client.WatchListener(testLDSName, func(update ListenerUpdate, err error) { + ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err}) + }) + defer cancelWatch() + if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil { + t.Fatalf("want new watch to start, got error %v", err) + } + + wantError := fmt.Errorf("testing error") + client.NewListeners(map[string]ListenerUpdate{testLDSName: {}}, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) + if err := verifyListenerUpdate(ctx, ldsUpdateCh, ListenerUpdate{}, wantError); err != nil { t.Fatal(err) } } diff --git a/xds/internal/xdsclient/watchers_route_test.go b/xds/internal/xdsclient/watchers_route_test.go index 08b035a0b0a..5e9b51da9d2 100644 --- a/xds/internal/xdsclient/watchers_route_test.go +++ b/xds/internal/xdsclient/watchers_route_test.go @@ -22,6 +22,7 @@ package xdsclient import ( "context" + "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -73,7 +74,7 @@ func (s) TestRDSWatch(t *testing.T) { }, } client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate}, UpdateMetadata{}) - if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -146,7 +147,7 @@ func (s) TestRDSTwoWatchSameResourceName(t *testing.T) { } client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate}, UpdateMetadata{}) for i := 0; i < count; i++ { - if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate, nil); err != nil { t.Fatal(err) } } @@ -155,7 +156,7 @@ func (s) TestRDSTwoWatchSameResourceName(t *testing.T) { cancelLastWatch() client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate}, UpdateMetadata{}) for i := 0; i < count-1; i++ { - if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate, nil); err != nil { t.Fatal(err) } } @@ -237,11 +238,11 @@ func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) { }, UpdateMetadata{}) for i := 0; i < count; i++ { - if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate1); err != nil { + if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate1, nil); err != nil { t.Fatal(err) } } - if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh2, wantUpdate2); err != nil { + if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh2, wantUpdate2, nil); err != nil { t.Fatal(err) } } @@ -283,7 +284,7 @@ func (s) TestRDSWatchAfterCache(t *testing.T) { }, } client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate}, UpdateMetadata{}) - if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, wantUpdate, nil); err != nil { t.Fatal(err) } @@ -310,3 +311,39 @@ func (s) TestRDSWatchAfterCache(t *testing.T) { t.Errorf("unexpected RouteConfigUpdate: %v, %v, want channel recv timeout", u, err) } } + +// TestRouteWatchNACKError covers the case that an update is NACK'ed, and the +// watcher should also receive the error. +func (s) TestRouteWatchNACKError(t *testing.T) { + apiClientCh, cleanup := overrideNewAPIClient() + defer cleanup() + + client, err := newWithConfig(clientOpts(testXDSServer, false)) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + c, err := apiClientCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for API client to be created: %v", err) + } + apiClient := c.(*testAPIClient) + + rdsUpdateCh := testutils.NewChannel() + cancelWatch := client.WatchRouteConfig(testCDSName, func(update RouteConfigUpdate, err error) { + rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err}) + }) + defer cancelWatch() + if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil { + t.Fatalf("want new watch to start, got error %v", err) + } + + wantError := fmt.Errorf("testing error") + client.NewRouteConfigs(map[string]RouteConfigUpdate{testCDSName: {}}, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) + if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, RouteConfigUpdate{}, wantError); err != nil { + t.Fatal(err) + } +}