Skip to content

Commit

Permalink
xds/client: notify the resource watchers of xDS errors (#4564)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jul 14, 2021
1 parent bfe1d0d commit b586e92
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 61 deletions.
11 changes: 5 additions & 6 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,18 +392,17 @@ func (b *cdsBalancer) run() {
// In both cases, the error will be forwarded to EDS balancer. And if error is
// resource-not-found, the child EDS balancer will stop watching EDS.
func (b *cdsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
// TODO: connection errors will be sent to the eds balancers directly, and
// also forwarded by the parent balancers/resolvers. So the eds balancer may
// see the same error multiple times. We way want to only forward the error
// to eds if it's not a connection error.
//
// This is not necessary today, because xds client never sends connection
// errors.
if fromParent && xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
b.clusterHandler.close()
}
if b.edsLB != nil {
b.edsLB.ResolverError(err)
if xdsclient.ErrType(err) != xdsclient.ErrorTypeConnection {
// Connection errors will be sent to the child balancers directly.
// There's no need to forward them.
b.edsLB.ResolverError(err)
}
} else {
// If eds balancer was never created, fail the RPCs with
// errors.
Expand Down
37 changes: 29 additions & 8 deletions xds/internal/xdsclient/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,16 @@ func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata Up
// On NACK, update overall version to the NACKed resp.
c.ldsVersion = metadata.ErrState.Version
for name := range updates {
if _, ok := c.ldsWatchers[name]; ok {
if s, ok := c.ldsWatchers[name]; ok {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.ldsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.ldsMD[name] = mdCopy
// TODO: send the NACK error to the watcher.
for wi := range s {
wi.newError(metadata.ErrState.Err)
}
}
}
return
Expand Down Expand Up @@ -143,14 +145,16 @@ func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metad
// On NACK, update overall version to the NACKed resp.
c.rdsVersion = metadata.ErrState.Version
for name := range updates {
if _, ok := c.rdsWatchers[name]; ok {
if s, ok := c.rdsWatchers[name]; ok {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.rdsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.rdsMD[name] = mdCopy
// TODO: send the NACK error to the watcher.
for wi := range s {
wi.newError(metadata.ErrState.Err)
}
}
}
return
Expand Down Expand Up @@ -185,14 +189,16 @@ func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata Upda
// On NACK, update overall version to the NACKed resp.
c.cdsVersion = metadata.ErrState.Version
for name := range updates {
if _, ok := c.cdsWatchers[name]; ok {
if s, ok := c.cdsWatchers[name]; ok {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.cdsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.cdsMD[name] = mdCopy
// TODO: send the NACK error to the watcher.
for wi := range s {
wi.newError(metadata.ErrState.Err)
}
}
}
return
Expand Down Expand Up @@ -244,14 +250,16 @@ func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata U
// On NACK, update overall version to the NACKed resp.
c.edsVersion = metadata.ErrState.Version
for name := range updates {
if _, ok := c.edsWatchers[name]; ok {
if s, ok := c.edsWatchers[name]; ok {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.edsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.edsMD[name] = mdCopy
// TODO: send the NACK error to the watcher.
for wi := range s {
wi.newError(metadata.ErrState.Err)
}
}
}
return
Expand All @@ -272,3 +280,16 @@ func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata U
}
}
}

// NewConnectionError is called by the underlying xdsAPIClient when it receives
// a connection error. The error will be forwarded to all the resource watchers.
func (c *clientImpl) NewConnectionError(err error) {
c.mu.Lock()
defer c.mu.Unlock()

for _, s := range c.edsWatchers {
for wi := range s {
wi.newError(NewErrorf(ErrorTypeConnection, "xds: error received from xDS stream: %v", err))
}
}
}
3 changes: 3 additions & 0 deletions xds/internal/xdsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ type UpdateHandler interface {
// NewEndpoints handles updates to xDS ClusterLoadAssignment (or tersely
// referred to as Endpoints) resources.
NewEndpoints(map[string]EndpointsUpdate, UpdateMetadata)
// NewConnectionError handles connection errors from the xDS stream. The
// error will be reported to all the resource watchers.
NewConnectionError(err error)
}

// ServiceStatus is the status of the update.
Expand Down
38 changes: 31 additions & 7 deletions xds/internal/xdsclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,59 +187,83 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) {

wantUpdate := ClusterUpdate{ClusterName: testEDSName}
client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate}, UpdateMetadata{})
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate, nil); err != nil {
t.Fatal(err)
}

wantUpdate2 := ClusterUpdate{ClusterName: testEDSName + "2"}
client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate2}, UpdateMetadata{})
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate2); err != nil {
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate2, nil); err != nil {
t.Fatal(err)
}
}

func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ListenerUpdate) error {
func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ListenerUpdate, wantErr error) error {
u, err := updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout when waiting for listener update: %v", err)
}
gotUpdate := u.(ldsUpdateErr)
if wantErr != nil {
if gotUpdate.err != wantErr {
return fmt.Errorf("unexpected error: %v, want %v", gotUpdate.err, wantErr)
}
return nil
}
if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate) {
return fmt.Errorf("unexpected endpointsUpdate: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
}
return nil
}

func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate RouteConfigUpdate) error {
func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate RouteConfigUpdate, wantErr error) error {
u, err := updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout when waiting for route configuration update: %v", err)
}
gotUpdate := u.(rdsUpdateErr)
if wantErr != nil {
if gotUpdate.err != wantErr {
return fmt.Errorf("unexpected error: %v, want %v", gotUpdate.err, wantErr)
}
return nil
}
if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate) {
return fmt.Errorf("unexpected route config update: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
}
return nil
}

func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ClusterUpdate) error {
func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ClusterUpdate, wantErr error) error {
u, err := updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout when waiting for cluster update: %v", err)
}
gotUpdate := u.(clusterUpdateErr)
if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate) {
if wantErr != nil {
if gotUpdate.err != wantErr {
return fmt.Errorf("unexpected error: %v, want %v", gotUpdate.err, wantErr)
}
return nil
}
if !cmp.Equal(gotUpdate.u, wantUpdate) {
return fmt.Errorf("unexpected clusterUpdate: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
}
return nil
}

func verifyEndpointsUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate EndpointsUpdate) error {
func verifyEndpointsUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate EndpointsUpdate, wantErr error) error {
u, err := updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout when waiting for endpoints update: %v", err)
}
gotUpdate := u.(endpointsUpdateErr)
if wantErr != nil {
if gotUpdate.err != wantErr {
return fmt.Errorf("unexpected error: %v, want %v", gotUpdate.err, wantErr)
}
return nil
}
if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty()) {
return fmt.Errorf("unexpected endpointsUpdate: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
}
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/v2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (v2c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) {

resp, err := stream.Recv()
if err != nil {
// TODO: call watch callbacks with error when stream is broken.
v2c.parent.NewConnectionError(err)
return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err)
}
v2c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl())
Expand Down
2 changes: 2 additions & 0 deletions xds/internal/xdsclient/v2/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,8 @@ func (t *testUpdateReceiver) NewEndpoints(d map[string]xdsclient.EndpointsUpdate
t.newUpdate(xdsclient.EndpointsResource, dd, metadata)
}

func (t *testUpdateReceiver) NewConnectionError(error) {}

func (t *testUpdateReceiver) newUpdate(rType xdsclient.ResourceType, d map[string]interface{}, metadata xdsclient.UpdateMetadata) {
t.f(rType, d, metadata)
}
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (v3c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) {

resp, err := stream.Recv()
if err != nil {
// TODO: call watch callbacks with error when stream is broken.
v3c.parent.NewConnectionError(err)
return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err)
}
v3c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl())
Expand Down
11 changes: 11 additions & 0 deletions xds/internal/xdsclient/watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ func (wi *watchInfo) newUpdate(update interface{}) {
wi.c.scheduleCallback(wi, update, nil)
}

func (wi *watchInfo) newError(err error) {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled {
return
}
wi.state = watchInfoStateRespReceived
wi.expiryTimer.Stop()
wi.sendErrorLocked(err)
}

func (wi *watchInfo) resourceNotFound() {
wi.mu.Lock()
defer wi.mu.Unlock()
Expand Down
Loading

0 comments on commit b586e92

Please sign in to comment.