Skip to content

Commit

Permalink
xds/client: move transport_helper from xdsclient to a separate struct (
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Nov 10, 2021
1 parent 6603e73 commit 52d9416
Show file tree
Hide file tree
Showing 49 changed files with 1,051 additions and 1,072 deletions.
4 changes: 2 additions & 2 deletions xds/csds/csds.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/types/known/timestamppb"

_ "google.golang.org/grpc/xds/internal/xdsclient/v2" // Register v2 xds_client.
_ "google.golang.org/grpc/xds/internal/xdsclient/v3" // Register v3 xds_client.
_ "google.golang.org/grpc/xds/internal/xdsclient/controller/version/v2" // Register v2 xds_client.
_ "google.golang.org/grpc/xds/internal/xdsclient/controller/version/v3" // Register v3 xds_client.
)

var (
Expand Down
2 changes: 1 addition & 1 deletion xds/googledirectpath/googlec2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ import (
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/xds" // To register xds resolvers and balancers.
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/structpb"
)

Expand Down
2 changes: 1 addition & 1 deletion xds/googledirectpath/googlec2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"

_ "google.golang.org/grpc/xds/internal/xdsclient/v2" // V2 client registration.
_ "google.golang.org/grpc/xds/internal/xdsclient/controller/version/v2" // V2 client registration.
)

const (
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/httpfilter/fault/fault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ import (
tpb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
testpb "google.golang.org/grpc/test/grpc_testing"

_ "google.golang.org/grpc/xds/internal/balancer" // Register the balancers.
_ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver.
_ "google.golang.org/grpc/xds/internal/xdsclient/v3" // Register the v3 xDS API client.
_ "google.golang.org/grpc/xds/internal/balancer" // Register the balancers.
_ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver.
_ "google.golang.org/grpc/xds/internal/xdsclient/controller/version/v3" // Register the v3 xDS API client.
)

const defaultTestTimeout = 10 * time.Second
Expand Down
15 changes: 10 additions & 5 deletions xds/internal/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ func (b *xdsResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, op
}
defer func() {
if retErr != nil {
if r.client != nil {
r.client.Close()
}
r.Close()
}
}()
r.logger = prefixLogger(r)
Expand Down Expand Up @@ -304,8 +302,15 @@ func (*xdsResolver) ResolveNow(o resolver.ResolveNowOptions) {}

// Close closes the resolver, and also closes the underlying xdsClient.
func (r *xdsResolver) Close() {
r.cancelWatch()
r.client.Close()
// Note that Close needs to check for nils even if some of them are always
// set in the constructor. This is because the constructor defers Close() in
// error cases, and the fields might not be set when the error happens.
if r.cancelWatch != nil {
r.cancelWatch()
}
if r.client != nil {
r.client.Close()
}
r.closed.Fire()
r.logger.Infof("Shutdown")
}
14 changes: 0 additions & 14 deletions xds/internal/testutils/fakeserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package fakeserver

import (
"context"
"fmt"
"io"
"net"
Expand All @@ -29,7 +28,6 @@ import (
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -134,18 +132,6 @@ func StartServer() (*Server, func(), error) {
return s, func() { server.Stop() }, nil
}

// XDSClientConn returns a grpc.ClientConn connected to the fakeServer.
func (xdsS *Server) XDSClientConn() (*grpc.ClientConn, func(), error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultDialTimeout)
defer cancel()

cc, err := grpc.DialContext(ctx, xdsS.Address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
return nil, nil, fmt.Errorf("grpc.DialContext(%s) failed: %v", xdsS.Address, err)
}
return cc, func() { cc.Close() }, nil
}

type xdsServer struct {
reqChan *testutils.Channel
respChan chan *Response
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/bootstrap/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"google.golang.org/grpc/credentials/tls/certprovider"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
)

var (
Expand Down
197 changes: 21 additions & 176 deletions xds/internal/xdsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,134 +21,16 @@
package xdsclient

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/pubsub"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

var (
m = make(map[version.TransportAPI]APIClientBuilder)
)

// RegisterAPIClientBuilder registers a client builder for xDS transport protocol
// version specified by b.Version().
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple builders are
// registered for the same version, the one registered last will take effect.
func RegisterAPIClientBuilder(b APIClientBuilder) {
m[b.Version()] = b
}

// getAPIClientBuilder returns the client builder registered for the provided
// xDS transport API version.
func getAPIClientBuilder(version version.TransportAPI) APIClientBuilder {
if b, ok := m[version]; ok {
return b
}
return nil
}

// BuildOptions contains options to be passed to client builders.
type BuildOptions struct {
// Parent is a top-level xDS client which has the intelligence to take
// appropriate action based on xDS responses received from the management
// server.
Parent UpdateHandler
// Validator performs post unmarshal validation checks.
Validator xdsresource.UpdateValidatorFunc
// NodeProto contains the Node proto to be used in xDS requests. The actual
// type depends on the transport protocol version used.
NodeProto proto.Message
// Backoff returns the amount of time to backoff before retrying broken
// streams.
Backoff func(int) time.Duration
// Logger provides enhanced logging capabilities.
Logger *grpclog.PrefixLogger
}

// APIClientBuilder creates an xDS client for a specific xDS transport protocol
// version.
type APIClientBuilder interface {
// Build builds a transport protocol specific implementation of the xDS
// client based on the provided clientConn to the management server and the
// provided options.
Build(*grpc.ClientConn, BuildOptions) (APIClient, error)
// Version returns the xDS transport protocol version used by clients build
// using this builder.
Version() version.TransportAPI
}

// APIClient represents the functionality provided by transport protocol
// version specific implementations of the xDS client.
//
// TODO: unexport this interface and all the methods after the PR to make
// xdsClient sharable by clients. AddWatch and RemoveWatch are exported for
// v2/v3 to override because they need to keep track of LDS name for RDS to use.
// After the share xdsClient change, that's no longer necessary. After that, we
// will still keep this interface for testing purposes.
type APIClient interface {
// AddWatch adds a watch for an xDS resource given its type and name.
AddWatch(xdsresource.ResourceType, string)

// RemoveWatch cancels an already registered watch for an xDS resource
// given its type and name.
RemoveWatch(xdsresource.ResourceType, string)

// reportLoad starts an LRS stream to periodically report load using the
// provided ClientConn, which represent a connection to the management
// server.
reportLoad(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions)

// Close cleans up resources allocated by the API client.
Close()
}

// loadReportingOptions contains configuration knobs for reporting load data.
type loadReportingOptions struct {
loadStore *load.Store
}

// UpdateHandler receives and processes (by taking appropriate actions) xDS
// resource updates from an APIClient for a specific version.
type UpdateHandler interface {
// NewListeners handles updates to xDS listener resources.
NewListeners(map[string]xdsresource.ListenerUpdateErrTuple, xdsresource.UpdateMetadata)
// NewRouteConfigs handles updates to xDS RouteConfiguration resources.
NewRouteConfigs(map[string]xdsresource.RouteConfigUpdateErrTuple, xdsresource.UpdateMetadata)
// NewClusters handles updates to xDS Cluster resources.
NewClusters(map[string]xdsresource.ClusterUpdateErrTuple, xdsresource.UpdateMetadata)
// NewEndpoints handles updates to xDS ClusterLoadAssignment (or tersely
// referred to as Endpoints) resources.
NewEndpoints(map[string]xdsresource.EndpointsUpdateErrTuple, xdsresource.UpdateMetadata)
// NewConnectionError handles connection errors from the xDS stream. The
// error will be reported to all the resource watchers.
NewConnectionError(err error)
}

// Function to be overridden in tests.
var newAPIClient = func(apiVersion version.TransportAPI, cc *grpc.ClientConn, opts BuildOptions) (APIClient, error) {
cb := getAPIClientBuilder(apiVersion)
if cb == nil {
return nil, fmt.Errorf("no client builder for xDS API version: %v", apiVersion)
}
return cb.Build(cc, opts)
}

// clientImpl is the real implementation of the xds client. The exported Client
// is a wrapper of this struct with a ref count.
//
Expand All @@ -157,83 +39,39 @@ var newAPIClient = func(apiVersion version.TransportAPI, cc *grpc.ClientConn, op
// style of ccBalancerWrapper so that the Client type does not implement these
// exported methods.
type clientImpl struct {
done *grpcsync.Event
config *bootstrap.Config
cc *grpc.ClientConn // Connection to the management server.
apiClient APIClient
done *grpcsync.Event
config *bootstrap.Config

controller controllerInterface

logger *grpclog.PrefixLogger
pubsub *pubsub.Pubsub

// Changes to map lrsClients and the lrsClient inside the map need to be
// protected by lrsMu.
lrsMu sync.Mutex
lrsClients map[string]*lrsClient
}

// newWithConfig returns a new xdsClient with the given config.
func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration) (_ *clientImpl, retErr error) {
switch {
case config.XDSServer == nil:
return nil, errors.New("xds: no xds_server provided")
case config.XDSServer.ServerURI == "":
return nil, errors.New("xds: no xds_server name provided in options")
case config.XDSServer.Creds == nil:
return nil, errors.New("xds: no credentials provided in options")
case config.XDSServer.NodeProto == nil:
return nil, errors.New("xds: no node_proto provided in options")
}

dopts := []grpc.DialOption{
config.XDSServer.Creds,
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 5 * time.Minute,
Timeout: 20 * time.Second,
}),
}

c := &clientImpl{
done: grpcsync.NewEvent(),
config: config,
lrsClients: make(map[string]*lrsClient),
done: grpcsync.NewEvent(),
config: config,
}

defer func() {
if retErr != nil {
if c.cc != nil {
c.cc.Close()
}
if c.pubsub != nil {
c.pubsub.Close()
}
if c.apiClient != nil {
c.apiClient.Close()
}
c.Close()
}
}()

cc, err := grpc.Dial(config.XDSServer.ServerURI, dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", config.XDSServer.ServerURI, err)
}
c.cc = cc
c.logger = prefixLogger(c)
c.logger.Infof("Created ClientConn to xDS management server: %s", config.XDSServer)

c.pubsub = pubsub.New(watchExpiryTimeout, c.logger)

apiClient, err := newAPIClient(config.XDSServer.TransportAPI, cc, BuildOptions{
Parent: c,
Validator: c.updateValidator,
NodeProto: config.XDSServer.NodeProto,
Backoff: backoff.DefaultExponential.Backoff,
Logger: c.logger,
})
controller, err := newController(config.XDSServer, c.pubsub, c.updateValidator, c.logger)
if err != nil {
return nil, err
return nil, fmt.Errorf("xds: failed to connect to the control plane: %v", err)
}
c.apiClient = apiClient
c.controller = controller

c.logger.Infof("Created")
return c, nil
}
Expand All @@ -252,9 +90,16 @@ func (c *clientImpl) Close() {
c.done.Fire()
// TODO: Should we invoke the registered callbacks here with an error that
// the client is closed?
c.apiClient.Close()
c.cc.Close()
c.pubsub.Close()

// Note that Close needs to check for nils even if some of them are always
// set in the constructor. This is because the constructor defers Close() in
// error cases, and the fields might not be set when the error happens.
if c.controller != nil {
c.controller.Close()
}
if c.pubsub != nil {
c.pubsub.Close()
}
c.logger.Infof("Shutdown")
}

Expand Down
Loading

0 comments on commit 52d9416

Please sign in to comment.