Skip to content

Commit

Permalink
test: move stubServer to separate package in internal (#4081)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Dec 4, 2020
1 parent 9db56a0 commit 0d6a24f
Show file tree
Hide file tree
Showing 14 changed files with 316 additions and 253 deletions.
165 changes: 165 additions & 0 deletions internal/stubserver/stubserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package stubserver is a stubbable implementation of
// google.golang.org/grpc/test/grpc_testing for testing purposes.
package stubserver

import (
"context"
"fmt"
"net"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"

testpb "google.golang.org/grpc/test/grpc_testing"
)

// StubServer is a server that is easy to customize within individual test
// cases.
type StubServer struct {
// Guarantees we satisfy this interface; panics if unimplemented methods are called.
testpb.TestServiceServer

// Customizable implementations of server handlers.
EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
FullDuplexCallF func(stream testpb.TestService_FullDuplexCallServer) error

// A client connected to this service the test may use. Created in Start().
Client testpb.TestServiceClient
CC *grpc.ClientConn
S *grpc.Server

// Parameters for Listen and Dial. Defaults will be used if these are empty
// before Start.
Network string
Address string
Target string

cleanups []func() // Lambdas executed in Stop(); populated by Start().

// Set automatically if Target == ""
R *manual.Resolver
}

// EmptyCall is the handler for testpb.EmptyCall
func (ss *StubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return ss.EmptyCallF(ctx, in)
}

// UnaryCall is the handler for testpb.UnaryCall
func (ss *StubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return ss.UnaryCallF(ctx, in)
}

// FullDuplexCall is the handler for testpb.FullDuplexCall
func (ss *StubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
return ss.FullDuplexCallF(stream)
}

// Start starts the server and creates a client connected to it.
func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error {
if ss.Network == "" {
ss.Network = "tcp"
}
if ss.Address == "" {
ss.Address = "localhost:0"
}
if ss.Target == "" {
ss.R = manual.NewBuilderWithScheme("whatever")
}

lis, err := net.Listen(ss.Network, ss.Address)
if err != nil {
return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
}
ss.Address = lis.Addr().String()
ss.cleanups = append(ss.cleanups, func() { lis.Close() })

s := grpc.NewServer(sopts...)
testpb.RegisterTestServiceServer(s, ss)
go s.Serve(lis)
ss.cleanups = append(ss.cleanups, s.Stop)
ss.S = s

opts := append([]grpc.DialOption{grpc.WithInsecure()}, dopts...)
if ss.R != nil {
ss.Target = ss.R.Scheme() + ":https:///" + ss.Address
opts = append(opts, grpc.WithResolvers(ss.R))
}

cc, err := grpc.Dial(ss.Target, opts...)
if err != nil {
return fmt.Errorf("grpc.Dial(%q) = %v", ss.Target, err)
}
ss.CC = cc
if ss.R != nil {
ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}})
}
if err := waitForReady(cc); err != nil {
return err
}

ss.cleanups = append(ss.cleanups, func() { cc.Close() })

ss.Client = testpb.NewTestServiceClient(cc)
return nil
}

// NewServiceConfig applies sc to ss.Client using the resolver (if present).
func (ss *StubServer) NewServiceConfig(sc string) {
if ss.R != nil {
ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}, ServiceConfig: parseCfg(ss.R, sc)})
}
}

func waitForReady(cc *grpc.ClientConn) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for {
s := cc.GetState()
if s == connectivity.Ready {
return nil
}
if !cc.WaitForStateChange(ctx, s) {
// ctx got timeout or canceled.
return ctx.Err()
}
}
}

// Stop stops ss and cleans up all resources it consumed.
func (ss *StubServer) Stop() {
for i := len(ss.cleanups) - 1; i >= 0; i-- {
ss.cleanups[i]()
}
}

func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
g := r.CC.ParseServiceConfig(s)
if g.Err != nil {
panic(fmt.Sprintf("Error parsing config %q: %v", s, g.Err))
}
return g
}
23 changes: 12 additions & 11 deletions test/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
Expand Down Expand Up @@ -56,13 +57,13 @@ func runUnixTest(t *testing.T, address, target, expectedAuthority string, dialer
if err := os.RemoveAll(address); err != nil {
t.Fatalf("Error removing socket file %v: %v\n", address, err)
}
ss := &stubServer{
emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
return authorityChecker(ctx, expectedAuthority)
},
network: "unix",
address: address,
target: target,
Network: "unix",
Address: address,
Target: target,
}
opts := []grpc.DialOption{}
if dialer != nil {
Expand All @@ -74,7 +75,7 @@ func runUnixTest(t *testing.T, address, target, expectedAuthority string, dialer
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err := ss.client.EmptyCall(ctx, &testpb.Empty{})
_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
if err != nil {
t.Errorf("us.client.EmptyCall(_, _) = _, %v; want _, nil", err)
}
Expand Down Expand Up @@ -152,19 +153,19 @@ func (s) TestUnixCustomDialer(t *testing.T) {
func (s) TestColonPortAuthority(t *testing.T) {
expectedAuthority := ""
var authorityMu sync.Mutex
ss := &stubServer{
emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
authorityMu.Lock()
defer authorityMu.Unlock()
return authorityChecker(ctx, expectedAuthority)
},
network: "tcp",
Network: "tcp",
}
if err := ss.Start(nil); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
_, port, err := net.SplitHostPort(ss.address)
_, port, err := net.SplitHostPort(ss.Address)
if err != nil {
t.Fatalf("Failed splitting host from post: %v", err)
}
Expand All @@ -180,7 +181,7 @@ func (s) TestColonPortAuthority(t *testing.T) {
return (&net.Dialer{}).DialContext(ctx, "tcp", "localhost"+addr)
}))
if err != nil {
t.Fatalf("grpc.Dial(%q) = %v", ss.target, err)
t.Fatalf("grpc.Dial(%q) = %v", ss.Target, err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down
13 changes: 7 additions & 6 deletions test/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -301,8 +302,8 @@ func testDoneLoads(t *testing.T, e env) {

const testLoad = "test-load-,-should-be-orca"

ss := &stubServer{
emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
grpc.SetTrailer(ctx, metadata.Pairs(loadMDKey, testLoad))
return &testpb.Empty{}, nil
},
Expand All @@ -312,7 +313,7 @@ func testDoneLoads(t *testing.T, e env) {
}
defer ss.Stop()

tc := testpb.NewTestServiceClient(ss.cc)
tc := testpb.NewTestServiceClient(ss.CC)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -579,8 +580,8 @@ func (s) TestMetadataInAddressAttributes(t *testing.T) {
t.Logf("Registered balancer %s...", mdBalancerName)

testMDChan := make(chan []string, 1)
ss := &stubServer{
emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
md, ok := metadata.FromIncomingContext(ctx)
if ok {
select {
Expand All @@ -602,7 +603,7 @@ func (s) TestMetadataInAddressAttributes(t *testing.T) {
// The RPC should succeed with the expected md.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
t.Log("Made an RPC which succeeded...")
Expand Down
3 changes: 2 additions & 1 deletion test/channelz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -969,7 +970,7 @@ func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *t
// Avoid overflowing connection level flow control window, which will lead to
// transport being closed.
te.serverInitialConnWindowSize = 65536 * 2
ts := &stubServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
ts := &stubserver.StubServer{FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
stream.Send(&testpb.StreamingOutputCallResponse{})
<-stream.Context().Done()
return status.Errorf(codes.DeadlineExceeded, "deadline exceeded or cancelled")
Expand Down
13 changes: 7 additions & 6 deletions test/context_canceled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
)

func (s) TestContextCanceled(t *testing.T) {
ss := &stubServer{
fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
stream.SetTrailer(metadata.New(map[string]string{"a": "b"}))
return status.Error(codes.PermissionDenied, "perm denied")
},
Expand All @@ -51,7 +52,7 @@ func (s) TestContextCanceled(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), delay)
defer cancel()

str, err := ss.client.FullDuplexCall(ctx)
str, err := ss.Client.FullDuplexCall(ctx)
if err != nil {
continue
}
Expand Down Expand Up @@ -121,8 +122,8 @@ func (s) TestContextCanceled(t *testing.T) {
// first one, but `case ctx.Done()` wins the second one, the compression info
// will be inconsistent, and it causes internal error.
func (s) TestCancelWhileRecvingWithCompression(t *testing.T) {
ss := &stubServer{
fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
for {
if err := stream.Send(&testpb.StreamingOutputCallResponse{
Payload: nil,
Expand All @@ -139,7 +140,7 @@ func (s) TestCancelWhileRecvingWithCompression(t *testing.T) {

for i := 0; i < 10; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
s, err := ss.client.FullDuplexCall(ctx, grpc.UseCompressor(gzip.Name))
s, err := ss.Client.FullDuplexCall(ctx, grpc.UseCompressor(gzip.Name))
if err != nil {
t.Fatalf("failed to start bidi streaming RPC: %v", err)
}
Expand Down
Loading

0 comments on commit 0d6a24f

Please sign in to comment.