diff --git a/internal/credentials/xds/handshake_info.go b/internal/credentials/xds/handshake_info.go index 56970e0510bd..b6f1fa520fc4 100644 --- a/internal/credentials/xds/handshake_info.go +++ b/internal/credentials/xds/handshake_info.go @@ -43,12 +43,26 @@ func init() { // the Attributes field of resolver.Address. type handshakeAttrKey struct{} -// Equal reports whether the handshake info structs are identical (have the -// same pointer). This is sufficient as all subconns from one CDS balancer use -// the same one. -func (hi *HandshakeInfo) Equal(o any) bool { - oh, ok := o.(*HandshakeInfo) - return ok && oh == hi +// Equal reports whether the handshake info structs are identical. +func (hi *HandshakeInfo) Equal(other *HandshakeInfo) bool { + if hi == nil && other == nil { + return true + } + if hi == nil || other == nil { + return false + } + if hi.rootProvider != other.rootProvider || + hi.identityProvider != other.identityProvider || + hi.requireClientCert != other.requireClientCert || + len(hi.sanMatchers) != len(other.sanMatchers) { + return false + } + for i := range hi.sanMatchers { + if !hi.sanMatchers[i].Equal(other.sanMatchers[i]) { + return false + } + } + return true } // SetHandshakeInfo returns a copy of addr in which the Attributes field is diff --git a/internal/stubserver/stubserver.go b/internal/stubserver/stubserver.go index cfc4b0f2e82d..39c291500547 100644 --- a/internal/stubserver/stubserver.go +++ b/internal/stubserver/stubserver.go @@ -216,7 +216,7 @@ func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult { // StartTestService spins up a stub server exposing the TestService on a local // port. If the passed in server is nil, a stub server that implements only the // EmptyCall and UnaryCall RPCs is started. -func StartTestService(t *testing.T, server *StubServer) *StubServer { +func StartTestService(t *testing.T, server *StubServer, sopts ...grpc.ServerOption) *StubServer { if server == nil { server = &StubServer{ EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, @@ -225,7 +225,7 @@ func StartTestService(t *testing.T, server *StubServer) *StubServer { }, } } - server.StartServer() + server.StartServer(sopts...) t.Logf("Started test service backend at %q", server.Address) return server diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go index 64e18c21b6b1..7a358b1fcb5d 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go @@ -18,718 +18,715 @@ package cdsbalancer import ( "context" - "errors" + "crypto/tls" + "crypto/x509" + "encoding/json" "fmt" - "regexp" + "os" + "strings" "testing" "github.com/google/go-cmp/cmp" + "google.golang.org/grpc" "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/credentials/local" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/tls/certprovider" "google.golang.org/grpc/credentials/xds" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancer/stub" xdscredsinternal "google.golang.org/grpc/internal/credentials/xds" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" - "google.golang.org/grpc/internal/xds/matcher" + xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" - "google.golang.org/grpc/xds/internal/testutils/fakeclient" - "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/testdata" + "google.golang.org/grpc/xds/internal/xdsclient" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" + + _ "google.golang.org/grpc/credentials/tls/certprovider/pemfile" // Register the file watcher certificate provider plugin. ) -const ( - fakeProvider1Name = "fake-certificate-provider-1" - fakeProvider2Name = "fake-certificate-provider-2" - fakeConfig = "my fake config" - testSAN = "test-san" -) +// testCCWrapper wraps a balancer.ClientConn and intercepts NewSubConn and +// returns the xDS handshake info back to the test for inspection. +type testCCWrapper struct { + balancer.ClientConn + handshakeInfoCh chan *xdscredsinternal.HandshakeInfo +} -var ( - testSANMatchers = []matcher.StringMatcher{ - matcher.StringMatcherForTesting(newStringP(testSAN), nil, nil, nil, nil, true), - matcher.StringMatcherForTesting(nil, newStringP(testSAN), nil, nil, nil, false), - matcher.StringMatcherForTesting(nil, nil, newStringP(testSAN), nil, nil, false), - matcher.StringMatcherForTesting(nil, nil, nil, nil, regexp.MustCompile(testSAN), false), - matcher.StringMatcherForTesting(nil, nil, nil, newStringP(testSAN), nil, false), - } - fpb1, fpb2 *fakeProviderBuilder - bootstrapConfig *bootstrap.Config - cdsUpdateWithGoodSecurityCfg = xdsresource.ClusterUpdate{ - ClusterName: serviceName, - SecurityCfg: &xdsresource.SecurityConfig{ - RootInstanceName: "default1", - IdentityInstanceName: "default2", - SubjectAltNameMatchers: testSANMatchers, - }, - LBPolicy: wrrLocalityLBConfigJSON, - } - cdsUpdateWithMissingSecurityCfg = xdsresource.ClusterUpdate{ - ClusterName: serviceName, - SecurityCfg: &xdsresource.SecurityConfig{ - RootInstanceName: "not-default", - }, +// NewSubConn forwards the call to the underlying balancer.ClientConn, but +// before that, it validates the following: +// - there is only one address in the addrs slice +// - the single address contains xDS handshake information, which is then +// pushed onto the handshakeInfoCh channel +func (tcc *testCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + if len(addrs) != 1 { + return nil, fmt.Errorf("NewSubConn got %d addresses, want 1", len(addrs)) + } + getHI := internal.GetXDSHandshakeInfoForTesting.(func(attr *attributes.Attributes) *xdscredsinternal.HandshakeInfo) + hi := getHI(addrs[0].Attributes) + if hi == nil { + return nil, fmt.Errorf("NewSubConn got address without xDS handshake info") + } + sc, err := tcc.ClientConn.NewSubConn(addrs, opts) + select { + case tcc.handshakeInfoCh <- hi: + default: } -) - -func newStringP(s string) *string { - return &s + return sc, err } -func init() { - fpb1 = &fakeProviderBuilder{name: fakeProvider1Name} - fpb2 = &fakeProviderBuilder{name: fakeProvider2Name} - cfg1, _ := fpb1.ParseConfig(fakeConfig + "1111") - cfg2, _ := fpb2.ParseConfig(fakeConfig + "2222") - bootstrapConfig = &bootstrap.Config{ - CertProviderConfigs: map[string]*certprovider.BuildableConfig{ - "default1": cfg1, - "default2": cfg2, +// Registers a wrapped cds LB policy for the duration of this test that retains +// all the functionality of the original cds LB policy, but overrides the +// NewSubConn method passed to the policy and makes the xDS handshake +// information passed to NewSubConn available to the test. +// +// Accepts as argument a channel onto which xDS handshake information passed to +// NewSubConn is written to. +func registerWrappedCDSPolicyWithNewSubConnOverride(t *testing.T, ch chan *xdscredsinternal.HandshakeInfo) { + cdsBuilder := balancer.Get(cdsName) + internal.BalancerUnregister(cdsBuilder.Name()) + var ccWrapper *testCCWrapper + stub.Register(cdsBuilder.Name(), stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + ccWrapper = &testCCWrapper{ + ClientConn: bd.ClientConn, + handshakeInfoCh: ch, + } + bd.Data = cdsBuilder.Build(ccWrapper, bd.BuildOptions) }, - } - certprovider.Register(fpb1) - certprovider.Register(fpb2) + ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + return cdsBuilder.(balancer.ConfigParser).ParseConfig(lbCfg) + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + bal := bd.Data.(balancer.Balancer) + return bal.UpdateClientConnState(ccs) + }, + Close: func(bd *stub.BalancerData) { + bal := bd.Data.(balancer.Balancer) + bal.Close() + }, + }) + t.Cleanup(func() { balancer.Register(cdsBuilder) }) } -// fakeProviderBuilder builds new instances of fakeProvider and interprets the -// config provided to it as a string. -type fakeProviderBuilder struct { - name string -} +// Common setup for security tests: +// - creates an xDS client with the specified bootstrap configuration +// - creates a manual resolver that specifies cds as the top-level LB policy +// - creates a channel that uses the passed in client creds and the manual +// resolver +// - creates a test server that uses the passed in server creds +// +// Returns the following: +// - a client channel to make RPCs +// - address of the test backend server +func setupForSecurityTests(t *testing.T, bootstrapContents []byte, clientCreds, serverCreds credentials.TransportCredentials) (*grpc.ClientConn, string) { + t.Helper() -func (b *fakeProviderBuilder) ParseConfig(config any) (*certprovider.BuildableConfig, error) { - s, ok := config.(string) - if !ok { - return nil, fmt.Errorf("providerBuilder %s received config of type %T, want string", b.name, config) + xdsClient, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + t.Cleanup(xdsClose) + + // Create a manual resolver that configures the CDS LB policy as the + // top-level LB policy on the channel. + r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cds_experimental":{ + "cluster": "%s" + } + }] + }`, clusterName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + state := xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient) + r.InitialState(state) + + // Create a ClientConn with the specified transport credentials. + cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(clientCreds), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("Failed to dial local test server: %v", err) } - return certprovider.NewBuildableConfig(b.name, []byte(s), func(certprovider.BuildOptions) certprovider.Provider { - return &fakeProvider{ - Distributor: certprovider.NewDistributor(), - config: s, - } - }), nil -} - -func (b *fakeProviderBuilder) Name() string { - return b.name -} + t.Cleanup(func() { cc.Close() }) -// fakeProvider is an implementation of the Provider interface which provides a -// method for tests to invoke to push new key materials. -type fakeProvider struct { - *certprovider.Distributor - config string -} + // Start a test service backend with the specified transport credentials. + sOpts := []grpc.ServerOption{} + if serverCreds != nil { + sOpts = append(sOpts, grpc.Creds(serverCreds)) + } + server := stubserver.StartTestService(t, nil, sOpts...) + t.Cleanup(server.Stop) -// Close helps implement the Provider interface. -func (p *fakeProvider) Close() { - p.Distributor.Stop() + return cc, server.Address } -// setupWithXDSCreds performs all the setup steps required for tests which use -// xDSCredentials. -func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *testutils.TestClientConn, func()) { +// Creates transport credentials to be used on the client side that rely on xDS +// to provide the security configuration. It falls back to insecure creds if no +// security information is received from the management server. +func xdsClientCredsWithInsecureFallback(t *testing.T) credentials.TransportCredentials { t.Helper() - xdsC := fakeclient.NewClient() - builder := balancer.Get(cdsName) - if builder == nil { - t.Fatalf("balancer.Get(%q) returned nil", cdsName) - } - // Create and pass xdsCredentials while building the CDS balancer. - creds, err := xds.NewClientCredentials(xds.ClientOptions{ - FallbackCreds: local.NewCredentials(), // Placeholder fallback credentials. - }) - if err != nil { - t.Fatalf("Failed to create xDS client creds: %v", err) - } - // Create a new CDS balancer and pass it a fake balancer.ClientConn which we - // can use to inspect the different calls made by the balancer. - tcc := testutils.NewTestClientConn(t) - cdsB := builder.Build(tcc, balancer.BuildOptions{DialCreds: creds}) - // Override the creation of the EDS balancer to return a fake EDS balancer - // implementation. - edsB := newTestEDSBalancer() - oldEDSBalancerBuilder := newChildBalancer - newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) { - edsB.parentCC = cc - return edsB, nil + xdsCreds, err := xds.NewClientCredentials(xds.ClientOptions{FallbackCreds: insecure.NewCredentials()}) + if err != nil { + t.Fatalf("Failed to create xDS credentials: %v", err) } + return xdsCreds +} - // Push a ClientConnState update to the CDS balancer with a cluster name. - if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil { - t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) - } +// Creates transport credentials to be used on the server side from certificate +// files in testdata/x509. +// +// The certificate returned by this function has a CommonName of "test-server1". +func tlsServerCreds(t *testing.T) credentials.TransportCredentials { + t.Helper() - // Make sure the CDS balancer registers a Cluster watch with the xDS client - // passed via attributes in the above update. - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - gotCluster, err := xdsC.WaitForWatchCluster(ctx) + cert, err := tls.LoadX509KeyPair(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem")) if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + t.Fatalf("Failed to load server cert and key: %v", err) + } - if gotCluster != clusterName { - t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, clusterName) + pemData, err := os.ReadFile(testdata.Path("x509/client_ca_cert.pem")) + if err != nil { + t.Fatalf("Failed to read client CA cert: %v", err) } - - return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() { - newChildBalancer = oldEDSBalancerBuilder + roots := x509.NewCertPool() + roots.AppendCertsFromPEM(pemData) + cfg := &tls.Config{ + Certificates: []tls.Certificate{cert}, + ClientCAs: roots, } + return credentials.NewTLS(cfg) } -// makeNewSubConn invokes the NewSubConn() call on the balancer.ClientConn -// passed to the EDS balancer, and verifies that the CDS balancer forwards the -// call appropriately to its parent balancer.ClientConn with or without -// attributes bases on the value of wantFallback. -func makeNewSubConn(ctx context.Context, edsCC balancer.ClientConn, parentCC *testutils.TestClientConn, wantFallback bool) (balancer.SubConn, error) { - dummyAddr := "foo-address" - addrs := []resolver.Address{{Addr: dummyAddr}} - sc, err := edsCC.NewSubConn(addrs, balancer.NewSubConnOptions{}) - if err != nil { - return nil, fmt.Errorf("NewSubConn(%+v) on parent ClientConn failed: %v", addrs, err) - } +// Checks the AuthInfo available in the peer if it matches the expected security +// level of the connection. +func verifySecurityInformationFromPeer(t *testing.T, pr *peer.Peer, wantSecLevel e2e.SecurityLevel) { + // This is not a true helper in the Go sense, because it does not perform + // setup or cleanup tasks. Marking it a helper is to ensure that when the + // test fails, the line information of the caller is outputted instead of + // from here. + // + // And this function directly calls t.Fatalf() instead of returning an error + // and letting the caller decide what to do with it. This is also OK since + // all callers will simply end up calling t.Fatalf() with the returned + // error, and can't add any contextual information of value to the error + // message. + t.Helper() - select { - case <-ctx.Done(): - return nil, errors.New("timeout when waiting for new SubConn") - case gotAddrs := <-parentCC.NewSubConnAddrsCh: - if len(gotAddrs) != 1 { - return nil, fmt.Errorf("NewSubConn expected 1 address, got %d", len(gotAddrs)) + switch wantSecLevel { + case e2e.SecurityLevelNone: + if pr.AuthInfo.AuthType() != "insecure" { + t.Fatalf("AuthType() is %s, want insecure", pr.AuthInfo.AuthType()) } - if got, want := gotAddrs[0].Addr, addrs[0].Addr; got != want { - return nil, fmt.Errorf("resolver.Address passed to parent ClientConn has address %q, want %q", got, want) + case e2e.SecurityLevelMTLS: + ai, ok := pr.AuthInfo.(credentials.TLSInfo) + if !ok { + t.Fatalf("AuthInfo type is %T, want %T", pr.AuthInfo, credentials.TLSInfo{}) } - getHI := internal.GetXDSHandshakeInfoForTesting.(func(attr *attributes.Attributes) *xdscredsinternal.HandshakeInfo) - hi := getHI(gotAddrs[0].Attributes) - if hi == nil { - return nil, errors.New("resolver.Address passed to parent ClientConn doesn't contain attributes") + if len(ai.State.PeerCertificates) != 1 { + t.Fatalf("Number of peer certificates is %d, want 1", len(ai.State.PeerCertificates)) } - if gotFallback := hi.UseFallbackCreds(); gotFallback != wantFallback { - return nil, fmt.Errorf("resolver.Address HandshakeInfo uses fallback creds? %v, want %v", gotFallback, wantFallback) - } - if !wantFallback { - if diff := cmp.Diff(testSANMatchers, hi.GetSANMatchersForTesting(), cmp.AllowUnexported(regexp.Regexp{})); diff != "" { - return nil, fmt.Errorf("unexpected diff in the list of SAN matchers (-got, +want):\n%s", diff) - } + cert := ai.State.PeerCertificates[0] + const wantCommonName = "test-server1" + if cn := cert.Subject.CommonName; cn != wantCommonName { + t.Fatalf("Common name in peer certificate is %s, want %s", cn, wantCommonName) } } - return sc, nil } -// TestSecurityConfigWithoutXDSCreds tests the case where xdsCredentials are not -// in use, but the CDS balancer receives a Cluster update with security -// configuration. Verifies that no certificate providers are created, and that -// the address attributes added as part of the intercepted NewSubConn() method -// indicate the use of fallback credentials. +// Tests the case where xDS credentials are not in use, but the cds LB policy +// receives a Cluster update with security configuration. Verifies that the +// security configuration is not parsed by the cds LB policy by looking at the +// xDS handshake info passed to NewSubConn. func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) { - // This creates a CDS balancer, pushes a ClientConnState update with a fake - // xdsClient, and makes sure that the CDS balancer registers a watch on the - // provided xdsClient. - xdsC, cdsB, edsB, tcc, cancel := setupWithWatch(t) - defer func() { - cancel() - cdsB.Close() - }() - - // Override the provider builder function to push on a channel. We do not - // expect this function to be called as part of this test. - providerCh := testutils.NewChannel() - origBuildProvider := buildProvider - buildProvider = func(c map[string]*certprovider.BuildableConfig, id, cert string, wi, wr bool) (certprovider.Provider, error) { - p, err := origBuildProvider(c, id, cert, wi, wr) - providerCh.Send(nil) - return p, err - } - defer func() { buildProvider = origBuildProvider }() - - // Here we invoke the watch callback registered on the fake xdsClient. This - // will trigger the watch handler on the CDS balancer, which will attempt to - // create a new EDS balancer. The fake EDS balancer created above will be - // returned to the CDS balancer, because we have overridden the - // newChildBalancer function as part of test setup. - cdsUpdate := xdsresource.ClusterUpdate{ - ClusterName: serviceName, - LBPolicy: wrrLocalityLBConfigJSON, - } - wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfigJSON, noopODLBCfgJSON) - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { + // Register a wrapped cds LB policy for the duration of this test that writes + // the xDS handshake info passed to NewSubConn onto the given channel. + handshakeInfoCh := make(chan *xdscredsinternal.HandshakeInfo, 1) + registerWrappedCDSPolicyWithNewSubConnOverride(t, handshakeInfoCh) + + // Spin up an xDS management server. + mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + t.Cleanup(cleanup) + + // Create a grpc channel with insecure creds talking to a test server with + // insecure credentials. + cc, serverAddress := setupForSecurityTests(t, bootstrapContents, insecure.NewCredentials(), nil) + + // Configure cluster and endpoints resources in the management server. The + // cluster resource is configured to return security configuration. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } - // Make a NewSubConn and verify that the HandshakeInfo does not contain any - // certificate providers, forcing the credentials implementation to use - // fallback creds. - if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil { - t.Fatal(err) + // Verify that a successful RPC can be made. + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) } - // Again, since xdsCredentials are not in use, no certificate providers - // should have been initialized by the CDS balancer. - sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if _, err := providerCh.Receive(sCtx); err != context.DeadlineExceeded { - t.Fatal("cds balancer created certificate providers when not using xds credentials") + // Ensure that the xDS handshake info passed to NewSubConn is empty. + var gotHI *xdscredsinternal.HandshakeInfo + select { + case gotHI = <-handshakeInfoCh: + case <-ctx.Done(): + t.Fatal("Timeout when waiting to read handshake info passed to NewSubConn") + } + wantHI := xdscredsinternal.NewHandshakeInfo(nil, nil) + if !cmp.Equal(gotHI, wantHI) { + t.Fatalf("NewSubConn got handshake info %+v, want %+v", gotHI, wantHI) } } -// TestNoSecurityConfigWithXDSCreds tests the case where xdsCredentials are in -// use, but the CDS balancer receives a Cluster update without security -// configuration. Verifies that no certificate providers are created, and that -// the address attributes added as part of the intercepted NewSubConn() method -// indicate the use of fallback credentials. +// Tests the case where xDS credentials are in use, but the cds LB policy +// receives a Cluster update without security configuration. Verifies that the +// xDS handshake info passed to NewSubConn specified the use of fallback +// credentials. func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) { - // This creates a CDS balancer which uses xdsCredentials, pushes a - // ClientConnState update with a fake xdsClient, and makes sure that the CDS - // balancer registers a watch on the provided xdsClient. - xdsC, cdsB, edsB, tcc, cancel := setupWithXDSCreds(t) - defer func() { - cancel() - cdsB.Close() - }() - - // Override the provider builder function to push on a channel. We do not - // expect this function to be called as part of this test. - providerCh := testutils.NewChannel() - origBuildProvider := buildProvider - buildProvider = func(c map[string]*certprovider.BuildableConfig, id, cert string, wi, wr bool) (certprovider.Provider, error) { - p, err := origBuildProvider(c, id, cert, wi, wr) - providerCh.Send(nil) - return p, err - } - defer func() { buildProvider = origBuildProvider }() - - // Here we invoke the watch callback registered on the fake xdsClient. This - // will trigger the watch handler on the CDS balancer, which will attempt to - // create a new EDS balancer. The fake EDS balancer created above will be - // returned to the CDS balancer, because we have overridden the - // newChildBalancer function as part of test setup. No security config is - // passed to the CDS balancer as part of this update. - cdsUpdate := xdsresource.ClusterUpdate{ - ClusterName: serviceName, - LBPolicy: wrrLocalityLBConfigJSON, - } - wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfigJSON, noopODLBCfgJSON) - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { + // Register a wrapped cds LB policy for the duration of this test that writes + // the xDS handshake info passed to NewSubConn onto the given channel. + handshakeInfoCh := make(chan *xdscredsinternal.HandshakeInfo, 1) + registerWrappedCDSPolicyWithNewSubConnOverride(t, handshakeInfoCh) + + // Spin up an xDS management server. + mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + t.Cleanup(cleanup) + + // Create a grpc channel with xDS creds talking to a test server with + // insecure credentials. + cc, serverAddress := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), nil) + + // Configure cluster and endpoints resources in the management server. The + // cluster resource is not configured to return any security configuration. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } - // Make a NewSubConn and verify that the HandshakeInfo does not contain any - // certificate providers, forcing the credentials implementation to use - // fallback creds. - if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil { - t.Fatal(err) + // Verify that a successful RPC can be made. + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) } - // Again, since no security configuration was received, no certificate - // providers should have been initialized by the CDS balancer. - sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if _, err := providerCh.Receive(sCtx); err != context.DeadlineExceeded { - t.Fatal("cds balancer created certificate providers when not using xds credentials") + // Ensure that the xDS handshake info passed to NewSubConn is empty. + var gotHI *xdscredsinternal.HandshakeInfo + select { + case gotHI = <-handshakeInfoCh: + case <-ctx.Done(): + t.Fatal("Timeout when waiting to read handshake info passed to NewSubConn") + } + wantHI := xdscredsinternal.NewHandshakeInfo(nil, nil) + if !cmp.Equal(gotHI, wantHI) { + t.Fatalf("NewSubConn got handshake info %+v, want %+v", gotHI, wantHI) + } + if !gotHI.UseFallbackCreds() { + t.Fatal("NewSubConn got hanshake info that does not specify the use of fallback creds") } } -// TestSecurityConfigNotFoundInBootstrap tests the case where the security -// config returned by the xDS server cannot be resolved based on the contents of -// the bootstrap config. Verifies that the balancer puts the channel in a failed -// state, and returns an error picker. +// Tests the case where the security config returned by the management server +// cannot be resolved based on the contents of the bootstrap config. Verifies +// that the cds LB policy puts the channel in TRANSIENT_FAILURE. func (s) TestSecurityConfigNotFoundInBootstrap(t *testing.T) { - // We test two cases here: - // 0: Bootstrap contains security config. But received plugin instance name - // is not found in the bootstrap config. - // 1: Bootstrap contains no security config. - for i := 0; i < 2; i++ { - // This creates a CDS balancer which uses xdsCredentials, pushes a - // ClientConnState update with a fake xdsClient, and makes sure that the CDS - // balancer registers a watch on the provided xdsClient. - xdsC, cdsB, edsB, tcc, cancel := setupWithXDSCreds(t) - defer func() { - cancel() - cdsB.Close() - }() - - if i == 0 { - // Set the bootstrap config used by the fake client. - xdsC.SetBootstrapConfig(bootstrapConfig) - } + // Spin up an xDS management server. + mgmtServer, nodeID, _, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + t.Cleanup(cleanup) + + // Ignore the bootstrap configuration returned by the above call to + // e2e.SetupManagementServer and create a new one that does not have + // ceritificate providers configuration. + bootstrapContents, err := xdsbootstrap.Contents(xdsbootstrap.Options{ + NodeID: nodeID, + ServerURI: mgmtServer.Address, + ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate, + }) + if err != nil { + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } - // Here we invoke the watch callback registered on the fake xdsClient. A bad - // security config is passed here. So, we expect the CDS balancer to not - // create an EDS balancer and instead reject this update and put the channel - // in a bad state. - xdsC.InvokeWatchClusterCallback(cdsUpdateWithMissingSecurityCfg, nil) - - // The CDS balancer has not yet created an EDS balancer. So, this bad - // watcher update should not be forwarded forwarded to our fake EDS balancer - // as an error. - sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if err := edsB.waitForResolverError(sCtx, nil); err != context.DeadlineExceeded { - t.Fatal("eds balancer shouldn't get error (shouldn't be built yet)") - } + // Create a grpc channel with xDS creds. + cc, _ := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), nil) + + // Configure a cluster resource that contains security configuration, in the + // management server. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } - // Make sure the CDS balancer reports an error picker. - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - if err := tcc.WaitForErrPicker(ctx); err != nil { - t.Fatal(err) + for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() { + if !cc.WaitForStateChange(ctx, state) { + t.Fatal("Timed out waiting for channel to enter TRANSIENT_FAILURE") } } } -// TestCertproviderStoreError tests the case where the certprovider.Store -// returns an error when the CDS balancer attempts to create a provider. -func (s) TestCertproviderStoreError(t *testing.T) { - // This creates a CDS balancer which uses xdsCredentials, pushes a - // ClientConnState update with a fake xdsClient, and makes sure that the CDS - // balancer registers a watch on the provided xdsClient. - xdsC, cdsB, edsB, tcc, cancel := setupWithXDSCreds(t) - defer func() { - cancel() - cdsB.Close() - }() - - // Override the provider builder function to return an error. - origBuildProvider := buildProvider - buildProvider = func(c map[string]*certprovider.BuildableConfig, id, cert string, wi, wr bool) (certprovider.Provider, error) { - return nil, errors.New("certprovider store error") - } - defer func() { buildProvider = origBuildProvider }() - - // Set the bootstrap config used by the fake client. - xdsC.SetBootstrapConfig(bootstrapConfig) - - // Here we invoke the watch callback registered on the fake xdsClient. Even - // though the received update is good, the certprovider.Store is configured - // to return an error. So, CDS balancer should reject this config and report - // an error. - xdsC.InvokeWatchClusterCallback(cdsUpdateWithGoodSecurityCfg, nil) - - // The CDS balancer has not yet created an EDS balancer. So, this bad - // watcher update should not be forwarded forwarded to our fake EDS balancer - // as an error. - sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if err := edsB.waitForResolverError(sCtx, nil); err != context.DeadlineExceeded { - t.Fatal("eds balancer shouldn't get error (shouldn't be built yet)") - } - - // Make sure the CDS balancer reports an error picker. - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - if err := tcc.WaitForErrPicker(ctx); err != nil { - t.Fatal(err) - } +// A ceritificate provider builder that returns a nil Provider from the starter +// func passed to certprovider.NewBuildableConfig(). +type errCertProviderBuilder struct{} + +const errCertProviderName = "err-cert-provider" + +func (e errCertProviderBuilder) ParseConfig(any) (*certprovider.BuildableConfig, error) { + // Returning a nil Provider simulates the case where an error is encountered + // at the time of building the Provider. + bc := certprovider.NewBuildableConfig(errCertProviderName, nil, func(certprovider.BuildOptions) certprovider.Provider { return nil }) + return bc, nil } -func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) { - // This creates a CDS balancer which uses xdsCredentials, pushes a - // ClientConnState update with a fake xdsClient, and makes sure that the CDS - // balancer registers a watch on the provided xdsClient. - xdsC, cdsB, edsB, tcc, cancel := setupWithXDSCreds(t) - defer func() { - cancel() - cdsB.Close() - }() - - // Set the bootstrap config used by the fake client. - xdsC.SetBootstrapConfig(bootstrapConfig) - - // Here we invoke the watch callback registered on the fake xdsClient. A bad - // security config is passed here. So, we expect the CDS balancer to not - // create an EDS balancer and instead reject this update and put the channel - // in a bad state. - xdsC.InvokeWatchClusterCallback(cdsUpdateWithMissingSecurityCfg, nil) - - // The CDS balancer has not yet created an EDS balancer. So, this bad - // watcher update should not be forwarded forwarded to our fake EDS balancer - // as an error. - sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if err := edsB.waitForResolverError(sCtx, nil); err != context.DeadlineExceeded { - t.Fatal("eds balancer shouldn't get error (shouldn't be built yet)") - } - - // Make sure the CDS balancer reports an error picker. - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - if err := tcc.WaitForErrPicker(ctx); err != nil { - t.Fatal(err) +func (e errCertProviderBuilder) Name() string { + return errCertProviderName +} + +func init() { + certprovider.Register(errCertProviderBuilder{}) +} + +// Tests the case where the certprovider.Store returns an error when the cds LB +// policy attempts to build a certificate provider. Verifies that the cds LB +// policy puts the channel in TRANSIENT_FAILURE. +func (s) TestCertproviderStoreError(t *testing.T) { + mgmtServer, nodeID, _, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + t.Cleanup(cleanup) + + // Ignore the bootstrap configuration returned by the above call to + // e2e.SetupManagementServer and create a new one that includes ceritificate + // providers configuration for errCertProviderBuilder. + providerCfg := json.RawMessage(fmt.Sprintf(`{ + "plugin_name": "%s", + "config": {} + }`, errCertProviderName)) + bootstrapContents, err := xdsbootstrap.Contents(xdsbootstrap.Options{ + NodeID: nodeID, + ServerURI: mgmtServer.Address, + CertificateProviders: map[string]json.RawMessage{e2e.ClientSideCertProviderInstance: providerCfg}, + ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate, + }) + if err != nil { + t.Fatalf("Failed to create bootstrap configuration: %v", err) } - // Here we invoke the watch callback registered on the fake xdsClient. This - // will trigger the watch handler on the CDS balancer, which will attempt to - // create a new EDS balancer. The fake EDS balancer created above will be - // returned to the CDS balancer, because we have overridden the - // newChildBalancer function as part of test setup. - wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfigJSON, noopODLBCfgJSON) - if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil { + // Create a grpc channel with xDS creds. + cc, _ := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), nil) + + // Configure a cluster resource that contains security configuration, in the + // management server. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } - // Make a NewSubConn and verify that attributes are added. - if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { - t.Fatal(err) + for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() { + if !cc.WaitForStateChange(ctx, state) { + t.Fatal("Timed out waiting for channel to enter TRANSIENT_FAILURE") + } } } -// TestGoodSecurityConfig tests the case where the CDS balancer receives -// security configuration as part of the Cluster resource which can be -// successfully resolved using the bootstrap file contents. Verifies that -// certificate providers are created, and that the NewSubConn() call adds -// appropriate address attributes. +// Tests the case where the cds LB policy receives security configuration as +// part of the Cluster resource that can be successfully resolved using the +// bootstrap file contents. Verifies that the connection between the client and +// the server is secure. func (s) TestGoodSecurityConfig(t *testing.T) { - // This creates a CDS balancer which uses xdsCredentials, pushes a - // ClientConnState update with a fake xdsClient, and makes sure that the CDS - // balancer registers a watch on the provided xdsClient. - xdsC, cdsB, edsB, tcc, cancel := setupWithXDSCreds(t) - defer func() { - cancel() - cdsB.Close() - }() - - // Set the bootstrap config used by the fake client. - xdsC.SetBootstrapConfig(bootstrapConfig) - - // Here we invoke the watch callback registered on the fake xdsClient. This - // will trigger the watch handler on the CDS balancer, which will attempt to - // create a new EDS balancer. The fake EDS balancer created above will be - // returned to the CDS balancer, because we have overridden the - // newChildBalancer function as part of test setup. - wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfigJSON, noopODLBCfgJSON) - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil { + // Spin up an xDS management server. + mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + t.Cleanup(cleanup) + + // Create a grpc channel with xDS creds talking to a test server with TLS + // credentials. + cc, serverAddress := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), tlsServerCreds(t)) + + // Configure cluster and endpoints resources in the management server. The + // cluster resource is configured to return security configuration. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } - // Make a NewSubConn and verify that attributes are added. - sc, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false) - if err != nil { - t.Fatal(err) - } - - // Invoke UpdateAddresses and verify that attributes are added. - dummyAddr := "bar-address" - addrs := []resolver.Address{{Addr: dummyAddr}} - edsB.parentCC.UpdateAddresses(sc, addrs) - select { - case <-ctx.Done(): - t.Fatal("timeout when waiting for addresses to be updated on the subConn") - case gotAddrs := <-tcc.UpdateAddressesAddrsCh: - if len(gotAddrs) != 1 { - t.Fatalf("UpdateAddresses expected 1 address, got %d", len(gotAddrs)) - } - if got, want := gotAddrs[0].Addr, addrs[0].Addr; got != want { - t.Fatalf("resolver.Address passed to parent ClientConn through UpdateAddresses() has address %q, want %q", got, want) - } - getHI := internal.GetXDSHandshakeInfoForTesting.(func(attr *attributes.Attributes) *xdscredsinternal.HandshakeInfo) - hi := getHI(gotAddrs[0].Attributes) - if hi == nil { - t.Fatal("resolver.Address passed to parent ClientConn through UpdateAddresses() doesn't contain attributes") - } + // Verify that a successful RPC can be made over a secure connection. + client := testgrpc.NewTestServiceClient(cc) + peer := &peer.Peer{} + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(peer)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) } + verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelMTLS) } -func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) { - // This creates a CDS balancer which uses xdsCredentials, pushes a - // ClientConnState update with a fake xdsClient, and makes sure that the CDS - // balancer registers a watch on the provided xdsClient. - xdsC, cdsB, edsB, tcc, cancel := setupWithXDSCreds(t) - defer func() { - cancel() - cdsB.Close() - }() - - // Set the bootstrap config used by the fake client. - xdsC.SetBootstrapConfig(bootstrapConfig) - - // Here we invoke the watch callback registered on the fake xdsClient. This - // will trigger the watch handler on the CDS balancer, which will attempt to - // create a new EDS balancer. The fake EDS balancer created above will be - // returned to the CDS balancer, because we have overridden the - // newChildBalancer function as part of test setup. - wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfigJSON, noopODLBCfgJSON) - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil { +// Tests the case where the cds LB policy receives security configuration as +// part of the Cluster resource that contains a certificate provider instance +// that is missing in the bootstrap file. Verifies that the channel moves to +// TRANSIENT_FAILURE. Subsequently, the cds LB policy receives a cluster +// resource that contains a certificate provider that is present in the +// bootstrap file. Verifies that the connection between the client and the +// server is secure. +func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) { + // Spin up an xDS management server. + mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + t.Cleanup(cleanup) + + // Create a grpc channel with xDS creds talking to a test server with TLS + // credentials. + cc, serverAddress := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), tlsServerCreds(t)) + + // Configure cluster and endpoints resources in the management server. The + // cluster resource contains security configuration with a certificate + // provider instance that is missing in the bootstrap configuration. + cluster := e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone) + cluster.TransportSocket = &v3corepb.TransportSocket{ + Name: "envoy.transport_sockets.tls", + ConfigType: &v3corepb.TransportSocket_TypedConfig{ + TypedConfig: testutils.MarshalAny(&v3tlspb.UpstreamTlsContext{ + CommonTlsContext: &v3tlspb.CommonTlsContext{ + ValidationContextType: &v3tlspb.CommonTlsContext_ValidationContextCertificateProviderInstance{ + ValidationContextCertificateProviderInstance: &v3tlspb.CommonTlsContext_CertificateProviderInstance{ + InstanceName: "unknown-certificate-provider-instance", + }, + }, + }, + }), + }, + } + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{cluster}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } - // Make a NewSubConn and verify that attributes are added. - if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { - t.Fatal(err) + for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() { + if !cc.WaitForStateChange(ctx, state) { + t.Fatal("Timed out waiting for channel to enter TRANSIENT_FAILURE") + } } - // Here we invoke the watch callback registered on the fake xdsClient with - // an update which contains bad security config. So, we expect the CDS - // balancer to forward this error to the EDS balancer and eventually the - // channel needs to be put in a bad state. - cdsUpdate := xdsresource.ClusterUpdate{ - ClusterName: serviceName, - LBPolicy: wrrLocalityLBConfigJSON, + // Update the management server with a Cluster resource that contains a + // certificate provider instance that is present in the bootstrap + // configuration. + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, + SkipValidation: true, } - if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { + if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } - // Make a NewSubConn and verify that fallback creds are used. - if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil { - t.Fatal(err) + // Verify that a successful RPC can be made over a secure connection. + client := testgrpc.NewTestServiceClient(cc) + peer := &peer.Peer{} + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(peer)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) } + verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelMTLS) } -// TestSecurityConfigUpdate_GoodToBad tests the case where the first security -// config returned by the xDS server is successful, but the second update cannot -// be resolved based on the contents of the bootstrap config. Verifies that the -// error is forwarded to the EDS balancer (which was created as part of the -// first successful update). -func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) { - // This creates a CDS balancer which uses xdsCredentials, pushes a - // ClientConnState update with a fake xdsClient, and makes sure that the CDS - // balancer registers a watch on the provided xdsClient. - xdsC, cdsB, edsB, tcc, cancel := setupWithXDSCreds(t) - defer func() { - cancel() - cdsB.Close() - }() - - // Set the bootstrap config used by the fake client. - xdsC.SetBootstrapConfig(bootstrapConfig) - - // Here we invoke the watch callback registered on the fake xdsClient. This - // will trigger the watch handler on the CDS balancer, which will attempt to - // create a new EDS balancer. The fake EDS balancer created above will be - // returned to the CDS balancer, because we have overridden the - // newChildBalancer function as part of test setup. - wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfigJSON, noopODLBCfgJSON) - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil { +// Tests the case where the cds LB policy receives security configuration as +// part of the Cluster resource that can be successfully resolved using the +// bootstrap file contents. Verifies that the connection between the client and +// the server is secure. Subsequently, the cds LB policy receives a cluster +// resource without security configuration. Verifies that this results in the +// use of fallback credentials, which in this case is insecure creds. +func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) { + // Spin up an xDS management server. + mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + t.Cleanup(cleanup) + + // Create a grpc channel with xDS creds talking to a test server with TLS + // credentials. + cc, serverAddress := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), tlsServerCreds(t)) + + // Configure cluster and endpoints resources in the management server. The + // cluster resource is configured to return security configuration. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } - // Make a NewSubConn and verify that attributes are added. - if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { - t.Fatal(err) + // Verify that a successful RPC can be made over a secure connection. + client := testgrpc.NewTestServiceClient(cc) + peer := &peer.Peer{} + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(peer)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) } + verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelMTLS) - // Here we invoke the watch callback registered on the fake xdsClient with - // an update which contains bad security config. So, we expect the CDS - // balancer to forward this error to the EDS balancer and eventually the - // channel needs to be put in a bad state. - xdsC.InvokeWatchClusterCallback(cdsUpdateWithMissingSecurityCfg, nil) + // Start a test service backend that does not expect a secure connection. + insecureServer := stubserver.StartTestService(t, nil) + t.Cleanup(insecureServer.Stop) - // We manually check that an error is forwarded to the EDS balancer instead - // of using one of the helper methods on the testEDSBalancer, because all we - // care here is whether an error is sent to it or not. We don't care about - // the exact error. - gotErr, err := edsB.resolverErrCh.Receive(ctx) - if err != nil { - t.Fatal("timeout waiting for CDS balancer to forward error to EDS balancer upon receipt of bad security config") - } - if gotErr == nil { - t.Fatal("CDS balancer did not forward error to EDS balancer upon receipt of bad security config") + // Update the resources in the management server to contain no security + // configuration. This should result in the use of fallback credentials, + // which is insecure in our case. + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, insecureServer.Address)})}, + SkipValidation: true, } - - // Since the error being pushed here is not a resource-not-found-error, the - // registered watch should not be cancelled. - sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if _, err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded { - t.Fatal("cluster watch cancelled for a non-resource-not-found-error") - } -} - -// TestSecurityConfigUpdate_GoodToGood tests the case where the CDS balancer -// receives two different but successful updates with security configuration. -// Verifies that appropriate providers are created, and that address attributes -// are added. -func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) { - // This creates a CDS balancer which uses xdsCredentials, pushes a - // ClientConnState update with a fake xdsClient, and makes sure that the CDS - // balancer registers a watch on the provided xdsClient. - xdsC, cdsB, edsB, tcc, cancel := setupWithXDSCreds(t) - defer func() { - cancel() - cdsB.Close() - }() - - // Override the provider builder function to push on a channel. - providerCh := testutils.NewChannel() - origBuildProvider := buildProvider - buildProvider = func(c map[string]*certprovider.BuildableConfig, id, cert string, wi, wr bool) (certprovider.Provider, error) { - p, err := origBuildProvider(c, id, cert, wi, wr) - providerCh.Send(nil) - return p, err - } - defer func() { buildProvider = origBuildProvider }() - - // Set the bootstrap config used by the fake client. - xdsC.SetBootstrapConfig(bootstrapConfig) - - // Here we invoke the watch callback registered on the fake xdsClient. This - // will trigger the watch handler on the CDS balancer, which will attempt to - // create a new EDS balancer. The fake EDS balancer created above will be - // returned to the CDS balancer, because we have overridden the - // newChildBalancer function as part of test setup. - cdsUpdate := xdsresource.ClusterUpdate{ - ClusterName: serviceName, - SecurityCfg: &xdsresource.SecurityConfig{ - RootInstanceName: "default1", - SubjectAltNameMatchers: testSANMatchers, - }, - LBPolicy: wrrLocalityLBConfigJSON, - } - wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfigJSON, noopODLBCfgJSON) - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { + if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } - // We specified only the root provider. So, expect only one provider here. - if _, err := providerCh.Receive(ctx); err != nil { - t.Fatalf("Failed to create certificate provider upon receipt of security config") + // Wait for the connection to move to the new backend that expects + // connections without security. + for ctx.Err() == nil { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(peer)); err != nil { + t.Logf("EmptyCall() failed: %v", err) + } + if peer.Addr.String() == insecureServer.Address { + break + } + } + if ctx.Err() != nil { + t.Fatal("Timed out when waiting for connection to switch to second backend") } + verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelNone) +} - // Make a NewSubConn and verify that attributes are added. - if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { +// Tests the case where the cds LB policy receives security configuration as +// part of the Cluster resource that can be successfully resolved using the +// bootstrap file contents. Verifies that the connection between the client and +// the server is secure. Subsequently, the cds LB policy receives a cluster +// resource that is NACKed by the xDS client. Test verifies that the cds LB +// policy continues to use the previous good configuration, but the error from +// the xDS client is propagated to the child policy. +func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) { + // Register a wrapped clusterresolver LB policy (child policy of the cds LB + // policy) for the duration of this test that makes the resolver error + // pushed to it available to the test. + _, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t) + + // Spin up an xDS management server. + mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + t.Cleanup(cleanup) + + // Create a grpc channel with xDS creds talking to a test server with TLS + // credentials. + cc, serverAddress := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), tlsServerCreds(t)) + + // Configure cluster and endpoints resources in the management server. The + // cluster resource is configured to return security configuration. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } - // Push another update with a new security configuration. - cdsUpdate = xdsresource.ClusterUpdate{ - ClusterName: serviceName, - SecurityCfg: &xdsresource.SecurityConfig{ - RootInstanceName: "default2", - SubjectAltNameMatchers: testSANMatchers, + // Verify that a successful RPC can be made over a secure connection. + client := testgrpc.NewTestServiceClient(cc) + peer := &peer.Peer{} + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(peer)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelMTLS) + + // Configure cluster and endpoints resources in the management server. The + // cluster resource contains security configuration with a certificate + // provider instance that is missing in the bootstrap configuration. + cluster := e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone) + cluster.TransportSocket = &v3corepb.TransportSocket{ + Name: "envoy.transport_sockets.tls", + ConfigType: &v3corepb.TransportSocket_TypedConfig{ + TypedConfig: testutils.MarshalAny(&v3tlspb.UpstreamTlsContext{ + CommonTlsContext: &v3tlspb.CommonTlsContext{ + ValidationContextType: &v3tlspb.CommonTlsContext_ValidationContextCertificateProviderInstance{ + ValidationContextCertificateProviderInstance: &v3tlspb.CommonTlsContext_CertificateProviderInstance{ + InstanceName: "unknown-certificate-provider-instance", + }, + }, + }, + }), }, - LBPolicy: wrrLocalityLBConfigJSON, } - if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{cluster}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } - // We specified only the root provider. So, expect only one provider here. - if _, err := providerCh.Receive(ctx); err != nil { - t.Fatalf("Failed to create certificate provider upon receipt of security config") + const wantNACKErr = "instance name \"unknown-certificate-provider-instance\" missing in bootstrap configuration" + select { + case err := <-resolverErrCh: + if !strings.Contains(err.Error(), wantNACKErr) { + t.Fatalf("Child policy got resolver error: %v, want err: %v", err, wantNACKErr) + } + case <-ctx.Done(): + t.Fatal("Timeout when waiting for resolver error to be pushed to the child policy") } - // Make a NewSubConn and verify that attributes are added. - if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { - t.Fatal(err) + // Verify that a successful RPC can be made over a secure connection. + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) } - - // The HandshakeInfo type does not expose its internals. So, we cannot - // verify that the HandshakeInfo carried by the attributes have actually - // been changed. This will be covered in e2e/interop tests. - // TODO(easwars): Remove this TODO once appropriate e2e/intertop tests have - // been added. + verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelMTLS) } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index b0edc13a4a58..050449c9643e 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -26,7 +26,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" @@ -35,7 +34,6 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/grpctest" - iserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" @@ -44,9 +42,6 @@ import ( "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal/balancer/clusterresolver" - "google.golang.org/grpc/xds/internal/balancer/outlierdetection" - "google.golang.org/grpc/xds/internal/balancer/wrrlocality" - "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" @@ -71,26 +66,6 @@ const ( defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. ) -var ( - defaultTestAuthorityServerConfig = &bootstrap.ServerConfig{ - ServerURI: "self_server", - Creds: bootstrap.ChannelCreds{ - Type: "insecure", - }, - } - noopODLBCfg = outlierdetection.LBConfig{} - noopODLBCfgJSON, _ = json.Marshal(noopODLBCfg) - wrrLocalityLBConfig = &iserviceconfig.BalancerConfig{ - Name: wrrlocality.Name, - Config: &wrrlocality.LBConfig{ - ChildPolicy: &iserviceconfig.BalancerConfig{ - Name: "round_robin", - }, - }, - } - wrrLocalityLBConfigJSON, _ = json.Marshal(wrrLocalityLBConfig) -) - type s struct { grpctest.Tester } @@ -99,229 +74,6 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } -// cdsWatchInfo wraps the update and the error sent in a CDS watch callback. -type cdsWatchInfo struct { - update xdsresource.ClusterUpdate - err error -} - -// invokeWatchCb invokes the CDS watch callback registered by the cdsBalancer -// and waits for appropriate state to be pushed to the provided edsBalancer. -func invokeWatchCbAndWait(ctx context.Context, xdsC *fakeclient.Client, cdsW cdsWatchInfo, wantCCS balancer.ClientConnState, edsB *testEDSBalancer) error { - xdsC.InvokeWatchClusterCallback(cdsW.update, cdsW.err) - if cdsW.err != nil { - return edsB.waitForResolverError(ctx, cdsW.err) - } - return edsB.waitForClientConnUpdate(ctx, wantCCS) -} - -// testEDSBalancer is a fake edsBalancer used to verify different actions from -// the cdsBalancer. It contains a bunch of channels to signal different events -// to the test. -type testEDSBalancer struct { - // ccsCh is a channel used to signal the receipt of a ClientConn update. - ccsCh *testutils.Channel - // scStateCh is a channel used to signal the receipt of a SubConn update. - scStateCh *testutils.Channel - // resolverErrCh is a channel used to signal a resolver error. - resolverErrCh *testutils.Channel - // closeCh is a channel used to signal the closing of this balancer. - closeCh *testutils.Channel - exitIdleCh *testutils.Channel - // parentCC is the balancer.ClientConn passed to this test balancer as part - // of the Build() call. - parentCC balancer.ClientConn -} - -type subConnWithState struct { - sc balancer.SubConn - state balancer.SubConnState -} - -func newTestEDSBalancer() *testEDSBalancer { - return &testEDSBalancer{ - ccsCh: testutils.NewChannel(), - scStateCh: testutils.NewChannel(), - resolverErrCh: testutils.NewChannel(), - closeCh: testutils.NewChannel(), - exitIdleCh: testutils.NewChannel(), - } -} - -func (tb *testEDSBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { - tb.ccsCh.Send(ccs) - return nil -} - -func (tb *testEDSBalancer) ResolverError(err error) { - tb.resolverErrCh.Send(err) -} - -func (tb *testEDSBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { - tb.scStateCh.Send(subConnWithState{sc: sc, state: state}) -} - -func (tb *testEDSBalancer) Close() { - tb.closeCh.Send(struct{}{}) -} - -func (tb *testEDSBalancer) ExitIdle() { - tb.exitIdleCh.Send(struct{}{}) -} - -// waitForClientConnUpdate verifies if the testEDSBalancer receives the -// provided ClientConnState within a reasonable amount of time. -func (tb *testEDSBalancer) waitForClientConnUpdate(ctx context.Context, wantCCS balancer.ClientConnState) error { - ccs, err := tb.ccsCh.Receive(ctx) - if err != nil { - return err - } - gotCCS := ccs.(balancer.ClientConnState) - if xdsclient.FromResolverState(gotCCS.ResolverState) == nil { - return fmt.Errorf("want resolver state with XDSClient attached, got one without") - } - - // Calls into Cluster Resolver LB Config Equal(), which ignores JSON - // configuration but compares the Parsed Configuration of the JSON fields - // emitted from ParseConfig() on the cluster resolver. - if diff := cmp.Diff(gotCCS, wantCCS, cmpopts.IgnoreFields(resolver.State{}, "Attributes"), cmp.AllowUnexported(clusterresolver.LBConfig{})); diff != "" { - return fmt.Errorf("received unexpected ClientConnState, diff (-got +want): %v", diff) - } - return nil -} - -// waitForResolverError verifies if the testEDSBalancer receives the provided -// resolver error before the context expires. -func (tb *testEDSBalancer) waitForResolverError(ctx context.Context, wantErr error) error { - gotErr, err := tb.resolverErrCh.Receive(ctx) - if err != nil { - return err - } - if gotErr != wantErr { - return fmt.Errorf("received resolver error: %v, want %v", gotErr, wantErr) - } - return nil -} - -// cdsCCS is a helper function to construct a good update passed from the -// xdsResolver to the cdsBalancer. -func cdsCCS(cluster string, xdsC xdsclient.XDSClient) balancer.ClientConnState { - const cdsLBConfig = `{ - "loadBalancingConfig":[ - { - "cds_experimental":{ - "Cluster": "%s" - } - } - ] - }` - jsonSC := fmt.Sprintf(cdsLBConfig, cluster) - return balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{ - ServiceConfig: internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC), - }, xdsC), - BalancerConfig: &lbConfig{ClusterName: clusterName}, - } -} - -// edsCCS is a helper function to construct a Client Conn update which -// represents what the CDS Balancer passes to the Cluster Resolver. It calls -// into Cluster Resolver's ParseConfig to get the service config to fill out the -// Client Conn State. This is to fill out unexported parts of the Cluster -// Resolver config struct. Returns an empty Client Conn State if it encounters -// an error building out the Client Conn State. -func edsCCS(service string, countMax *uint32, enableLRS bool, xdslbpolicy json.RawMessage, odConfig json.RawMessage) balancer.ClientConnState { - builder := balancer.Get(clusterresolver.Name) - if builder == nil { - // Shouldn't happen, registered through imported Cluster Resolver, - // defensive programming. - logger.Errorf("%q LB policy is needed but not registered", clusterresolver.Name) - return balancer.ClientConnState{} // will fail the calling test eventually through error in diff. - } - crParser, ok := builder.(balancer.ConfigParser) - if !ok { - // Shouldn't happen, imported Cluster Resolver builder has this method. - logger.Errorf("%q LB policy does not implement a config parser", clusterresolver.Name) - return balancer.ClientConnState{} - } - discoveryMechanism := clusterresolver.DiscoveryMechanism{ - Type: clusterresolver.DiscoveryMechanismTypeEDS, - Cluster: service, - MaxConcurrentRequests: countMax, - OutlierDetection: odConfig, - } - if enableLRS { - discoveryMechanism.LoadReportingServer = defaultTestAuthorityServerConfig - } - lbCfg := &clusterresolver.LBConfig{ - DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{discoveryMechanism}, - XDSLBPolicy: xdslbpolicy, - } - - crLBCfgJSON, err := json.Marshal(lbCfg) - if err != nil { - // Shouldn't happen, since we just prepared struct. - logger.Errorf("cds_balancer: error marshalling prepared config: %v", lbCfg) - return balancer.ClientConnState{} - } - - var sc serviceconfig.LoadBalancingConfig - if sc, err = crParser.ParseConfig(crLBCfgJSON); err != nil { - logger.Errorf("cds_balancer: cluster_resolver config generated %v is invalid: %v", crLBCfgJSON, err) - return balancer.ClientConnState{} - } - - return balancer.ClientConnState{ - BalancerConfig: sc, - } -} - -// setup creates a cdsBalancer and an edsBalancer (and overrides the -// newChildBalancer function to return it), and also returns a cleanup function. -func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *testutils.TestClientConn, func()) { - t.Helper() - xdsC := fakeclient.NewClient() - builder := balancer.Get(cdsName) - if builder == nil { - t.Fatalf("balancer.Get(%q) returned nil", cdsName) - } - tcc := testutils.NewTestClientConn(t) - cdsB := builder.Build(tcc, balancer.BuildOptions{}) - - edsB := newTestEDSBalancer() - oldEDSBalancerBuilder := newChildBalancer - newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) { - edsB.parentCC = cc - return edsB, nil - } - - return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() { - newChildBalancer = oldEDSBalancerBuilder - } -} - -// setupWithWatch does everything that setup does, and also pushes a ClientConn -// update to the cdsBalancer and waits for a CDS watch call to be registered. -func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *testutils.TestClientConn, func()) { - t.Helper() - - xdsC, cdsB, edsB, tcc, cancel := setup(t) - if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil { - t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) - } - - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - gotCluster, err := xdsC.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - if gotCluster != clusterName { - t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, clusterName) - } - return xdsC, cdsB, edsB, tcc, cancel -} - func waitForResourceNames(ctx context.Context, resourceNamesCh chan []string, wantNames []string) error { for ctx.Err() == nil { select {