diff --git a/call.go b/call.go index a67a3db02eb4..788c89c16f96 100644 --- a/call.go +++ b/call.go @@ -27,11 +27,6 @@ import ( // // All errors returned by Invoke are compatible with the status package. func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error { - if err := cc.idlenessMgr.OnCallBegin(); err != nil { - return err - } - defer cc.idlenessMgr.OnCallEnd() - // allow interceptor to see all applicable call options, which means those // configured as defaults from dial option as well as per-call options opts = combine(cc.dopts.callOptions, opts) diff --git a/clientconn.go b/clientconn.go index d53d91d5d9f3..ff7fea102288 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1091,8 +1091,8 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) { ac.cancel() ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx) - // We have to defer here because GracefulClose => Close => onClose, which - // requires locking ac.mu. + // We have to defer here because GracefulClose => onClose, which requires + // locking ac.mu. if ac.transport != nil { defer ac.transport.GracefulClose() ac.transport = nil @@ -1680,16 +1680,7 @@ func (ac *addrConn) tearDown(err error) { ac.updateConnectivityState(connectivity.Shutdown, nil) ac.cancel() ac.curAddr = resolver.Address{} - if err == errConnDrain && curTr != nil { - // GracefulClose(...) may be executed multiple times when - // i) receiving multiple GoAway frames from the server; or - // ii) there are concurrent name resolver/Balancer triggered - // address removal and GoAway. - // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu. - ac.mu.Unlock() - curTr.GracefulClose() - ac.mu.Lock() - } + channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{ Desc: "Subchannel deleted", Severity: channelz.CtInfo, @@ -1703,6 +1694,29 @@ func (ac *addrConn) tearDown(err error) { // being deleted right away. channelz.RemoveEntry(ac.channelzID) ac.mu.Unlock() + + // We have to release the lock before the call to GracefulClose/Close here + // because both of them call onClose(), which requires locking ac.mu. + if curTr != nil { + if err == errConnDrain { + // Close the transport gracefully when the subConn is being shutdown. + // + // GracefulClose() may be executed multiple times if: + // - multiple GoAway frames are received from the server + // - there are concurrent name resolver or balancer triggered + // address removal and GoAway + curTr.GracefulClose() + } else { + // Hard close the transport when the channel is entering idle or is + // being shutdown. In the case where the channel is being shutdown, + // closing of transports is also taken care of by cancelation of cc.ctx. + // But in the case where the channel is entering idle, we need to + // explicitly close the transports here. Instead of distinguishing + // between these two cases, it is simpler to close the transport + // unconditionally here. + curTr.Close(err) + } + } } func (ac *addrConn) getState() connectivity.State { diff --git a/internal/idle/idle_e2e_test.go b/internal/idle/idle_e2e_test.go index de88046bc362..84b4ba7bba34 100644 --- a/internal/idle/idle_e2e_test.go +++ b/internal/idle/idle_e2e_test.go @@ -22,6 +22,7 @@ import ( "context" "errors" "fmt" + "io" "strings" "testing" "time" @@ -141,7 +142,8 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) { } // Tests the case where channel idleness is enabled by passing a small value for -// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE. +// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE, and +// the connection to the backend is closed. func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) { // Create a ClientConn with a short idle_timeout. r := manual.NewBuilderWithScheme("whatever") @@ -158,7 +160,8 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) { t.Cleanup(func() { cc.Close() }) // Start a test backend and push an address update via the resolver. - backend := stubserver.StartTestService(t, nil) + lis := testutils.NewListenerWrapper(t, nil) + backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis}) t.Cleanup(backend.Stop) r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) @@ -167,6 +170,13 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) { defer cancel() testutils.AwaitState(ctx, t, cc, connectivity.Ready) + // Retrieve the wrapped conn from the listener. + v, err := lis.NewConnCh.Receive(ctx) + if err != nil { + t.Fatalf("Failed to retrieve conn from test listener: %v", err) + } + conn := v.(*testutils.ConnWrapper) + // Verify that the ClientConn moves to IDLE as there is no activity. testutils.AwaitState(ctx, t, cc, connectivity.Idle) @@ -174,85 +184,123 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) { if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil { t.Fatal(err) } + + // Verify that the previously open connection is closed. + if _, err := conn.CloseCh.Receive(ctx); err != nil { + t.Fatalf("Failed when waiting for connection to be closed after channel entered IDLE: %v", err) + } } // Tests the case where channel idleness is enabled by passing a small value for // idle_timeout. Verifies that a READY channel with an ongoing RPC stays READY. func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) { - // Create a ClientConn with a short idle_timeout. - r := manual.NewBuilderWithScheme("whatever") - dopts := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithResolvers(r), - grpc.WithIdleTimeout(defaultTestShortIdleTimeout), - grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), - } - cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) - if err != nil { - t.Fatalf("grpc.Dial() failed: %v", err) - } - t.Cleanup(func() { cc.Close() }) - - // Start a test backend which keeps a unary RPC call active by blocking on a - // channel that is closed by the test later on. Also push an address update - // via the resolver. - blockCh := make(chan struct{}) - backend := &stubserver.StubServer{ - EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { - <-blockCh - return &testpb.Empty{}, nil + tests := []struct { + name string + makeRPC func(ctx context.Context, client testgrpc.TestServiceClient) error + }{ + { + name: "unary", + makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + return fmt.Errorf("EmptyCall RPC failed: %v", err) + } + return nil + }, + }, + { + name: "streaming", + makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error { + stream, err := client.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("FullDuplexCall RPC failed: %v", err) + } + if _, err := stream.Recv(); err != nil && err != io.EOF { + t.Fatalf("stream.Recv() failed: %v", err) + } + return nil + }, }, } - if err := backend.StartServer(); err != nil { - t.Fatalf("Failed to start backend: %v", err) - } - t.Cleanup(backend.Stop) - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) - - // Verify that the ClientConn moves to READY. - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - testutils.AwaitState(ctx, t, cc, connectivity.Ready) - // Spawn a goroutine which checks expected state transitions and idleness - // channelz trace events. It eventually closes `blockCh`, thereby unblocking - // the server RPC handler and the unary call below. - errCh := make(chan error, 1) - go func() { - defer close(blockCh) - // Verify that the ClientConn stays in READY. - sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout) - defer sCancel() - testutils.AwaitNoStateChange(sCtx, t, cc, connectivity.Ready) - - // Verify that there are no idleness related channelz events. - if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil { - errCh <- err - return - } - if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil { - errCh <- err - return - } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // Create a ClientConn with a short idle_timeout. + r := manual.NewBuilderWithScheme("whatever") + dopts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(r), + grpc.WithIdleTimeout(defaultTestShortIdleTimeout), + grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), + } + cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + t.Cleanup(func() { cc.Close() }) + + // Start a test backend which keeps a unary RPC call active by blocking on a + // channel that is closed by the test later on. Also push an address update + // via the resolver. + blockCh := make(chan struct{}) + backend := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + <-blockCh + return &testpb.Empty{}, nil + }, + FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { + <-blockCh + return nil + }, + } + if err := backend.StartServer(); err != nil { + t.Fatalf("Failed to start backend: %v", err) + } + t.Cleanup(backend.Stop) + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) + + // Verify that the ClientConn moves to READY. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testutils.AwaitState(ctx, t, cc, connectivity.Ready) + + // Spawn a goroutine which checks expected state transitions and idleness + // channelz trace events. + errCh := make(chan error, 1) + go func() { + defer close(blockCh) + + // Verify that the ClientConn stays in READY. + sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout) + defer sCancel() + if cc.WaitForStateChange(sCtx, connectivity.Ready) { + errCh <- fmt.Errorf("state changed from %q to %q when no state change was expected", connectivity.Ready, cc.GetState()) + return + } - // Unblock the unary RPC on the server. - errCh <- nil - }() + // Verify that there are no idleness related channelz events. + // + // TODO: Improve the checks here. If these log strings are + // changed in the code, these checks will continue to pass. + if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil { + errCh <- err + return + } + errCh <- channelzTraceEventNotFound(ctx, "exiting idle mode") + }() - // Make a unary RPC that blocks on the server, thereby ensuring that the - // count of active RPCs on the client is non-zero. - client := testgrpc.NewTestServiceClient(cc) - if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { - t.Errorf("EmptyCall RPC failed: %v", err) - } + if err := test.makeRPC(ctx, testgrpc.NewTestServiceClient(cc)); err != nil { + t.Fatalf("%s rpc failed: %v", test.name, err) + } - select { - case err := <-errCh: - if err != nil { - t.Fatal(err) - } - case <-ctx.Done(): - t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE") + select { + case err := <-errCh: + if err != nil { + t.Fatal(err) + } + case <-ctx.Done(): + t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE") + } + }) } } diff --git a/stream.go b/stream.go index 421a41f8854f..b14b2fbea2eb 100644 --- a/stream.go +++ b/stream.go @@ -158,11 +158,6 @@ type ClientStream interface { // If none of the above happen, a goroutine and a context will be leaked, and grpc // will not call the optionally-configured stats handler with a stats.End message. func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { - if err := cc.idlenessMgr.OnCallBegin(); err != nil { - return nil, err - } - defer cc.idlenessMgr.OnCallEnd() - // allow interceptor to see all applicable call options, which means those // configured as defaults from dial option as well as per-call options opts = combine(cc.dopts.callOptions, opts) @@ -179,6 +174,16 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { + // Start tracking the RPC for idleness purposes. This is where a stream is + // created for both streaming and unary RPCs, and hence is a good place to + // track active RPC count. + if err := cc.idlenessMgr.OnCallBegin(); err != nil { + return nil, err + } + // Add a calloption, to decrement the active call count, that gets executed + // when the RPC completes. + opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...) + if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok { // validate md if err := imetadata.Validate(md); err != nil {