Skip to content

Commit

Permalink
channelz: wait for clean up before next test (#2797)
Browse files Browse the repository at this point in the history
  • Loading branch information
lyuxuan committed May 2, 2019
1 parent a940832 commit 42baa8b
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 45 deletions.
5 changes: 4 additions & 1 deletion channelz/service/service_sktopt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ func protoToSocketOption(skopts []*channelzpb.SocketOption) *channelz.SocketOpti
}

func TestGetSocketOptions(t *testing.T) {
channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorage()
defer cleanupWrapper(czCleanup, t)
ss := []*dummySocket{
{
socketOptions: &channelz.SocketOptionData{
Expand All @@ -139,8 +140,10 @@ func TestGetSocketOptions(t *testing.T) {
svr := newCZServer()
ids := make([]int64, len(ss))
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
for i, s := range ss {
ids[i] = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i))
defer channelz.RemoveEntry(ids[i])
}
for i, s := range ss {
resp, _ := svr.GetSocket(context.Background(), &channelzpb.GetSocketRequest{SocketId: ids[i]})
Expand Down
58 changes: 46 additions & 12 deletions channelz/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ func init() {
channelz.TurnOn()
}

func cleanupWrapper(cleanup func() error, t *testing.T) {
if err := cleanup(); err != nil {
t.Error(err)
}
}

type protoToSocketOptFunc func([]*channelzpb.SocketOption) *channelz.SocketOptionData

// protoToSocketOpt is used in function socketProtoToStruct to extract socket option
Expand Down Expand Up @@ -305,9 +311,11 @@ func TestGetTopChannels(t *testing.T) {
},
{},
}
channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorage()
defer cleanupWrapper(czCleanup, t)
for _, c := range tcs {
channelz.RegisterChannel(c, 0, "")
id := channelz.RegisterChannel(c, 0, "")
defer channelz.RemoveEntry(id)
}
s := newCZServer()
resp, _ := s.GetTopChannels(context.Background(), &channelzpb.GetTopChannelsRequest{StartChannelId: 0})
Expand All @@ -320,7 +328,8 @@ func TestGetTopChannels(t *testing.T) {
}
}
for i := 0; i < 50; i++ {
channelz.RegisterChannel(tcs[0], 0, "")
id := channelz.RegisterChannel(tcs[0], 0, "")
defer channelz.RemoveEntry(id)
}
resp, _ = s.GetTopChannels(context.Background(), &channelzpb.GetTopChannelsRequest{StartChannelId: 0})
if resp.GetEnd() {
Expand Down Expand Up @@ -349,9 +358,11 @@ func TestGetServers(t *testing.T) {
lastCallStartedTimestamp: time.Now().UTC(),
},
}
channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorage()
defer cleanupWrapper(czCleanup, t)
for _, s := range ss {
channelz.RegisterServer(s, "")
id := channelz.RegisterServer(s, "")
defer channelz.RemoveEntry(id)
}
svr := newCZServer()
resp, _ := svr.GetServers(context.Background(), &channelzpb.GetServersRequest{StartServerId: 0})
Expand All @@ -364,7 +375,8 @@ func TestGetServers(t *testing.T) {
}
}
for i := 0; i < 50; i++ {
channelz.RegisterServer(ss[0], "")
id := channelz.RegisterServer(ss[0], "")
defer channelz.RemoveEntry(id)
}
resp, _ = svr.GetServers(context.Background(), &channelzpb.GetServersRequest{StartServerId: 0})
if resp.GetEnd() {
Expand All @@ -373,13 +385,18 @@ func TestGetServers(t *testing.T) {
}

func TestGetServerSockets(t *testing.T) {
channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorage()
defer cleanupWrapper(czCleanup, t)
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"}
ids := make([]int64, 3)
ids[0] = channelz.RegisterListenSocket(&dummySocket{}, svrID, refNames[0])
ids[1] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[1])
ids[2] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[2])
for _, id := range ids {
defer channelz.RemoveEntry(id)
}
svr := newCZServer()
resp, _ := svr.GetServerSockets(context.Background(), &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: 0})
if !resp.GetEnd() {
Expand All @@ -395,7 +412,8 @@ func TestGetServerSockets(t *testing.T) {
}

for i := 0; i < 50; i++ {
channelz.RegisterNormalSocket(&dummySocket{}, svrID, "")
id := channelz.RegisterNormalSocket(&dummySocket{}, svrID, "")
defer channelz.RemoveEntry(id)
}
resp, _ = svr.GetServerSockets(context.Background(), &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: 0})
if resp.GetEnd() {
Expand All @@ -406,13 +424,18 @@ func TestGetServerSockets(t *testing.T) {
// This test makes a GetServerSockets with a non-zero start ID, and expect only
// sockets with ID >= the given start ID.
func TestGetServerSocketsNonZeroStartID(t *testing.T) {
channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorage()
defer cleanupWrapper(czCleanup, t)
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"}
ids := make([]int64, 3)
ids[0] = channelz.RegisterListenSocket(&dummySocket{}, svrID, refNames[0])
ids[1] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[1])
ids[2] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[2])
for _, id := range ids {
defer channelz.RemoveEntry(id)
}
svr := newCZServer()
// Make GetServerSockets with startID = ids[1]+1, so socket-1 won't be
// included in the response.
Expand All @@ -431,7 +454,8 @@ func TestGetServerSocketsNonZeroStartID(t *testing.T) {
}

func TestGetChannel(t *testing.T) {
channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorage()
defer cleanupWrapper(czCleanup, t)
refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"}
ids := make([]int64, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0])
Expand Down Expand Up @@ -475,6 +499,9 @@ func TestGetChannel(t *testing.T) {
Desc: "Resolver returns an empty address list",
Severity: channelz.CtWarning,
})
for _, id := range ids {
defer channelz.RemoveEntry(id)
}
svr := newCZServer()
resp, _ := svr.GetChannel(context.Background(), &channelzpb.GetChannelRequest{ChannelId: ids[0]})
metrics := resp.GetChannel()
Expand Down Expand Up @@ -530,7 +557,8 @@ func TestGetSubChannel(t *testing.T) {
subchanConnectivityChange = fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready)
subChanPickNewAddress = fmt.Sprintf("Subchannel picks a new address %q to connect", "0.0.0.0")
)
channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorage()
defer cleanupWrapper(czCleanup, t)
refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"}
ids := make([]int64, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0])
Expand All @@ -557,6 +585,9 @@ func TestGetSubChannel(t *testing.T) {
Desc: subChanPickNewAddress,
Severity: channelz.CtINFO,
})
for _, id := range ids {
defer channelz.RemoveEntry(id)
}
svr := newCZServer()
resp, _ := svr.GetSubchannel(context.Background(), &channelzpb.GetSubchannelRequest{SubchannelId: ids[1]})
metrics := resp.GetSubchannel()
Expand Down Expand Up @@ -598,7 +629,8 @@ func TestGetSubChannel(t *testing.T) {
}

func TestGetSocket(t *testing.T) {
channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorage()
defer cleanupWrapper(czCleanup, t)
ss := []*dummySocket{
{
streamsStarted: 10,
Expand Down Expand Up @@ -673,8 +705,10 @@ func TestGetSocket(t *testing.T) {
svr := newCZServer()
ids := make([]int64, len(ss))
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
for i, s := range ss {
ids[i] = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i))
defer channelz.RemoveEntry(ids[i])
}
for i, s := range ss {
resp, _ := svr.GetSocket(context.Background(), &channelzpb.GetSocketRequest{SocketId: ids[i]})
Expand Down
30 changes: 29 additions & 1 deletion internal/channelz/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package channelz

import (
"fmt"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -95,9 +96,14 @@ func (d *dbWrapper) get() *channelMap {

// NewChannelzStorage initializes channelz data storage and id generator.
//
// This function returns a cleanup function to wait for all channelz state to be reset by the
// grpc goroutines when those entities get closed. By using this cleanup function, we make sure tests
// don't mess up each other, i.e. lingering goroutine from previous test doing entity removal happen
// to remove some entity just register by the new test, since the id space is the same.
//
// Note: This function is exported for testing purpose only. User should not call
// it in most cases.
func NewChannelzStorage() {
func NewChannelzStorage() (cleanup func() error) {
db.set(&channelMap{
topLevelChannels: make(map[int64]struct{}),
channels: make(map[int64]*channel),
Expand All @@ -107,6 +113,28 @@ func NewChannelzStorage() {
subChannels: make(map[int64]*subChannel),
})
idGen.reset()
return func() error {
var err error
cm := db.get()
if cm == nil {
return nil
}
for i := 0; i < 1000; i++ {
cm.mu.Lock()
if len(cm.topLevelChannels) == 0 && len(cm.servers) == 0 && len(cm.channels) == 0 && len(cm.subChannels) == 0 && len(cm.listenSockets) == 0 && len(cm.normalSockets) == 0 {
cm.mu.Unlock()
// all things stored in the channelz map have been cleared.
return nil
}
cm.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}

cm.mu.Lock()
err = fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets))
cm.mu.Unlock()
return err
}
}

// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
Expand Down
3 changes: 2 additions & 1 deletion test/channelz_linux_go110_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func (s) TestCZSocketMetricsSocketOption(t *testing.T) {
}

func testCZSocketMetricsSocketOption(t *testing.T, e env) {
channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorage()
defer czCleanupWrapper(czCleanup, t)
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
Expand Down
Loading

0 comments on commit 42baa8b

Please sign in to comment.