Skip to content

Commit

Permalink
quic: create and accept streams
Browse files Browse the repository at this point in the history
Add minimal API surface for creating streams,
basic loop for sending stream-related frames.

No limits, data, or lifetime management yet.

RFC 9000, Sections 2 and 3.

For golang/go#58547

Change-Id: I2c167b9363d0121b8a8776309d165b0f47f8f090
Reviewed-on: https://go-review.googlesource.com/c/net/+/514115
Reviewed-by: Jonathan Amsterdam <[email protected]>
TryBot-Result: Gopher Robot <[email protected]>
Run-TryBot: Damien Neil <[email protected]>
  • Loading branch information
neild committed Aug 2, 2023
1 parent 8ffa475 commit 167593b
Show file tree
Hide file tree
Showing 12 changed files with 637 additions and 2 deletions.
17 changes: 16 additions & 1 deletion internal/quic/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Conn struct {
acks [numberSpaceCount]ackState // indexed by number space
connIDState connIDState
loss lossState
streams streamsState

// errForPeer is set when the connection is being closed.
errForPeer error
Expand Down Expand Up @@ -105,6 +106,7 @@ func newConn(now time.Time, side connSide, initialConnID []byte, peerAddr netip.
// TODO: PMTU discovery.
const maxDatagramSize = 1200
c.loss.init(c.side, maxDatagramSize, now)
c.streamsInit()

c.startTLS(now, initialConnID, transportParameters{
initialSrcConnID: c.connIDState.srcConnID(),
Expand Down Expand Up @@ -178,7 +180,10 @@ func (c *Conn) receiveTransportParameters(p transportParameters) error {
return nil
}

type timerEvent struct{}
type (
timerEvent struct{}
wakeEvent struct{}
)

// loop is the connection main loop.
//
Expand Down Expand Up @@ -250,6 +255,8 @@ func (c *Conn) loop(now time.Time) {
return
}
c.loss.advance(now, c.handleAckOrLoss)
case wakeEvent:
// We're being woken up to try sending some frames.
case func(time.Time, *Conn):
// Send a func to msgc to run it on the main Conn goroutine
m(now, c)
Expand All @@ -269,6 +276,14 @@ func (c *Conn) sendMsg(m any) {
}
}

// wake wakes up the conn's loop.
func (c *Conn) wake() {
select {
case c.msgc <- wakeEvent{}:
default:
}
}

// runOnLoop executes a function within the conn's loop goroutine.
func (c *Conn) runOnLoop(f func(now time.Time, c *Conn)) error {
donec := make(chan struct{})
Expand Down
10 changes: 10 additions & 0 deletions internal/quic/conn_loss.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ func (c *Conn) handleAckOrLoss(space numberSpace, sent *sentPacket, fate packetF
case frameTypeCrypto:
start, end := sent.nextRange()
c.crypto[space].ackOrLoss(start, end, fate)
case frameTypeStreamBase,
frameTypeStreamBase | streamFinBit:
id := streamID(sent.nextInt())
start, end := sent.nextRange()
s := c.streamForID(id)
if s == nil {
continue
}
fin := f&streamFinBit != 0
s.ackOrLossData(sent.num, start, end, fin, fate)
case frameTypeNewConnectionID:
seq := int64(sent.nextInt())
c.connIDState.ackOrLossNewConnectionID(sent.num, seq, fate)
Expand Down
32 changes: 32 additions & 0 deletions internal/quic/conn_loss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,38 @@ func TestLostCRYPTOFrame(t *testing.T) {
})
}

func TestLostStreamFrameEmpty(t *testing.T) {
// A STREAM frame opening a stream, but containing no stream data, should
// be retransmitted if lost.
lostFrameTest(t, func(t *testing.T, pto bool) {
ctx := canceledContext()
tc := newTestConn(t, clientSide, func(p *transportParameters) {
p.initialMaxStreamDataBidiRemote = 100
})
tc.handshake()
tc.ignoreFrame(frameTypeAck)

c, err := tc.conn.NewStream(ctx)
if err != nil {
t.Fatalf("NewStream: %v", err)
}
c.Write(nil) // open the stream
tc.wantFrame("created bidirectional stream 0",
packetType1RTT, debugFrameStream{
id: newStreamID(clientSide, bidiStream, 0),
data: []byte{},
})

tc.triggerLossOrPTO(packetType1RTT, pto)
tc.wantFrame("resent stream frame",
packetType1RTT, debugFrameStream{
id: newStreamID(clientSide, bidiStream, 0),
data: []byte{},
})
})

}

func TestLostNewConnectionIDFrame(t *testing.T) {
// "New connection IDs are [...] retransmitted if the packet containing them is lost."
// https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.13
Expand Down
15 changes: 14 additions & 1 deletion internal/quic/conn_recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (c *Conn) handleFrames(now time.Time, ptype packetType, space numberSpace,
if !frameOK(c, ptype, __01) {
return
}
_, _, _, _, n = consumeStreamFrame(payload)
n = c.handleStreamFrame(now, space, payload)
case frameTypeMaxData:
if !frameOK(c, ptype, __01) {
return
Expand Down Expand Up @@ -290,6 +290,19 @@ func (c *Conn) handleCryptoFrame(now time.Time, space numberSpace, payload []byt
return n
}

func (c *Conn) handleStreamFrame(now time.Time, space numberSpace, payload []byte) int {
id, off, fin, b, n := consumeStreamFrame(payload)
if n < 0 {
return -1
}
if s := c.streamForFrame(now, id, recvStream); s != nil {
if err := s.handleData(off, b, fin); err != nil {
c.abort(now, err)
}
}
return n
}

func (c *Conn) handleNewConnectionIDFrame(now time.Time, space numberSpace, payload []byte) int {
seq, retire, connID, resetToken, n := consumeNewConnectionIDFrame(payload)
if n < 0 {
Expand Down
7 changes: 7 additions & 0 deletions internal/quic/conn_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,13 @@ func (c *Conn) appendFrames(now time.Time, space numberSpace, pnum packetNumber,
c.testSendPing.setSent(pnum)
}

// All stream-related frames. This should come last in the packet,
// so large amounts of STREAM data don't crowd out other frames
// we may need to send.
if !c.appendStreamFrames(&c.w, pnum, pto) {
return
}

// If this is a PTO probe and we haven't added an ack-eliciting frame yet,
// add a PING to make this an ack-eliciting probe.
//
Expand Down
215 changes: 215 additions & 0 deletions internal/quic/conn_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.21

package quic

import (
"context"
"sync"
"sync/atomic"
"time"
)

type streamsState struct {
queue queue[*Stream] // new, peer-created streams

streamsMu sync.Mutex
streams map[streamID]*Stream
opened [streamTypeCount]int64 // number of streams opened by us

// Streams with frames to send are stored in a circular linked list.
// sendHead is the next stream to write, or nil if there are no streams
// with data to send. sendTail is the last stream to write.
needSend atomic.Bool
sendMu sync.Mutex
sendHead *Stream
sendTail *Stream
}

func (c *Conn) streamsInit() {
c.streams.streams = make(map[streamID]*Stream)
c.streams.queue = newQueue[*Stream]()
}

// AcceptStream waits for and returns the next stream created by the peer.
func (c *Conn) AcceptStream(ctx context.Context) (*Stream, error) {
return c.streams.queue.get(ctx)
}

// NewStream creates a stream.
//
// If the peer's maximum stream limit for the connection has been reached,
// NewStream blocks until the limit is increased or the context expires.
func (c *Conn) NewStream(ctx context.Context) (*Stream, error) {
return c.newLocalStream(ctx, bidiStream)
}

// NewSendOnlyStream creates a unidirectional, send-only stream.
//
// If the peer's maximum stream limit for the connection has been reached,
// NewSendOnlyStream blocks until the limit is increased or the context expires.
func (c *Conn) NewSendOnlyStream(ctx context.Context) (*Stream, error) {
return c.newLocalStream(ctx, uniStream)
}

func (c *Conn) newLocalStream(ctx context.Context, typ streamType) (*Stream, error) {
// TODO: Stream limits.
c.streams.streamsMu.Lock()
defer c.streams.streamsMu.Unlock()

num := c.streams.opened[typ]
c.streams.opened[typ]++

s := newStream(c, newStreamID(c.side, typ, num))
c.streams.streams[s.id] = s
return s, nil
}

// streamFrameType identifies which direction of a stream,
// from the local perspective, a frame is associated with.
//
// For example, STREAM is a recvStream frame,
// because it carries data from the peer to us.
type streamFrameType uint8

const (
sendStream = streamFrameType(iota) // for example, MAX_DATA
recvStream // for example, STREAM_DATA_BLOCKED
)

// streamForID returns the stream with the given id.
// If the stream does not exist, it returns nil.
func (c *Conn) streamForID(id streamID) *Stream {
c.streams.streamsMu.Lock()
defer c.streams.streamsMu.Unlock()
return c.streams.streams[id]
}

// streamForFrame returns the stream with the given id.
// If the stream does not exist, it may be created.
//
// streamForFrame aborts the connection if the stream id, state, and frame type don't align.
// For example, it aborts the connection with a STREAM_STATE error if a MAX_DATA frame
// is received for a receive-only stream, or if the peer attempts to create a stream that
// should be originated locally.
//
// streamForFrame returns nil if the stream no longer exists or if an error occurred.
func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType) *Stream {
if id.streamType() == uniStream {
if (id.initiator() == c.side) != (ftype == sendStream) {
// Received an invalid frame for unidirectional stream.
// For example, a RESET_STREAM frame for a send-only stream.
c.abort(now, localTransportError(errStreamState))
return nil
}
}

c.streams.streamsMu.Lock()
defer c.streams.streamsMu.Unlock()
if s := c.streams.streams[id]; s != nil {
return s
}
// TODO: Check for closed streams, once we support closing streams.
if id.initiator() == c.side {
c.abort(now, localTransportError(errStreamState))
return nil
}
s := newStream(c, id)
c.streams.streams[id] = s
c.streams.queue.put(s)
return s
}

// queueStreamForSend marks a stream as containing frames that need sending.
func (c *Conn) queueStreamForSend(s *Stream) {
c.streams.sendMu.Lock()
defer c.streams.sendMu.Unlock()
if s.next != nil {
// Already in the queue.
return
}
if c.streams.sendHead == nil {
// The queue was empty.
c.streams.sendHead = s
c.streams.sendTail = s
s.next = s
} else {
// Insert this stream at the end of the queue.
c.streams.sendTail.next = s
c.streams.sendTail = s
}
c.streams.needSend.Store(true)
c.wake()
}

// appendStreamFrames writes stream-related frames to the current packet.
//
// It returns true if no more frames need appending,
// false if not everything fit in the current packet.
func (c *Conn) appendStreamFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
if pto {
return c.appendStreamFramesPTO(w, pnum)
}
if !c.streams.needSend.Load() {
return true
}
c.streams.sendMu.Lock()
defer c.streams.sendMu.Unlock()
for {
s := c.streams.sendHead
const pto = false
if !s.appendInFrames(w, pnum, pto) {
return false
}
avail := w.avail()
if !s.appendOutFrames(w, pnum, pto) {
// We've sent some data for this stream, but it still has more to send.
// If the stream got a reasonable chance to put data in a packet,
// advance sendHead to the next stream in line, to avoid starvation.
// We'll come back to this stream after going through the others.
//
// If the packet was already mostly out of space, leave sendHead alone
// and come back to this stream again on the next packet.
if avail > 512 {
c.streams.sendHead = s.next
c.streams.sendTail = s
}
return false
}
s.next = nil
if s == c.streams.sendTail {
// This was the last stream.
c.streams.sendHead = nil
c.streams.sendTail = nil
c.streams.needSend.Store(false)
return true
}
// We've sent all data for this stream, so remove it from the list.
c.streams.sendTail.next = s.next
c.streams.sendHead = s.next
s.next = nil
}
}

// appendStreamFramesPTO writes stream-related frames to the current packet
// for a PTO probe.
//
// It returns true if no more frames need appending,
// false if not everything fit in the current packet.
func (c *Conn) appendStreamFramesPTO(w *packetWriter, pnum packetNumber) bool {
c.streams.sendMu.Lock()
defer c.streams.sendMu.Unlock()
for _, s := range c.streams.streams {
const pto = true
if !s.appendInFrames(w, pnum, pto) {
return false
}
if !s.appendOutFrames(w, pnum, pto) {
return false
}
}
return true
}
Loading

0 comments on commit 167593b

Please sign in to comment.