Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't enqueue stream when receiving reordered MAX_STREAM_DATA frames #4269

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/flowcontrol/base_flow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ func (c *baseFlowController) AddBytesSent(n protocol.ByteCount) {
}

// UpdateSendWindow is called after receiving a MAX_{STREAM_}DATA frame.
func (c *baseFlowController) UpdateSendWindow(offset protocol.ByteCount) {
func (c *baseFlowController) UpdateSendWindow(offset protocol.ByteCount) (updated bool) {
if offset > c.sendWindow {
c.sendWindow = offset
return true
}
return false
}

func (c *baseFlowController) sendWindowSize() protocol.ByteCount {
Expand Down
4 changes: 2 additions & 2 deletions internal/flowcontrol/base_flow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ var _ = Describe("Base Flow controller", func() {
})

It("does not decrease the flow control window", func() {
controller.UpdateSendWindow(20)
Expect(controller.UpdateSendWindow(20)).To(BeTrue())
Expect(controller.sendWindowSize()).To(Equal(protocol.ByteCount(20)))
controller.UpdateSendWindow(10)
Expect(controller.UpdateSendWindow(10)).To(BeFalse())
Expect(controller.sendWindowSize()).To(Equal(protocol.ByteCount(20)))
})

Expand Down
7 changes: 3 additions & 4 deletions internal/flowcontrol/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import "github.com/quic-go/quic-go/internal/protocol"
type flowController interface {
// for sending
SendWindowSize() protocol.ByteCount
UpdateSendWindow(protocol.ByteCount)
UpdateSendWindow(protocol.ByteCount) (updated bool)
AddBytesSent(protocol.ByteCount)
// for receiving
AddBytesRead(protocol.ByteCount)
Expand All @@ -16,12 +16,11 @@ type flowController interface {
// A StreamFlowController is a flow controller for a QUIC stream.
type StreamFlowController interface {
flowController
// for receiving
// UpdateHighestReceived should be called when a new highest offset is received
// UpdateHighestReceived is called when a new highest offset is received
// final has to be to true if this is the final offset of the stream,
// as contained in a STREAM frame with FIN bit, and the RESET_STREAM frame
UpdateHighestReceived(offset protocol.ByteCount, final bool) error
// Abandon should be called when reading from the stream is aborted early,
// Abandon is called when reading from the stream is aborted early,
// and there won't be any further calls to AddBytesRead.
Abandon()
}
Expand Down
14 changes: 8 additions & 6 deletions internal/mocks/connection_flow_controller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 8 additions & 6 deletions internal/mocks/stream_flow_controller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions send_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,13 @@ func (s *sendStream) cancelWriteImpl(errorCode qerr.StreamErrorCode, remote bool
}

func (s *sendStream) updateSendWindow(limit protocol.ByteCount) {
updated := s.flowController.UpdateSendWindow(limit)
if !updated { // duplicate or reordered MAX_STREAM_DATA frame
return
}
s.mutex.Lock()
hasStreamData := s.dataForWriting != nil || s.nextFrame != nil
s.mutex.Unlock()

s.flowController.UpdateSendWindow(limit)
if hasStreamData {
s.sender.onHasStreamData(s.streamID)
}
Expand Down
20 changes: 19 additions & 1 deletion send_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ var _ = Describe("Send Stream", func() {
})

It("says when it has data for sending", func() {
mockFC.EXPECT().UpdateSendWindow(gomock.Any())
mockFC.EXPECT().UpdateSendWindow(gomock.Any()).Return(true)
mockSender.EXPECT().onHasStreamData(streamID)
done := make(chan struct{})
go func() {
Expand All @@ -698,6 +698,24 @@ var _ = Describe("Send Stream", func() {
str.closeForShutdown(nil)
Eventually(done).Should(BeClosed())
})

It("doesn't say it has data for sending if the MAX_STREAM_DATA frame was reordered", func() {
mockFC.EXPECT().UpdateSendWindow(gomock.Any()).Return(false) // reordered frame
mockSender.EXPECT().onHasStreamData(streamID)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
_, err := str.Write([]byte("foobar"))
Expect(err).ToNot(HaveOccurred())
close(done)
}()
waitForWrite()
// don't expect any calls to onHasStreamData
str.updateSendWindow(42)
// make sure the Write go routine returns
str.closeForShutdown(nil)
Eventually(done).Should(BeClosed())
})
})

Context("stream cancellations", func() {
Expand Down
Loading