Skip to content

Commit

Permalink
fix handling of CancelRead after receiving a RESET_STREAM
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Apr 26, 2024
1 parent 329224b commit 82beee9
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
45 changes: 23 additions & 22 deletions receive_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ type receiveStream struct {
// Note that for local cancellations, this doesn't necessarily mean that we know the final offset yet.
errorRead bool
completed bool // set once we've called streamSender.onStreamCompleted
cancelledRemotely bool
cancelledLocally bool
cancelErr *StreamError
closeForShutdownErr error
cancelReadErr error
resetRemotelyErr *StreamError

readChan chan struct{}
readOnce chan struct{} // cap: 1, to protect against concurrent use of Read
Expand Down Expand Up @@ -94,9 +95,6 @@ func (s *receiveStream) Read(p []byte) (int, error) {
s.mutex.Unlock()

if completed {
if err != io.EOF {
s.flowController.Abandon()
}
s.sender.onStreamCompleted(s.streamID)
}
return n, err
Expand All @@ -106,7 +104,9 @@ func (s *receiveStream) isNewlyCompleted() bool {
// We're done with the stream once:
// 1. The application has consumed the io.EOF or the cancellation error
// 2. We know the final offset (for flow control accounting)
isNewlyCompleted := !s.completed && s.errorRead && s.finalOffset != protocol.MaxByteCount
fmt.Printf("cancelled locally: %t, cancelled remotely: %t, error read: %t\n", s.cancelledLocally, s.cancelledRemotely, s.errorRead)
isNewlyCompleted := !s.completed && s.finalOffset != protocol.MaxByteCount &&
(s.cancelledLocally || (s.cancelledRemotely && s.errorRead) || (!s.cancelledLocally && !s.cancelledRemotely && s.errorRead))
if isNewlyCompleted {
s.completed = true
}
Expand All @@ -117,11 +117,8 @@ func (s *receiveStream) readImpl(p []byte) (int, error) {
if s.currentFrameIsLast && s.currentFrame == nil {
return 0, io.EOF
}
if s.cancelReadErr != nil {
return 0, s.cancelReadErr
}
if s.resetRemotelyErr != nil {
return 0, s.resetRemotelyErr
if s.cancelledRemotely || s.cancelledLocally {
return 0, s.cancelErr
}
if s.closeForShutdownErr != nil {
return 0, s.closeForShutdownErr
Expand All @@ -142,11 +139,8 @@ func (s *receiveStream) readImpl(p []byte) (int, error) {
if s.closeForShutdownErr != nil {
return bytesRead, s.closeForShutdownErr
}
if s.cancelReadErr != nil {
return bytesRead, s.cancelReadErr
}
if s.resetRemotelyErr != nil {
return bytesRead, s.resetRemotelyErr
if s.cancelledRemotely || s.cancelledLocally {
return 0, s.cancelErr
}

deadline := s.deadline
Expand Down Expand Up @@ -194,7 +188,7 @@ func (s *receiveStream) readImpl(p []byte) (int, error) {

// when a RESET_STREAM was received, the flow controller was already
// informed about the final byteOffset for this stream
if s.resetRemotelyErr == nil {
if !s.cancelledRemotely {
s.flowController.AddBytesRead(protocol.ByteCount(m))
}

Expand Down Expand Up @@ -224,6 +218,7 @@ func (s *receiveStream) CancelRead(errorCode StreamErrorCode) {
s.mutex.Lock()
s.cancelReadImpl(errorCode)
completed := s.isNewlyCompleted()
// finalOffsetKnown := s.finalOffset != protocol.MaxByteCount
s.mutex.Unlock()

if completed {
Expand All @@ -233,10 +228,14 @@ func (s *receiveStream) CancelRead(errorCode StreamErrorCode) {
}

func (s *receiveStream) cancelReadImpl(errorCode qerr.StreamErrorCode) {
if s.errorRead || s.cancelReadErr != nil || s.resetRemotelyErr != nil {
if s.cancelledLocally { // duplicate call to CancelRead
return
}
s.cancelledLocally = true
if s.errorRead || s.cancelledRemotely {
return
}
s.cancelReadErr = &StreamError{StreamID: s.streamID, ErrorCode: errorCode, Remote: false}
s.cancelErr = &StreamError{StreamID: s.streamID, ErrorCode: errorCode, Remote: false}
s.signalRead()
s.sender.queueControlFrame(&wire.StopSendingFrame{
StreamID: s.streamID,
Expand Down Expand Up @@ -266,7 +265,7 @@ func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /*
newlyRcvdFinalOffset = s.finalOffset == protocol.MaxByteCount
s.finalOffset = maxOffset
}
if s.cancelReadErr != nil {
if s.cancelledLocally {
return newlyRcvdFinalOffset, nil
}
if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.PutBack); err != nil {
Expand Down Expand Up @@ -298,10 +297,12 @@ func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame)
s.finalOffset = frame.FinalSize

// ignore duplicate RESET_STREAM frames for this stream (after checking their final offset)
if s.resetRemotelyErr != nil {
if s.cancelledRemotely {
return nil
}
s.resetRemotelyErr = &StreamError{
s.flowController.Abandon()
s.cancelledRemotely = true
s.cancelErr = &StreamError{
StreamID: s.streamID,
ErrorCode: frame.ErrorCode,
Remote: true,
Expand Down
12 changes: 8 additions & 4 deletions receive_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,10 +542,12 @@ var _ = Describe("Receive Stream", func() {

It("doesn't send a STOP_SENDING frame, if the stream was already reset", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true)
mockFC.EXPECT().Abandon().MinTimes(1)
Expect(str.handleResetStreamFrame(&wire.ResetStreamFrame{
StreamID: streamID,
FinalSize: 42,
})).To(Succeed())
mockSender.EXPECT().onStreamCompleted(gomock.Any())
str.CancelRead(1234)
})

Expand All @@ -557,9 +559,9 @@ var _ = Describe("Receive Stream", func() {
})).To(Succeed())
mockFC.EXPECT().Abandon()
mockSender.EXPECT().queueControlFrame(gomock.Any())
mockSender.EXPECT().onStreamCompleted(streamID)
str.CancelRead(1234)
// read the error
mockSender.EXPECT().onStreamCompleted(streamID)
n, err := str.Read([]byte{0})
Expect(err).To(HaveOccurred())
Expect(n).To(BeZero())
Expand Down Expand Up @@ -651,6 +653,7 @@ var _ = Describe("Receive Stream", func() {

It("ignores duplicate RESET_STREAM frames", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Times(2)
mockFC.EXPECT().Abandon()
Expect(str.handleResetStreamFrame(rst)).To(Succeed())
Expect(str.handleResetStreamFrame(rst)).To(Succeed())
})
Expand All @@ -663,13 +666,14 @@ var _ = Describe("Receive Stream", func() {
Fin: true,
})).To(Succeed())
mockSender.EXPECT().queueControlFrame(gomock.Any())
mockFC.EXPECT().Abandon().MinTimes(1)
mockSender.EXPECT().onStreamCompleted(streamID)
str.CancelRead(1234)
mockFC.EXPECT().Abandon()
Expect(str.handleResetStreamFrame(rst)).To(Succeed())
// now read the error
mockSender.EXPECT().onStreamCompleted(streamID)
_, err := str.Read([]byte{0})
n, err := str.Read([]byte{0})
Expect(err).To(HaveOccurred())
Expect(n).To(BeZero())
})

It("doesn't do anything when it was closed for shutdown", func() {
Expand Down

0 comments on commit 82beee9

Please sign in to comment.