diff --git a/internal/stubserver/stubserver.go b/internal/stubserver/stubserver.go new file mode 100644 index 000000000000..c97010dfe9a6 --- /dev/null +++ b/internal/stubserver/stubserver.go @@ -0,0 +1,165 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package stubserver is a stubbable implementation of +// google.golang.org/grpc/test/grpc_testing for testing purposes. +package stubserver + +import ( + "context" + "fmt" + "net" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" + + testpb "google.golang.org/grpc/test/grpc_testing" +) + +// StubServer is a server that is easy to customize within individual test +// cases. +type StubServer struct { + // Guarantees we satisfy this interface; panics if unimplemented methods are called. + testpb.TestServiceServer + + // Customizable implementations of server handlers. + EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) + UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) + FullDuplexCallF func(stream testpb.TestService_FullDuplexCallServer) error + + // A client connected to this service the test may use. Created in Start(). + Client testpb.TestServiceClient + CC *grpc.ClientConn + S *grpc.Server + + // Parameters for Listen and Dial. Defaults will be used if these are empty + // before Start. + Network string + Address string + Target string + + cleanups []func() // Lambdas executed in Stop(); populated by Start(). + + // Set automatically if Target == "" + R *manual.Resolver +} + +// EmptyCall is the handler for testpb.EmptyCall +func (ss *StubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return ss.EmptyCallF(ctx, in) +} + +// UnaryCall is the handler for testpb.UnaryCall +func (ss *StubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return ss.UnaryCallF(ctx, in) +} + +// FullDuplexCall is the handler for testpb.FullDuplexCall +func (ss *StubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { + return ss.FullDuplexCallF(stream) +} + +// Start starts the server and creates a client connected to it. +func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error { + if ss.Network == "" { + ss.Network = "tcp" + } + if ss.Address == "" { + ss.Address = "localhost:0" + } + if ss.Target == "" { + ss.R = manual.NewBuilderWithScheme("whatever") + } + + lis, err := net.Listen(ss.Network, ss.Address) + if err != nil { + return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err) + } + ss.Address = lis.Addr().String() + ss.cleanups = append(ss.cleanups, func() { lis.Close() }) + + s := grpc.NewServer(sopts...) + testpb.RegisterTestServiceServer(s, ss) + go s.Serve(lis) + ss.cleanups = append(ss.cleanups, s.Stop) + ss.S = s + + opts := append([]grpc.DialOption{grpc.WithInsecure()}, dopts...) + if ss.R != nil { + ss.Target = ss.R.Scheme() + ":///" + ss.Address + opts = append(opts, grpc.WithResolvers(ss.R)) + } + + cc, err := grpc.Dial(ss.Target, opts...) + if err != nil { + return fmt.Errorf("grpc.Dial(%q) = %v", ss.Target, err) + } + ss.CC = cc + if ss.R != nil { + ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}}) + } + if err := waitForReady(cc); err != nil { + return err + } + + ss.cleanups = append(ss.cleanups, func() { cc.Close() }) + + ss.Client = testpb.NewTestServiceClient(cc) + return nil +} + +// NewServiceConfig applies sc to ss.Client using the resolver (if present). +func (ss *StubServer) NewServiceConfig(sc string) { + if ss.R != nil { + ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}, ServiceConfig: parseCfg(ss.R, sc)}) + } +} + +func waitForReady(cc *grpc.ClientConn) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + for { + s := cc.GetState() + if s == connectivity.Ready { + return nil + } + if !cc.WaitForStateChange(ctx, s) { + // ctx got timeout or canceled. + return ctx.Err() + } + } +} + +// Stop stops ss and cleans up all resources it consumed. +func (ss *StubServer) Stop() { + for i := len(ss.cleanups) - 1; i >= 0; i-- { + ss.cleanups[i]() + } +} + +func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult { + g := r.CC.ParseServiceConfig(s) + if g.Err != nil { + panic(fmt.Sprintf("Error parsing config %q: %v", s, g.Err)) + } + return g +} diff --git a/test/authority_test.go b/test/authority_test.go index e6599b2fde04..f537ee450a42 100644 --- a/test/authority_test.go +++ b/test/authority_test.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" testpb "google.golang.org/grpc/test/grpc_testing" @@ -56,13 +57,13 @@ func runUnixTest(t *testing.T, address, target, expectedAuthority string, dialer if err := os.RemoveAll(address); err != nil { t.Fatalf("Error removing socket file %v: %v\n", address, err) } - ss := &stubServer{ - emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { return authorityChecker(ctx, expectedAuthority) }, - network: "unix", - address: address, - target: target, + Network: "unix", + Address: address, + Target: target, } opts := []grpc.DialOption{} if dialer != nil { @@ -74,7 +75,7 @@ func runUnixTest(t *testing.T, address, target, expectedAuthority string, dialer defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) + _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}) if err != nil { t.Errorf("us.client.EmptyCall(_, _) = _, %v; want _, nil", err) } @@ -152,19 +153,19 @@ func (s) TestUnixCustomDialer(t *testing.T) { func (s) TestColonPortAuthority(t *testing.T) { expectedAuthority := "" var authorityMu sync.Mutex - ss := &stubServer{ - emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { authorityMu.Lock() defer authorityMu.Unlock() return authorityChecker(ctx, expectedAuthority) }, - network: "tcp", + Network: "tcp", } if err := ss.Start(nil); err != nil { t.Fatalf("Error starting endpoint server: %v", err) } defer ss.Stop() - _, port, err := net.SplitHostPort(ss.address) + _, port, err := net.SplitHostPort(ss.Address) if err != nil { t.Fatalf("Failed splitting host from post: %v", err) } @@ -180,7 +181,7 @@ func (s) TestColonPortAuthority(t *testing.T) { return (&net.Dialer{}).DialContext(ctx, "tcp", "localhost"+addr) })) if err != nil { - t.Fatalf("grpc.Dial(%q) = %v", ss.target, err) + t.Fatalf("grpc.Dial(%q) = %v", ss.Target, err) } defer cc.Close() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) diff --git a/test/balancer_test.go b/test/balancer_test.go index 7af5c81a0011..bc22036dbac3 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -40,6 +40,7 @@ import ( "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpcutil" imetadata "google.golang.org/grpc/internal/metadata" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" @@ -301,8 +302,8 @@ func testDoneLoads(t *testing.T, e env) { const testLoad = "test-load-,-should-be-orca" - ss := &stubServer{ - emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { grpc.SetTrailer(ctx, metadata.Pairs(loadMDKey, testLoad)) return &testpb.Empty{}, nil }, @@ -312,7 +313,7 @@ func testDoneLoads(t *testing.T, e env) { } defer ss.Stop() - tc := testpb.NewTestServiceClient(ss.cc) + tc := testpb.NewTestServiceClient(ss.CC) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -579,8 +580,8 @@ func (s) TestMetadataInAddressAttributes(t *testing.T) { t.Logf("Registered balancer %s...", mdBalancerName) testMDChan := make(chan []string, 1) - ss := &stubServer{ - emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { md, ok := metadata.FromIncomingContext(ctx) if ok { select { @@ -602,7 +603,7 @@ func (s) TestMetadataInAddressAttributes(t *testing.T) { // The RPC should succeed with the expected md. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("EmptyCall() = _, %v, want _, ", err) } t.Log("Made an RPC which succeeded...") diff --git a/test/channelz_test.go b/test/channelz_test.go index 7c074961d771..47e7eb927169 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -38,6 +38,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -969,7 +970,7 @@ func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *t // Avoid overflowing connection level flow control window, which will lead to // transport being closed. te.serverInitialConnWindowSize = 65536 * 2 - ts := &stubServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + ts := &stubserver.StubServer{FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { stream.Send(&testpb.StreamingOutputCallResponse{}) <-stream.Context().Done() return status.Errorf(codes.DeadlineExceeded, "deadline exceeded or cancelled") diff --git a/test/context_canceled_test.go b/test/context_canceled_test.go index 781f63f0c04e..96ee69d8d521 100644 --- a/test/context_canceled_test.go +++ b/test/context_canceled_test.go @@ -26,14 +26,15 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/encoding/gzip" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" testpb "google.golang.org/grpc/test/grpc_testing" ) func (s) TestContextCanceled(t *testing.T) { - ss := &stubServer{ - fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + ss := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { stream.SetTrailer(metadata.New(map[string]string{"a": "b"})) return status.Error(codes.PermissionDenied, "perm denied") }, @@ -51,7 +52,7 @@ func (s) TestContextCanceled(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), delay) defer cancel() - str, err := ss.client.FullDuplexCall(ctx) + str, err := ss.Client.FullDuplexCall(ctx) if err != nil { continue } @@ -121,8 +122,8 @@ func (s) TestContextCanceled(t *testing.T) { // first one, but `case ctx.Done()` wins the second one, the compression info // will be inconsistent, and it causes internal error. func (s) TestCancelWhileRecvingWithCompression(t *testing.T) { - ss := &stubServer{ - fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + ss := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { for { if err := stream.Send(&testpb.StreamingOutputCallResponse{ Payload: nil, @@ -139,7 +140,7 @@ func (s) TestCancelWhileRecvingWithCompression(t *testing.T) { for i := 0; i < 10; i++ { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - s, err := ss.client.FullDuplexCall(ctx, grpc.UseCompressor(gzip.Name)) + s, err := ss.Client.FullDuplexCall(ctx, grpc.UseCompressor(gzip.Name)) if err != nil { t.Fatalf("failed to start bidi streaming RPC: %v", err) } diff --git a/test/end2end_test.go b/test/end2end_test.go index a3850d20874d..902e94241048 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -58,6 +58,7 @@ import ( "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/metadata" @@ -5129,127 +5130,12 @@ func (fw *filterWriter) Write(p []byte) (n int, err error) { return fw.dst.Write(p) } -// stubServer is a server that is easy to customize within individual test -// cases. -type stubServer struct { - // Guarantees we satisfy this interface; panics if unimplemented methods are called. - testpb.TestServiceServer - - // Customizable implementations of server handlers. - emptyCall func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) - unaryCall func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) - fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error - - // A client connected to this service the test may use. Created in Start(). - client testpb.TestServiceClient - cc *grpc.ClientConn - s *grpc.Server - - // Parameters for Listen and Dial. Defaults will be used if these are empty - // before Start. - network string - address string - target string - - cleanups []func() // Lambdas executed in Stop(); populated by Start(). - - r *manual.Resolver -} - -func (ss *stubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { - return ss.emptyCall(ctx, in) -} - -func (ss *stubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return ss.unaryCall(ctx, in) -} - -func (ss *stubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { - return ss.fullDuplexCall(stream) -} - -// Start starts the server and creates a client connected to it. -func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error { - if ss.network == "" { - ss.network = "tcp" - } - if ss.address == "" { - ss.address = "localhost:0" - } - if ss.target == "" { - ss.r = manual.NewBuilderWithScheme("whatever") - } - - lis, err := net.Listen(ss.network, ss.address) - if err != nil { - return fmt.Errorf("net.Listen(%q, %q) = %v", ss.network, ss.address, err) - } - ss.address = lis.Addr().String() - ss.cleanups = append(ss.cleanups, func() { lis.Close() }) - - s := grpc.NewServer(sopts...) - testpb.RegisterTestServiceServer(s, ss) - go s.Serve(lis) - ss.cleanups = append(ss.cleanups, s.Stop) - ss.s = s - - opts := append([]grpc.DialOption{grpc.WithInsecure()}, dopts...) - if ss.r != nil { - ss.target = ss.r.Scheme() + ":///" + ss.address - opts = append(opts, grpc.WithResolvers(ss.r)) - } - - cc, err := grpc.Dial(ss.target, opts...) - if err != nil { - return fmt.Errorf("grpc.Dial(%q) = %v", ss.target, err) - } - ss.cc = cc - if ss.r != nil { - ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.address}}}) - } - if err := ss.waitForReady(cc); err != nil { - return err - } - - ss.cleanups = append(ss.cleanups, func() { cc.Close() }) - - ss.client = testpb.NewTestServiceClient(cc) - return nil -} - -func (ss *stubServer) newServiceConfig(sc string) { - if ss.r != nil { - ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.address}}, ServiceConfig: parseCfg(ss.r, sc)}) - } -} - -func (ss *stubServer) waitForReady(cc *grpc.ClientConn) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - for { - s := cc.GetState() - if s == connectivity.Ready { - return nil - } - if !cc.WaitForStateChange(ctx, s) { - // ctx got timeout or canceled. - return ctx.Err() - } - } -} - -func (ss *stubServer) Stop() { - for i := len(ss.cleanups) - 1; i >= 0; i-- { - ss.cleanups[i]() - } -} - func (s) TestGRPCMethod(t *testing.T) { var method string var ok bool - ss := &stubServer{ - emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { method, ok = grpc.Method(ctx) return &testpb.Empty{}, nil }, @@ -5262,8 +5148,8 @@ func (s) TestGRPCMethod(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil { - t.Fatalf("ss.client.EmptyCall(_, _) = _, %v; want _, nil", err) + if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err) } if want := "/grpc.testing.TestService/EmptyCall"; !ok || method != want { @@ -5275,8 +5161,8 @@ func (s) TestUnaryProxyDoesNotForwardMetadata(t *testing.T) { const mdkey = "somedata" // endpoint ensures mdkey is NOT in metadata and returns an error if it is. - endpoint := &stubServer{ - emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + endpoint := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil { return nil, status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey) } @@ -5290,12 +5176,12 @@ func (s) TestUnaryProxyDoesNotForwardMetadata(t *testing.T) { // proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint // without explicitly copying the metadata. - proxy := &stubServer{ - emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + proxy := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil { return nil, status.Errorf(codes.Internal, "proxy: md=%v; want contains(%q)", md, mdkey) } - return endpoint.client.EmptyCall(ctx, in) + return endpoint.Client.EmptyCall(ctx, in) }, } if err := proxy.Start(nil); err != nil { @@ -5309,12 +5195,12 @@ func (s) TestUnaryProxyDoesNotForwardMetadata(t *testing.T) { ctx = metadata.NewOutgoingContext(ctx, md) // Sanity check that endpoint properly errors when it sees mdkey. - _, err := endpoint.client.EmptyCall(ctx, &testpb.Empty{}) + _, err := endpoint.Client.EmptyCall(ctx, &testpb.Empty{}) if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal { - t.Fatalf("endpoint.client.EmptyCall(_, _) = _, %v; want _, ", err) + t.Fatalf("endpoint.Client.EmptyCall(_, _) = _, %v; want _, ", err) } - if _, err := proxy.client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + if _, err := proxy.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatal(err.Error()) } } @@ -5337,8 +5223,8 @@ func (s) TestStreamingProxyDoesNotForwardMetadata(t *testing.T) { } // endpoint ensures mdkey is NOT in metadata and returns an error if it is. - endpoint := &stubServer{ - fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + endpoint := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { ctx := stream.Context() if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil { return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey) @@ -5353,13 +5239,13 @@ func (s) TestStreamingProxyDoesNotForwardMetadata(t *testing.T) { // proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint // without explicitly copying the metadata. - proxy := &stubServer{ - fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + proxy := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { ctx := stream.Context() if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil { return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey) } - return doFDC(ctx, endpoint.client) + return doFDC(ctx, endpoint.Client) }, } if err := proxy.Start(nil); err != nil { @@ -5373,13 +5259,13 @@ func (s) TestStreamingProxyDoesNotForwardMetadata(t *testing.T) { ctx = metadata.NewOutgoingContext(ctx, md) // Sanity check that endpoint properly errors when it sees mdkey in ctx. - err := doFDC(ctx, endpoint.client) + err := doFDC(ctx, endpoint.Client) if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal { t.Fatalf("stream.Recv() = _, %v; want _, ", err) } - if err := doFDC(ctx, proxy.client); err != nil { - t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err) + if err := doFDC(ctx, proxy.Client); err != nil { + t.Fatalf("doFDC(_, proxy.Client) = %v; want nil", err) } } @@ -5390,8 +5276,8 @@ func (s) TestStatsTagsAndTrace(t *testing.T) { // endpoint ensures Tags() and Trace() in context match those that were added // by the client and returns an error if not. - endpoint := &stubServer{ - emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + endpoint := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { md, _ := metadata.FromIncomingContext(ctx) if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, tags) { return nil, status.Errorf(codes.Internal, "stats.Tags(%v)=%v; want %v", ctx, tg, tags) @@ -5428,12 +5314,12 @@ func (s) TestStatsTagsAndTrace(t *testing.T) { } for _, tc := range testCases { - _, err := endpoint.client.EmptyCall(tc.ctx, &testpb.Empty{}) + _, err := endpoint.Client.EmptyCall(tc.ctx, &testpb.Empty{}) if tc.want == codes.OK && err != nil { - t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, nil", tc.ctx, err) + t.Fatalf("endpoint.Client.EmptyCall(%v, _) = _, %v; want _, nil", tc.ctx, err) } if s, ok := status.FromError(err); !ok || s.Code() != tc.want { - t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, ", tc.ctx, err, tc.want) + t.Fatalf("endpoint.Client.EmptyCall(%v, _) = _, %v; want _, ", tc.ctx, err, tc.want) } } } @@ -5450,8 +5336,8 @@ func (s) TestTapTimeout(t *testing.T) { }), } - ss := &stubServer{ - emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { <-ctx.Done() return nil, status.Errorf(codes.Canceled, ctx.Err().Error()) }, @@ -5465,18 +5351,18 @@ func (s) TestTapTimeout(t *testing.T) { for i := 0; i < 10; i++ { // Set our own deadline in case the server hangs. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - res, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) + res, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}) cancel() if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled { - t.Fatalf("ss.client.EmptyCall(ctx, _) = %v, %v; want nil, ", res, err) + t.Fatalf("ss.Client.EmptyCall(ctx, _) = %v, %v; want nil, ", res, err) } } } func (s) TestClientWriteFailsAfterServerClosesStream(t *testing.T) { - ss := &stubServer{ - fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + ss := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { return status.Errorf(codes.Internal, "") }, } @@ -5487,7 +5373,7 @@ func (s) TestClientWriteFailsAfterServerClosesStream(t *testing.T) { defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - stream, err := ss.client.FullDuplexCall(ctx) + stream, err := ss.Client.FullDuplexCall(ctx) if err != nil { t.Fatalf("Error while creating stream: %v", err) } @@ -6337,8 +6223,8 @@ func testCompressorRegister(t *testing.T, e env) { } func (s) TestServeExitsWhenListenerClosed(t *testing.T) { - ss := &stubServer{ - emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, } @@ -6390,8 +6276,8 @@ func (s) TestStatusInvalidUTF8Message(t *testing.T) { wantMsg = "���" ) - ss := &stubServer{ - emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return nil, status.Errorf(codes.Internal, origMsg) }, } @@ -6403,8 +6289,8 @@ func (s) TestStatusInvalidUTF8Message(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); status.Convert(err).Message() != wantMsg { - t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, status.Convert(err).Message(), wantMsg) + if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); status.Convert(err).Message() != wantMsg { + t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, status.Convert(err).Message(), wantMsg) } } @@ -6419,8 +6305,8 @@ func (s) TestStatusInvalidUTF8Details(t *testing.T) { wantMsg = "���" ) - ss := &stubServer{ - emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { st := status.New(codes.Internal, origMsg) st, err := st.WithDetails(&testpb.Empty{}) if err != nil { @@ -6437,10 +6323,10 @@ func (s) TestStatusInvalidUTF8Details(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) + _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}) st := status.Convert(err) if st.Message() != wantMsg { - t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, st.Message(), wantMsg) + t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, st.Message(), wantMsg) } if len(st.Details()) != 0 { // Details should be dropped on the server side. @@ -6539,8 +6425,8 @@ func (s) TestDisabledIOBuffers(t *testing.T) { Payload: payload, } - ss := &stubServer{ - fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + ss := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { for { in, err := stream.Recv() if err == io.EOF { @@ -7220,8 +7106,8 @@ func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult { func (s) TestClientCancellationPropagatesUnary(t *testing.T) { wg := &sync.WaitGroup{} called, done := make(chan struct{}), make(chan struct{}) - ss := &stubServer{ - emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { close(called) <-ctx.Done() err := ctx.Err() @@ -7241,8 +7127,8 @@ func (s) TestClientCancellationPropagatesUnary(t *testing.T) { wg.Add(1) go func() { - if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Canceled { - t.Errorf("ss.client.EmptyCall() = _, %v; want _, Code()=codes.Canceled", err) + if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Canceled { + t.Errorf("ss.Client.EmptyCall() = _, %v; want _, Code()=codes.Canceled", err) } wg.Done() }() @@ -7283,8 +7169,8 @@ func (badGzipCompressor) Type() string { } func (s) TestGzipBadChecksum(t *testing.T) { - ss := &stubServer{ - unaryCall: func(ctx context.Context, _ *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ss := &stubserver.StubServer{ + UnaryCallF: func(ctx context.Context, _ *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, } @@ -7300,18 +7186,18 @@ func (s) TestGzipBadChecksum(t *testing.T) { if err != nil { t.Fatalf("Unexpected error from newPayload: %v", err) } - if _, err := ss.client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: p}); err == nil || + if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: p}); err == nil || status.Code(err) != codes.Internal || !strings.Contains(status.Convert(err).Message(), gzip.ErrChecksum.Error()) { - t.Errorf("ss.client.UnaryCall(_) = _, %v\n\twant: _, status(codes.Internal, contains %q)", err, gzip.ErrChecksum) + t.Errorf("ss.Client.UnaryCall(_) = _, %v\n\twant: _, status(codes.Internal, contains %q)", err, gzip.ErrChecksum) } } // When an RPC is canceled, it's possible that the last Recv() returns before // all call options' after are executed. func (s) TestCanceledRPCCallOptionRace(t *testing.T) { - ss := &stubServer{ - fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + ss := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { err := stream.Send(&testpb.StreamingOutputCallResponse{}) if err != nil { return err @@ -7337,7 +7223,7 @@ func (s) TestCanceledRPCCallOptionRace(t *testing.T) { var p peer.Peer ctx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := ss.client.FullDuplexCall(ctx, grpc.Peer(&p)) + stream, err := ss.Client.FullDuplexCall(ctx, grpc.Peer(&p)) if err != nil { t.Errorf("_.FullDuplexCall(_) = _, %v", err) return diff --git a/test/goaway_test.go b/test/goaway_test.go index 55f79ebc8548..6ef11e26419d 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -25,6 +25,7 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/keepalive" testpb "google.golang.org/grpc/test/grpc_testing" ) @@ -40,8 +41,8 @@ func (s) TestGracefulClientOnGoAway(t *testing.T) { const maxConnAge = 100 * time.Millisecond const testTime = maxConnAge * 10 - ss := &stubServer{ - emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, } diff --git a/test/gracefulstop_test.go b/test/gracefulstop_test.go index 3da75ea1b51b..6058fb8b333c 100644 --- a/test/gracefulstop_test.go +++ b/test/gracefulstop_test.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/status" testpb "google.golang.org/grpc/test/grpc_testing" ) @@ -107,8 +108,8 @@ func (s) TestGracefulStop(t *testing.T) { } d := func(ctx context.Context, _ string) (net.Conn, error) { return dlis.Dial(ctx) } - ss := &stubServer{ - fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + ss := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { _, err := stream.Recv() if err != nil { return err diff --git a/test/insecure_creds_test.go b/test/insecure_creds_test.go index dd56a8c46ee5..19f8bb8b791b 100644 --- a/test/insecure_creds_test.go +++ b/test/insecure_creds_test.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/peer" "google.golang.org/grpc/status" @@ -84,8 +85,8 @@ func (s) TestInsecureCreds(t *testing.T) { for _, test := range tests { t.Run(test.desc, func(t *testing.T) { - ss := &stubServer{ - emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { if !test.serverInsecureCreds { return &testpb.Empty{}, nil } @@ -167,8 +168,8 @@ func (s) TestInsecureCredsWithPerRPCCredentials(t *testing.T) { } for _, test := range tests { t.Run(test.desc, func(t *testing.T) { - ss := &stubServer{ - emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, } diff --git a/test/local_creds_test.go b/test/local_creds_test.go index b9115f0d5ac8..3933bb39635b 100644 --- a/test/local_creds_test.go +++ b/test/local_creds_test.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/local" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/peer" "google.golang.org/grpc/status" @@ -37,8 +38,8 @@ import ( ) func testLocalCredsE2ESucceed(network, address string) error { - ss := &stubServer{ - emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { pr, ok := peer.FromContext(ctx) if !ok { return nil, status.Error(codes.DataLoss, "Failed to get peer from ctx") @@ -159,8 +160,8 @@ func spoofDialer(addr net.Addr) func(target string, t time.Duration) (net.Conn, } func testLocalCredsE2EFail(dopts []grpc.DialOption) error { - ss := &stubServer{ - emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, } diff --git a/test/resolver_test.go b/test/resolver_test.go index 6f50047edadc..ab154f0ea29a 100644 --- a/test/resolver_test.go +++ b/test/resolver_test.go @@ -27,6 +27,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" @@ -45,13 +46,13 @@ func (f funcConfigSelector) SelectConfig(i iresolver.RPCInfo) *iresolver.RPCConf func (s) TestConfigSelector(t *testing.T) { gotContextChan := testutils.NewChannelWithSize(1) - ss := &stubServer{ - emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { gotContextChan.SendContext(ctx, ctx) return &testpb.Empty{}, nil }, } - ss.r = manual.NewBuilderWithScheme("confSel") + ss.R = manual.NewBuilderWithScheme("confSel") if err := ss.Start(nil); err != nil { t.Fatalf("Error starting endpoint server: %v", err) @@ -134,8 +135,8 @@ func (s) TestConfigSelector(t *testing.T) { t.Run(tc.name, func(t *testing.T) { var gotInfo *iresolver.RPCInfo state := iresolver.SetConfigSelector(resolver.State{ - Addresses: []resolver.Address{{Addr: ss.address}}, - ServiceConfig: parseCfg(ss.r, "{}"), + Addresses: []resolver.Address{{Addr: ss.Address}}, + ServiceConfig: parseCfg(ss.R, "{}"), }, funcConfigSelector{ f: func(i iresolver.RPCInfo) *iresolver.RPCConfig { gotInfo = &i @@ -146,12 +147,12 @@ func (s) TestConfigSelector(t *testing.T) { return cfg }, }) - ss.r.UpdateState(state) // Blocks until config selector is applied + ss.R.UpdateState(state) // Blocks until config selector is applied onCommittedCalled = false ctx := metadata.NewOutgoingContext(ctx, tc.md) startTime := time.Now() - if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("client.EmptyCall(_, _) = _, %v; want _, nil", err) } diff --git a/test/retry_test.go b/test/retry_test.go index f0ab380e96bc..f93c9ac053f7 100644 --- a/test/retry_test.go +++ b/test/retry_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" testpb "google.golang.org/grpc/test/grpc_testing" @@ -46,8 +47,8 @@ func enableRetry() func() { func (s) TestRetryUnary(t *testing.T) { defer enableRetry()() i := -1 - ss := &stubServer{ - emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { i++ switch i { case 0, 2, 5: @@ -62,7 +63,7 @@ func (s) TestRetryUnary(t *testing.T) { t.Fatalf("Error starting endpoint server: %v", err) } defer ss.Stop() - ss.newServiceConfig(`{ + ss.NewServiceConfig(`{ "methodConfig": [{ "name": [{"service": "grpc.testing.TestService"}], "waitForReady": true, @@ -79,7 +80,7 @@ func (s) TestRetryUnary(t *testing.T) { if ctx.Err() != nil { t.Fatalf("Timed out waiting for service config update") } - if ss.cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil { + if ss.CC.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil { break } time.Sleep(time.Millisecond) @@ -100,7 +101,7 @@ func (s) TestRetryUnary(t *testing.T) { } for _, tc := range testCases { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) + _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}) cancel() if status.Code(err) != tc.code { t.Fatalf("EmptyCall(_, _) = _, %v; want _, ", err, tc.code) @@ -116,8 +117,8 @@ func (s) TestRetryDisabledByDefault(t *testing.T) { return } i := -1 - ss := &stubServer{ - emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { i++ switch i { case 0: @@ -130,7 +131,7 @@ func (s) TestRetryDisabledByDefault(t *testing.T) { t.Fatalf("Error starting endpoint server: %v", err) } defer ss.Stop() - ss.newServiceConfig(`{ + ss.NewServiceConfig(`{ "methodConfig": [{ "name": [{"service": "grpc.testing.TestService"}], "waitForReady": true, @@ -147,7 +148,7 @@ func (s) TestRetryDisabledByDefault(t *testing.T) { if ctx.Err() != nil { t.Fatalf("Timed out waiting for service config update") } - if ss.cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil { + if ss.CC.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil { break } time.Sleep(time.Millisecond) @@ -162,7 +163,7 @@ func (s) TestRetryDisabledByDefault(t *testing.T) { } for _, tc := range testCases { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) + _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}) cancel() if status.Code(err) != tc.code { t.Fatalf("EmptyCall(_, _) = _, %v; want _, ", err, tc.code) @@ -176,8 +177,8 @@ func (s) TestRetryDisabledByDefault(t *testing.T) { func (s) TestRetryThrottling(t *testing.T) { defer enableRetry()() i := -1 - ss := &stubServer{ - emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { i++ switch i { case 0, 3, 6, 10, 11, 12, 13, 14, 16, 18: @@ -190,7 +191,7 @@ func (s) TestRetryThrottling(t *testing.T) { t.Fatalf("Error starting endpoint server: %v", err) } defer ss.Stop() - ss.newServiceConfig(`{ + ss.NewServiceConfig(`{ "methodConfig": [{ "name": [{"service": "grpc.testing.TestService"}], "waitForReady": true, @@ -212,7 +213,7 @@ func (s) TestRetryThrottling(t *testing.T) { if ctx.Err() != nil { t.Fatalf("Timed out waiting for service config update") } - if ss.cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil { + if ss.CC.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil { break } time.Sleep(time.Millisecond) @@ -238,7 +239,7 @@ func (s) TestRetryThrottling(t *testing.T) { } for _, tc := range testCases { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) + _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}) cancel() if status.Code(err) != tc.code { t.Errorf("EmptyCall(_, _) = _, %v; want _, ", err, tc.code) @@ -485,8 +486,8 @@ func (s) TestRetryStreaming(t *testing.T) { var serverOpIter int var serverOps []serverOp - ss := &stubServer{ - fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + ss := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { for serverOpIter < len(serverOps) { op := serverOps[serverOpIter] serverOpIter++ @@ -501,7 +502,7 @@ func (s) TestRetryStreaming(t *testing.T) { t.Fatalf("Error starting endpoint server: %v", err) } defer ss.Stop() - ss.newServiceConfig(`{ + ss.NewServiceConfig(`{ "methodConfig": [{ "name": [{"service": "grpc.testing.TestService"}], "waitForReady": true, @@ -518,7 +519,7 @@ func (s) TestRetryStreaming(t *testing.T) { if ctx.Err() != nil { t.Fatalf("Timed out waiting for service config update") } - if ss.cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil { + if ss.CC.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil { break } time.Sleep(time.Millisecond) @@ -532,7 +533,7 @@ func (s) TestRetryStreaming(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - stream, err := ss.client.FullDuplexCall(ctx) + stream, err := ss.Client.FullDuplexCall(ctx) if err != nil { t.Fatalf("%v: Error while creating stream: %v", tc.desc, err) } diff --git a/test/server_test.go b/test/server_test.go index 41466157a19f..97f352328873 100644 --- a/test/server_test.go +++ b/test/server_test.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/status" testpb "google.golang.org/grpc/test/grpc_testing" ) @@ -118,8 +119,8 @@ func (s) TestChainUnaryServerInterceptor(t *testing.T) { grpc.ChainUnaryInterceptor(firstInt, secondInt, lastInt), } - ss := &stubServer{ - unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ss := &stubserver.StubServer{ + UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 0) if err != nil { return nil, status.Errorf(codes.Aborted, "failed to make payload: %v", err) @@ -137,9 +138,9 @@ func (s) TestChainUnaryServerInterceptor(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - resp, err := ss.client.UnaryCall(ctx, &testpb.SimpleRequest{}) + resp, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{}) if s, ok := status.FromError(err); !ok || s.Code() != codes.OK { - t.Fatalf("ss.client.UnaryCall(ctx, _) = %v, %v; want nil, ", resp, err) + t.Fatalf("ss.Client.UnaryCall(ctx, _) = %v, %v; want nil, ", resp, err) } respBytes := resp.Payload.GetBody() @@ -173,8 +174,8 @@ func (s) TestChainOnBaseUnaryServerInterceptor(t *testing.T) { grpc.ChainUnaryInterceptor(chainInt), } - ss := &stubServer{ - emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, } @@ -185,9 +186,9 @@ func (s) TestChainOnBaseUnaryServerInterceptor(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - resp, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) + resp, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}) if s, ok := status.FromError(err); !ok || s.Code() != codes.OK { - t.Fatalf("ss.client.EmptyCall(ctx, _) = %v, %v; want nil, ", resp, err) + t.Fatalf("ss.Client.EmptyCall(ctx, _) = %v, %v; want nil, ", resp, err) } } @@ -249,8 +250,8 @@ func (s) TestChainStreamServerInterceptor(t *testing.T) { grpc.ChainStreamInterceptor(firstInt, secondInt, lastInt), } - ss := &stubServer{ - fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + ss := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { if callCounts[0] != 1 { return status.Errorf(codes.Internal, "callCounts[0] should be 1, but got=%d", callCounts[0]) } @@ -274,7 +275,7 @@ func (s) TestChainStreamServerInterceptor(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - stream, err := ss.client.FullDuplexCall(ctx) + stream, err := ss.Client.FullDuplexCall(ctx) if err != nil { t.Fatalf("failed to FullDuplexCall: %v", err) } diff --git a/test/stream_cleanup_test.go b/test/stream_cleanup_test.go index 77d9477cf17e..83dd68549e99 100644 --- a/test/stream_cleanup_test.go +++ b/test/stream_cleanup_test.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/status" testpb "google.golang.org/grpc/test/grpc_testing" ) @@ -35,13 +36,13 @@ func (s) TestStreamCleanup(t *testing.T) { const bodySize = 2 * initialWindowSize // Something that is not going to fit in a single window const callRecvMsgSize uint = 1 // The maximum message size the client can receive - ss := &stubServer{ - unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ss := &stubserver.StubServer{ + UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{Payload: &testpb.Payload{ Body: make([]byte, bodySize), }}, nil }, - emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, } @@ -52,10 +53,10 @@ func (s) TestStreamCleanup(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if _, err := ss.client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.ResourceExhausted { + if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.ResourceExhausted { t.Fatalf("should fail with ResourceExhausted, message's body size: %v, maximum message size the client can receive: %v", bodySize, callRecvMsgSize) } - if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("should succeed, err: %v", err) } } @@ -66,8 +67,8 @@ func (s) TestStreamCleanupAfterSendStatus(t *testing.T) { serverReturnedStatus := make(chan struct{}) - ss := &stubServer{ - fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + ss := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { defer func() { close(serverReturnedStatus) }() @@ -90,7 +91,7 @@ func (s) TestStreamCleanupAfterSendStatus(t *testing.T) { // empty. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - stream, err := ss.client.FullDuplexCall(ctx) + stream, err := ss.Client.FullDuplexCall(ctx) if err != nil { t.Fatalf("FullDuplexCall= _, %v; want _, ", err) } @@ -115,7 +116,7 @@ func (s) TestStreamCleanupAfterSendStatus(t *testing.T) { gracefulStopDone := make(chan struct{}) go func() { defer close(gracefulStopDone) - ss.s.GracefulStop() + ss.S.GracefulStop() }() // 4. Make sure the stream is not broken.