Skip to content

Commit

Permalink
don't enqueue stream for sending on reordered MAX_STREAM_DATA frames (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Feb 3, 2024
1 parent 07ec324 commit 198de32
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 22 deletions.
4 changes: 3 additions & 1 deletion internal/flowcontrol/base_flow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ func (c *baseFlowController) AddBytesSent(n protocol.ByteCount) {
}

// UpdateSendWindow is called after receiving a MAX_{STREAM_}DATA frame.
func (c *baseFlowController) UpdateSendWindow(offset protocol.ByteCount) {
func (c *baseFlowController) UpdateSendWindow(offset protocol.ByteCount) (updated bool) {
if offset > c.sendWindow {
c.sendWindow = offset
return true
}
return false
}

func (c *baseFlowController) sendWindowSize() protocol.ByteCount {
Expand Down
4 changes: 2 additions & 2 deletions internal/flowcontrol/base_flow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ var _ = Describe("Base Flow controller", func() {
})

It("does not decrease the flow control window", func() {
controller.UpdateSendWindow(20)
Expect(controller.UpdateSendWindow(20)).To(BeTrue())
Expect(controller.sendWindowSize()).To(Equal(protocol.ByteCount(20)))
controller.UpdateSendWindow(10)
Expect(controller.UpdateSendWindow(10)).To(BeFalse())
Expect(controller.sendWindowSize()).To(Equal(protocol.ByteCount(20)))
})

Expand Down
7 changes: 3 additions & 4 deletions internal/flowcontrol/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import "github.com/quic-go/quic-go/internal/protocol"
type flowController interface {
// for sending
SendWindowSize() protocol.ByteCount
UpdateSendWindow(protocol.ByteCount)
UpdateSendWindow(protocol.ByteCount) (updated bool)
AddBytesSent(protocol.ByteCount)
// for receiving
AddBytesRead(protocol.ByteCount)
Expand All @@ -16,12 +16,11 @@ type flowController interface {
// A StreamFlowController is a flow controller for a QUIC stream.
type StreamFlowController interface {
flowController
// for receiving
// UpdateHighestReceived should be called when a new highest offset is received
// UpdateHighestReceived is called when a new highest offset is received
// final has to be to true if this is the final offset of the stream,
// as contained in a STREAM frame with FIN bit, and the RESET_STREAM frame
UpdateHighestReceived(offset protocol.ByteCount, final bool) error
// Abandon should be called when reading from the stream is aborted early,
// Abandon is called when reading from the stream is aborted early,
// and there won't be any further calls to AddBytesRead.
Abandon()
}
Expand Down
14 changes: 8 additions & 6 deletions internal/mocks/connection_flow_controller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 8 additions & 6 deletions internal/mocks/stream_flow_controller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions send_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,13 @@ func (s *sendStream) cancelWriteImpl(errorCode qerr.StreamErrorCode, remote bool
}

func (s *sendStream) updateSendWindow(limit protocol.ByteCount) {
updated := s.flowController.UpdateSendWindow(limit)
if !updated { // duplicate or reordered MAX_STREAM_DATA frame
return
}
s.mutex.Lock()
hasStreamData := s.dataForWriting != nil || s.nextFrame != nil
s.mutex.Unlock()

s.flowController.UpdateSendWindow(limit)
if hasStreamData {
s.sender.onHasStreamData(s.streamID)
}
Expand Down
20 changes: 19 additions & 1 deletion send_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ var _ = Describe("Send Stream", func() {
})

It("says when it has data for sending", func() {
mockFC.EXPECT().UpdateSendWindow(gomock.Any())
mockFC.EXPECT().UpdateSendWindow(gomock.Any()).Return(true)
mockSender.EXPECT().onHasStreamData(streamID)
done := make(chan struct{})
go func() {
Expand All @@ -698,6 +698,24 @@ var _ = Describe("Send Stream", func() {
str.closeForShutdown(nil)
Eventually(done).Should(BeClosed())
})

It("doesn't say it has data for sending if the MAX_STREAM_DATA frame was reordered", func() {
mockFC.EXPECT().UpdateSendWindow(gomock.Any()).Return(false) // reordered frame
mockSender.EXPECT().onHasStreamData(streamID)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
_, err := str.Write([]byte("foobar"))
Expect(err).ToNot(HaveOccurred())
close(done)
}()
waitForWrite()
// don't expect any calls to onHasStreamData
str.updateSendWindow(42)
// make sure the Write go routine returns
str.closeForShutdown(nil)
Eventually(done).Should(BeClosed())
})
})

Context("stream cancellations", func() {
Expand Down

0 comments on commit 198de32

Please sign in to comment.