Skip to content

Commit

Permalink
use a ring buffer for the datagram queue (#4223)
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jan 1, 2024
1 parent 1fce81f commit 22b7f77
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 13 deletions.
18 changes: 8 additions & 10 deletions datagram_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

"github.com/quic-go/quic-go/internal/utils"
"github.com/quic-go/quic-go/internal/utils/ringbuffer"
"github.com/quic-go/quic-go/internal/wire"
)

Expand All @@ -15,8 +16,8 @@ const (

type datagramQueue struct {
sendMx sync.Mutex
sendQueue []*wire.DatagramFrame // TODO: this could be a ring buffer
sent chan struct{} // used to notify Add that a datagram was dequeued
sendQueue ringbuffer.RingBuffer[*wire.DatagramFrame]
sent chan struct{} // used to notify Add that a datagram was dequeued

rcvMx sync.Mutex
rcvQueue [][]byte
Expand Down Expand Up @@ -47,8 +48,8 @@ func (h *datagramQueue) Add(f *wire.DatagramFrame) error {
h.sendMx.Lock()

for {
if len(h.sendQueue) < maxDatagramSendQueueLen {
h.sendQueue = append(h.sendQueue, f)
if h.sendQueue.Len() < maxDatagramSendQueueLen {
h.sendQueue.PushBack(f)
h.sendMx.Unlock()
h.hasData()
return nil
Expand All @@ -72,19 +73,16 @@ func (h *datagramQueue) Add(f *wire.DatagramFrame) error {
func (h *datagramQueue) Peek() *wire.DatagramFrame {
h.sendMx.Lock()
defer h.sendMx.Unlock()
if len(h.sendQueue) == 0 {
if h.sendQueue.Empty() {
return nil
}
return h.sendQueue[0]
return h.sendQueue.PeekFront()
}

func (h *datagramQueue) Pop() {
h.sendMx.Lock()
defer h.sendMx.Unlock()
if len(h.sendQueue) == 0 {
panic("datagramQueue BUG: Pop called for nil frame")
}
h.sendQueue = h.sendQueue[1:]
_ = h.sendQueue.PopFront()
select {
case h.sent <- struct{}{}:
default:
Expand Down
12 changes: 11 additions & 1 deletion internal/utils/ringbuffer/ringbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type RingBuffer[T any] struct {
full bool
}

// Init preallocs a buffer with a certain size.
// Init preallocates a buffer with a certain size.
func (r *RingBuffer[T]) Init(size int) {
r.ring = make([]T, size)
}
Expand Down Expand Up @@ -62,6 +62,16 @@ func (r *RingBuffer[T]) PopFront() T {
return t
}

// PeekFront returns the next element.
// It must not be called when the buffer is empty, that means that
// callers might need to check if there are elements in the buffer first.
func (r *RingBuffer[T]) PeekFront() T {
if r.Empty() {
panic("github.com/quic-go/quic-go/internal/utils/ringbuffer: peek from an empty queue")
}
return r.ring[r.headPos]
}

// Grow the maximum size of the queue.
// This method assume the queue is full.
func (r *RingBuffer[T]) grow() {
Expand Down
16 changes: 14 additions & 2 deletions internal/utils/ringbuffer/ringbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ import (
)

var _ = Describe("RingBuffer", func() {
It("push and pop", func() {
It("push, peek and pop", func() {
r := RingBuffer[int]{}
Expect(len(r.ring)).To(Equal(0))
Expect(func() { r.PopFront() }).To(Panic())
r.PushBack(1)
r.PushBack(2)
r.PushBack(3)
Expect(r.PeekFront()).To(Equal(1))
Expect(r.PeekFront()).To(Equal(1))
Expect(r.PopFront()).To(Equal(1))
Expect(r.PeekFront()).To(Equal(2))
Expect(r.PopFront()).To(Equal(2))
r.PushBack(4)
r.PushBack(5)
Expand All @@ -25,7 +28,16 @@ var _ = Describe("RingBuffer", func() {
Expect(r.PopFront()).To(Equal(5))
Expect(r.PopFront()).To(Equal(6))
})
It("clear", func() {

It("panics when Peek or Pop are called on an empty buffer", func() {
r := RingBuffer[string]{}
Expect(r.Empty()).To(BeTrue())
Expect(r.Len()).To(BeZero())
Expect(func() { r.PeekFront() }).To(Panic())
Expect(func() { r.PopFront() }).To(Panic())
})

It("clearing", func() {
r := RingBuffer[int]{}
r.Init(2)
r.PushBack(1)
Expand Down

0 comments on commit 22b7f77

Please sign in to comment.