Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queue up to 32 DATAGRAM frames to send #4222

Merged
merged 1 commit into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
queue up to 32 DATAGRAM frames to send
  • Loading branch information
marten-seemann committed Jan 1, 2024
commit 93d8263559ae31cb38e42071a1f9499a7b302bea
2 changes: 1 addition & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2359,7 +2359,7 @@ func (s *connection) SendDatagram(p []byte) error {
}
f.Data = make([]byte, len(p))
copy(f.Data, p)
return s.datagramQueue.AddAndWait(f)
return s.datagramQueue.Add(f)
}

func (s *connection) ReceiveDatagram(ctx context.Context) ([]byte, error) {
Expand Down
87 changes: 50 additions & 37 deletions datagram_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ import (
"context"
"sync"

"github.com/quic-go/quic-go/internal/protocol"
"github.com/quic-go/quic-go/internal/utils"
"github.com/quic-go/quic-go/internal/wire"
)

const (
maxDatagramSendQueueLen = 32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this configurable to let the users experiment with different queue size?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really want to add additional API surface here, but if your experimentation finds that this queue is too small, we can increase the queue size.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

magic environment variables would be useful here. Can label it alpha or beta to let people know it's going away

maxDatagramRcvQueueLen = 128
)

type datagramQueue struct {
sendQueue chan *wire.DatagramFrame
nextFrame *wire.DatagramFrame
sendMx sync.Mutex
sendQueue []*wire.DatagramFrame // TODO: this could be a ring buffer
sent chan struct{} // used to notify Add that a datagram was dequeued

rcvMx sync.Mutex
rcvQueue [][]byte
Expand All @@ -22,60 +27,68 @@ type datagramQueue struct {

hasData func()

dequeued chan struct{}

logger utils.Logger
}

func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue {
return &datagramQueue{
hasData: hasData,
sendQueue: make(chan *wire.DatagramFrame, 1),
rcvd: make(chan struct{}, 1),
dequeued: make(chan struct{}),
closed: make(chan struct{}),
logger: logger,
hasData: hasData,
rcvd: make(chan struct{}, 1),
sent: make(chan struct{}, 1),
closed: make(chan struct{}),
logger: logger,
}
}

// AddAndWait queues a new DATAGRAM frame for sending.
// It blocks until the frame has been dequeued.
func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error {
select {
case h.sendQueue <- f:
h.hasData()
case <-h.closed:
return h.closeErr
}
// Add queues a new DATAGRAM frame for sending.
// Up to 32 DATAGRAM frames will be queued.
// Once that limit is reached, Add blocks until the queue size has reduced.
func (h *datagramQueue) Add(f *wire.DatagramFrame) error {
h.sendMx.Lock()

select {
case <-h.dequeued:
return nil
case <-h.closed:
return h.closeErr
for {
if len(h.sendQueue) < maxDatagramSendQueueLen {
h.sendQueue = append(h.sendQueue, f)
h.sendMx.Unlock()
h.hasData()
return nil
}
select {
case <-h.sent: // drain the queue so we don't loop immediately
default:
}
h.sendMx.Unlock()
select {
case <-h.closed:
return h.closeErr
case <-h.sent:
}
h.sendMx.Lock()
}
}

// Peek gets the next DATAGRAM frame for sending.
// If actually sent out, Pop needs to be called before the next call to Peek.
func (h *datagramQueue) Peek() *wire.DatagramFrame {
if h.nextFrame != nil {
return h.nextFrame
}
select {
case h.nextFrame = <-h.sendQueue:
h.dequeued <- struct{}{}
default:
h.sendMx.Lock()
defer h.sendMx.Unlock()
if len(h.sendQueue) == 0 {
return nil
}
return h.nextFrame
return h.sendQueue[0]
}

func (h *datagramQueue) Pop() {
if h.nextFrame == nil {
h.sendMx.Lock()
defer h.sendMx.Unlock()
if len(h.sendQueue) == 0 {
panic("datagramQueue BUG: Pop called for nil frame")
}
h.nextFrame = nil
h.sendQueue = h.sendQueue[1:]
select {
case h.sent <- struct{}{}:
default:
}
}

// HandleDatagramFrame handles a received DATAGRAM frame.
Expand All @@ -84,7 +97,7 @@ func (h *datagramQueue) HandleDatagramFrame(f *wire.DatagramFrame) {
copy(data, f.Data)
var queued bool
h.rcvMx.Lock()
if len(h.rcvQueue) < protocol.DatagramRcvQueueLen {
if len(h.rcvQueue) < maxDatagramRcvQueueLen {
h.rcvQueue = append(h.rcvQueue, data)
queued = true
select {
Expand All @@ -94,7 +107,7 @@ func (h *datagramQueue) HandleDatagramFrame(f *wire.DatagramFrame) {
}
h.rcvMx.Unlock()
if !queued && h.logger.Debug() {
h.logger.Debugf("Discarding DATAGRAM frame (%d bytes payload)", len(f.Data))
h.logger.Debugf("Discarding received DATAGRAM frame (%d bytes payload)", len(f.Data))
}
}

Expand Down
59 changes: 35 additions & 24 deletions datagram_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package quic
import (
"context"
"errors"
"time"

"github.com/quic-go/quic-go/internal/utils"
"github.com/quic-go/quic-go/internal/wire"
Expand All @@ -26,55 +27,65 @@ var _ = Describe("Datagram Queue", func() {
})

It("queues a datagram", func() {
done := make(chan struct{})
frame := &wire.DatagramFrame{Data: []byte("foobar")}
go func() {
defer GinkgoRecover()
defer close(done)
Expect(queue.AddAndWait(frame)).To(Succeed())
}()

Eventually(queued).Should(HaveLen(1))
Consistently(done).ShouldNot(BeClosed())
Expect(queue.Add(frame)).To(Succeed())
Expect(queued).To(HaveLen(1))
f := queue.Peek()
Expect(f.Data).To(Equal([]byte("foobar")))
Eventually(done).Should(BeClosed())
queue.Pop()
Expect(queue.Peek()).To(BeNil())
})

It("returns the same datagram multiple times, when Pop isn't called", func() {
sent := make(chan struct{}, 1)
It("blocks when the maximum number of datagrams have been queued", func() {
for i := 0; i < maxDatagramSendQueueLen; i++ {
Expect(queue.Add(&wire.DatagramFrame{Data: []byte{0}})).To(Succeed())
}
errChan := make(chan error, 1)
go func() {
defer GinkgoRecover()
Expect(queue.AddAndWait(&wire.DatagramFrame{Data: []byte("foo")})).To(Succeed())
sent <- struct{}{}
Expect(queue.AddAndWait(&wire.DatagramFrame{Data: []byte("bar")})).To(Succeed())
sent <- struct{}{}
errChan <- queue.Add(&wire.DatagramFrame{Data: []byte("foobar")})
}()
Consistently(errChan, 50*time.Millisecond).ShouldNot(Receive())
Expect(queue.Peek()).ToNot(BeNil())
Consistently(errChan, 50*time.Millisecond).ShouldNot(Receive())
queue.Pop()
Eventually(errChan).Should(Receive(BeNil()))
for i := 1; i < maxDatagramSendQueueLen; i++ {
queue.Pop()
}
f := queue.Peek()
Expect(f).ToNot(BeNil())
Expect(f.Data).To(Equal([]byte("foobar")))
})

It("returns the same datagram multiple times, when Pop isn't called", func() {
Expect(queue.Add(&wire.DatagramFrame{Data: []byte("foo")})).To(Succeed())
Expect(queue.Add(&wire.DatagramFrame{Data: []byte("bar")})).To(Succeed())

Eventually(queued).Should(HaveLen(1))
Eventually(queued).Should(HaveLen(2))
f := queue.Peek()
Expect(f.Data).To(Equal([]byte("foo")))
Eventually(sent).Should(Receive())
Expect(queue.Peek()).To(Equal(f))
Expect(queue.Peek()).To(Equal(f))
queue.Pop()
Eventually(func() *wire.DatagramFrame { f = queue.Peek(); return f }).ShouldNot(BeNil())
f = queue.Peek()
Expect(f).ToNot(BeNil())
Expect(f.Data).To(Equal([]byte("bar")))
})

It("closes", func() {
for i := 0; i < maxDatagramSendQueueLen; i++ {
Expect(queue.Add(&wire.DatagramFrame{Data: []byte("foo")})).To(Succeed())
}
errChan := make(chan error, 1)
go func() {
defer GinkgoRecover()
errChan <- queue.AddAndWait(&wire.DatagramFrame{Data: []byte("foobar")})
errChan <- queue.Add(&wire.DatagramFrame{Data: []byte("foo")})
}()

Consistently(errChan).ShouldNot(Receive())
queue.CloseWithError(errors.New("test error"))
Eventually(errChan).Should(Receive(MatchError("test error")))
Consistently(errChan, 25*time.Millisecond).ShouldNot(Receive())
testErr := errors.New("test error")
queue.CloseWithError(testErr)
Eventually(errChan).Should(Receive(MatchError(testErr)))
})
})

Expand Down
3 changes: 0 additions & 3 deletions internal/protocol/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,6 @@ const MaxPostHandshakeCryptoFrameSize = 1000
// but must ensure that a maximum size ACK frame fits into one packet.
const MaxAckFrameSize ByteCount = 1000

// DatagramRcvQueueLen is the length of the receive queue for DATAGRAM frames (RFC 9221)
const DatagramRcvQueueLen = 128

// MaxNumAckRanges is the maximum number of ACK ranges that we send in an ACK frame.
// It also serves as a limit for the packet history.
// If at any point we keep track of more ranges, old ranges are discarded.
Expand Down
6 changes: 3 additions & 3 deletions packet_packer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ var _ = Describe("Packet packer", func() {
go func() {
defer GinkgoRecover()
defer close(done)
datagramQueue.AddAndWait(f)
datagramQueue.Add(f)
}()
// make sure the DATAGRAM has actually been queued
time.Sleep(scaleDuration(20 * time.Millisecond))
Expand Down Expand Up @@ -630,7 +630,7 @@ var _ = Describe("Packet packer", func() {
go func() {
defer GinkgoRecover()
defer close(done)
datagramQueue.AddAndWait(f)
datagramQueue.Add(f)
}()
// make sure the DATAGRAM has actually been queued
time.Sleep(scaleDuration(20 * time.Millisecond))
Expand Down Expand Up @@ -659,7 +659,7 @@ var _ = Describe("Packet packer", func() {
go func() {
defer GinkgoRecover()
defer close(done)
datagramQueue.AddAndWait(f)
datagramQueue.Add(f)
}()
// make sure the DATAGRAM has actually been queued
time.Sleep(scaleDuration(20 * time.Millisecond))
Expand Down