From 167593b38cf631be267ebcd8d612b7c58138d8c4 Mon Sep 17 00:00:00 2001 From: Damien Neil Date: Wed, 26 Jul 2023 11:46:09 -0400 Subject: [PATCH] quic: create and accept streams Add minimal API surface for creating streams, basic loop for sending stream-related frames. No limits, data, or lifetime management yet. RFC 9000, Sections 2 and 3. For golang/go#58547 Change-Id: I2c167b9363d0121b8a8776309d165b0f47f8f090 Reviewed-on: https://go-review.googlesource.com/c/net/+/514115 Reviewed-by: Jonathan Amsterdam TryBot-Result: Gopher Robot Run-TryBot: Damien Neil --- internal/quic/conn.go | 17 ++- internal/quic/conn_loss.go | 10 ++ internal/quic/conn_loss_test.go | 32 +++++ internal/quic/conn_recv.go | 15 +- internal/quic/conn_send.go | 7 + internal/quic/conn_streams.go | 215 +++++++++++++++++++++++++++++ internal/quic/conn_streams_test.go | 144 +++++++++++++++++++ internal/quic/conn_test.go | 11 ++ internal/quic/packet_parser.go | 3 + internal/quic/quic.go | 1 + internal/quic/stream.go | 151 ++++++++++++++++++++ internal/quic/stream_test.go | 33 +++++ 12 files changed, 637 insertions(+), 2 deletions(-) create mode 100644 internal/quic/conn_streams.go create mode 100644 internal/quic/conn_streams_test.go create mode 100644 internal/quic/stream.go create mode 100644 internal/quic/stream_test.go diff --git a/internal/quic/conn.go b/internal/quic/conn.go index 77ecea0d6..5601b989e 100644 --- a/internal/quic/conn.go +++ b/internal/quic/conn.go @@ -32,6 +32,7 @@ type Conn struct { acks [numberSpaceCount]ackState // indexed by number space connIDState connIDState loss lossState + streams streamsState // errForPeer is set when the connection is being closed. errForPeer error @@ -105,6 +106,7 @@ func newConn(now time.Time, side connSide, initialConnID []byte, peerAddr netip. // TODO: PMTU discovery. const maxDatagramSize = 1200 c.loss.init(c.side, maxDatagramSize, now) + c.streamsInit() c.startTLS(now, initialConnID, transportParameters{ initialSrcConnID: c.connIDState.srcConnID(), @@ -178,7 +180,10 @@ func (c *Conn) receiveTransportParameters(p transportParameters) error { return nil } -type timerEvent struct{} +type ( + timerEvent struct{} + wakeEvent struct{} +) // loop is the connection main loop. // @@ -250,6 +255,8 @@ func (c *Conn) loop(now time.Time) { return } c.loss.advance(now, c.handleAckOrLoss) + case wakeEvent: + // We're being woken up to try sending some frames. case func(time.Time, *Conn): // Send a func to msgc to run it on the main Conn goroutine m(now, c) @@ -269,6 +276,14 @@ func (c *Conn) sendMsg(m any) { } } +// wake wakes up the conn's loop. +func (c *Conn) wake() { + select { + case c.msgc <- wakeEvent{}: + default: + } +} + // runOnLoop executes a function within the conn's loop goroutine. func (c *Conn) runOnLoop(f func(now time.Time, c *Conn)) error { donec := make(chan struct{}) diff --git a/internal/quic/conn_loss.go b/internal/quic/conn_loss.go index 57570d086..ca178089d 100644 --- a/internal/quic/conn_loss.go +++ b/internal/quic/conn_loss.go @@ -44,6 +44,16 @@ func (c *Conn) handleAckOrLoss(space numberSpace, sent *sentPacket, fate packetF case frameTypeCrypto: start, end := sent.nextRange() c.crypto[space].ackOrLoss(start, end, fate) + case frameTypeStreamBase, + frameTypeStreamBase | streamFinBit: + id := streamID(sent.nextInt()) + start, end := sent.nextRange() + s := c.streamForID(id) + if s == nil { + continue + } + fin := f&streamFinBit != 0 + s.ackOrLossData(sent.num, start, end, fin, fate) case frameTypeNewConnectionID: seq := int64(sent.nextInt()) c.connIDState.ackOrLossNewConnectionID(sent.num, seq, fate) diff --git a/internal/quic/conn_loss_test.go b/internal/quic/conn_loss_test.go index 021c86c87..3c9e6149a 100644 --- a/internal/quic/conn_loss_test.go +++ b/internal/quic/conn_loss_test.go @@ -106,6 +106,38 @@ func TestLostCRYPTOFrame(t *testing.T) { }) } +func TestLostStreamFrameEmpty(t *testing.T) { + // A STREAM frame opening a stream, but containing no stream data, should + // be retransmitted if lost. + lostFrameTest(t, func(t *testing.T, pto bool) { + ctx := canceledContext() + tc := newTestConn(t, clientSide, func(p *transportParameters) { + p.initialMaxStreamDataBidiRemote = 100 + }) + tc.handshake() + tc.ignoreFrame(frameTypeAck) + + c, err := tc.conn.NewStream(ctx) + if err != nil { + t.Fatalf("NewStream: %v", err) + } + c.Write(nil) // open the stream + tc.wantFrame("created bidirectional stream 0", + packetType1RTT, debugFrameStream{ + id: newStreamID(clientSide, bidiStream, 0), + data: []byte{}, + }) + + tc.triggerLossOrPTO(packetType1RTT, pto) + tc.wantFrame("resent stream frame", + packetType1RTT, debugFrameStream{ + id: newStreamID(clientSide, bidiStream, 0), + data: []byte{}, + }) + }) + +} + func TestLostNewConnectionIDFrame(t *testing.T) { // "New connection IDs are [...] retransmitted if the packet containing them is lost." // https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.13 diff --git a/internal/quic/conn_recv.go b/internal/quic/conn_recv.go index 7992a619f..45ef3844e 100644 --- a/internal/quic/conn_recv.go +++ b/internal/quic/conn_recv.go @@ -181,7 +181,7 @@ func (c *Conn) handleFrames(now time.Time, ptype packetType, space numberSpace, if !frameOK(c, ptype, __01) { return } - _, _, _, _, n = consumeStreamFrame(payload) + n = c.handleStreamFrame(now, space, payload) case frameTypeMaxData: if !frameOK(c, ptype, __01) { return @@ -290,6 +290,19 @@ func (c *Conn) handleCryptoFrame(now time.Time, space numberSpace, payload []byt return n } +func (c *Conn) handleStreamFrame(now time.Time, space numberSpace, payload []byte) int { + id, off, fin, b, n := consumeStreamFrame(payload) + if n < 0 { + return -1 + } + if s := c.streamForFrame(now, id, recvStream); s != nil { + if err := s.handleData(off, b, fin); err != nil { + c.abort(now, err) + } + } + return n +} + func (c *Conn) handleNewConnectionIDFrame(now time.Time, space numberSpace, payload []byte) int { seq, retire, connID, resetToken, n := consumeNewConnectionIDFrame(payload) if n < 0 { diff --git a/internal/quic/conn_send.go b/internal/quic/conn_send.go index d410548a9..6e6fbc585 100644 --- a/internal/quic/conn_send.go +++ b/internal/quic/conn_send.go @@ -254,6 +254,13 @@ func (c *Conn) appendFrames(now time.Time, space numberSpace, pnum packetNumber, c.testSendPing.setSent(pnum) } + // All stream-related frames. This should come last in the packet, + // so large amounts of STREAM data don't crowd out other frames + // we may need to send. + if !c.appendStreamFrames(&c.w, pnum, pto) { + return + } + // If this is a PTO probe and we haven't added an ack-eliciting frame yet, // add a PING to make this an ack-eliciting probe. // diff --git a/internal/quic/conn_streams.go b/internal/quic/conn_streams.go new file mode 100644 index 000000000..82e902860 --- /dev/null +++ b/internal/quic/conn_streams.go @@ -0,0 +1,215 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.21 + +package quic + +import ( + "context" + "sync" + "sync/atomic" + "time" +) + +type streamsState struct { + queue queue[*Stream] // new, peer-created streams + + streamsMu sync.Mutex + streams map[streamID]*Stream + opened [streamTypeCount]int64 // number of streams opened by us + + // Streams with frames to send are stored in a circular linked list. + // sendHead is the next stream to write, or nil if there are no streams + // with data to send. sendTail is the last stream to write. + needSend atomic.Bool + sendMu sync.Mutex + sendHead *Stream + sendTail *Stream +} + +func (c *Conn) streamsInit() { + c.streams.streams = make(map[streamID]*Stream) + c.streams.queue = newQueue[*Stream]() +} + +// AcceptStream waits for and returns the next stream created by the peer. +func (c *Conn) AcceptStream(ctx context.Context) (*Stream, error) { + return c.streams.queue.get(ctx) +} + +// NewStream creates a stream. +// +// If the peer's maximum stream limit for the connection has been reached, +// NewStream blocks until the limit is increased or the context expires. +func (c *Conn) NewStream(ctx context.Context) (*Stream, error) { + return c.newLocalStream(ctx, bidiStream) +} + +// NewSendOnlyStream creates a unidirectional, send-only stream. +// +// If the peer's maximum stream limit for the connection has been reached, +// NewSendOnlyStream blocks until the limit is increased or the context expires. +func (c *Conn) NewSendOnlyStream(ctx context.Context) (*Stream, error) { + return c.newLocalStream(ctx, uniStream) +} + +func (c *Conn) newLocalStream(ctx context.Context, typ streamType) (*Stream, error) { + // TODO: Stream limits. + c.streams.streamsMu.Lock() + defer c.streams.streamsMu.Unlock() + + num := c.streams.opened[typ] + c.streams.opened[typ]++ + + s := newStream(c, newStreamID(c.side, typ, num)) + c.streams.streams[s.id] = s + return s, nil +} + +// streamFrameType identifies which direction of a stream, +// from the local perspective, a frame is associated with. +// +// For example, STREAM is a recvStream frame, +// because it carries data from the peer to us. +type streamFrameType uint8 + +const ( + sendStream = streamFrameType(iota) // for example, MAX_DATA + recvStream // for example, STREAM_DATA_BLOCKED +) + +// streamForID returns the stream with the given id. +// If the stream does not exist, it returns nil. +func (c *Conn) streamForID(id streamID) *Stream { + c.streams.streamsMu.Lock() + defer c.streams.streamsMu.Unlock() + return c.streams.streams[id] +} + +// streamForFrame returns the stream with the given id. +// If the stream does not exist, it may be created. +// +// streamForFrame aborts the connection if the stream id, state, and frame type don't align. +// For example, it aborts the connection with a STREAM_STATE error if a MAX_DATA frame +// is received for a receive-only stream, or if the peer attempts to create a stream that +// should be originated locally. +// +// streamForFrame returns nil if the stream no longer exists or if an error occurred. +func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType) *Stream { + if id.streamType() == uniStream { + if (id.initiator() == c.side) != (ftype == sendStream) { + // Received an invalid frame for unidirectional stream. + // For example, a RESET_STREAM frame for a send-only stream. + c.abort(now, localTransportError(errStreamState)) + return nil + } + } + + c.streams.streamsMu.Lock() + defer c.streams.streamsMu.Unlock() + if s := c.streams.streams[id]; s != nil { + return s + } + // TODO: Check for closed streams, once we support closing streams. + if id.initiator() == c.side { + c.abort(now, localTransportError(errStreamState)) + return nil + } + s := newStream(c, id) + c.streams.streams[id] = s + c.streams.queue.put(s) + return s +} + +// queueStreamForSend marks a stream as containing frames that need sending. +func (c *Conn) queueStreamForSend(s *Stream) { + c.streams.sendMu.Lock() + defer c.streams.sendMu.Unlock() + if s.next != nil { + // Already in the queue. + return + } + if c.streams.sendHead == nil { + // The queue was empty. + c.streams.sendHead = s + c.streams.sendTail = s + s.next = s + } else { + // Insert this stream at the end of the queue. + c.streams.sendTail.next = s + c.streams.sendTail = s + } + c.streams.needSend.Store(true) + c.wake() +} + +// appendStreamFrames writes stream-related frames to the current packet. +// +// It returns true if no more frames need appending, +// false if not everything fit in the current packet. +func (c *Conn) appendStreamFrames(w *packetWriter, pnum packetNumber, pto bool) bool { + if pto { + return c.appendStreamFramesPTO(w, pnum) + } + if !c.streams.needSend.Load() { + return true + } + c.streams.sendMu.Lock() + defer c.streams.sendMu.Unlock() + for { + s := c.streams.sendHead + const pto = false + if !s.appendInFrames(w, pnum, pto) { + return false + } + avail := w.avail() + if !s.appendOutFrames(w, pnum, pto) { + // We've sent some data for this stream, but it still has more to send. + // If the stream got a reasonable chance to put data in a packet, + // advance sendHead to the next stream in line, to avoid starvation. + // We'll come back to this stream after going through the others. + // + // If the packet was already mostly out of space, leave sendHead alone + // and come back to this stream again on the next packet. + if avail > 512 { + c.streams.sendHead = s.next + c.streams.sendTail = s + } + return false + } + s.next = nil + if s == c.streams.sendTail { + // This was the last stream. + c.streams.sendHead = nil + c.streams.sendTail = nil + c.streams.needSend.Store(false) + return true + } + // We've sent all data for this stream, so remove it from the list. + c.streams.sendTail.next = s.next + c.streams.sendHead = s.next + s.next = nil + } +} + +// appendStreamFramesPTO writes stream-related frames to the current packet +// for a PTO probe. +// +// It returns true if no more frames need appending, +// false if not everything fit in the current packet. +func (c *Conn) appendStreamFramesPTO(w *packetWriter, pnum packetNumber) bool { + c.streams.sendMu.Lock() + defer c.streams.sendMu.Unlock() + for _, s := range c.streams.streams { + const pto = true + if !s.appendInFrames(w, pnum, pto) { + return false + } + if !s.appendOutFrames(w, pnum, pto) { + return false + } + } + return true +} diff --git a/internal/quic/conn_streams_test.go b/internal/quic/conn_streams_test.go new file mode 100644 index 000000000..8481a604c --- /dev/null +++ b/internal/quic/conn_streams_test.go @@ -0,0 +1,144 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.21 + +package quic + +import ( + "context" + "testing" +) + +func TestStreamsCreate(t *testing.T) { + ctx := canceledContext() + tc := newTestConn(t, clientSide, func(p *transportParameters) { + p.initialMaxStreamDataBidiLocal = 100 + p.initialMaxStreamDataBidiRemote = 100 + }) + tc.handshake() + + c, err := tc.conn.NewStream(ctx) + if err != nil { + t.Fatalf("NewStream: %v", err) + } + c.Write(nil) // open the stream + tc.wantFrame("created bidirectional stream 0", + packetType1RTT, debugFrameStream{ + id: 0, // client-initiated, bidi, number 0 + data: []byte{}, + }) + + c, err = tc.conn.NewSendOnlyStream(ctx) + if err != nil { + t.Fatalf("NewStream: %v", err) + } + c.Write(nil) // open the stream + tc.wantFrame("created unidirectional stream 0", + packetType1RTT, debugFrameStream{ + id: 2, // client-initiated, uni, number 0 + data: []byte{}, + }) + + c, err = tc.conn.NewStream(ctx) + if err != nil { + t.Fatalf("NewStream: %v", err) + } + c.Write(nil) // open the stream + tc.wantFrame("created bidirectional stream 1", + packetType1RTT, debugFrameStream{ + id: 4, // client-initiated, uni, number 4 + data: []byte{}, + }) +} + +func TestStreamsAccept(t *testing.T) { + ctx := canceledContext() + tc := newTestConn(t, serverSide) + tc.handshake() + + tc.writeFrames(packetType1RTT, + debugFrameStream{ + id: 0, // client-initiated, bidi, number 0 + }, + debugFrameStream{ + id: 2, // client-initiated, uni, number 0 + }, + debugFrameStream{ + id: 4, // client-initiated, bidi, number 1 + }) + + for _, accept := range []struct { + id streamID + readOnly bool + }{ + {0, false}, + {2, true}, + {4, false}, + } { + s, err := tc.conn.AcceptStream(ctx) + if err != nil { + t.Fatalf("conn.AcceptStream() = %v, want stream %v", err, accept.id) + } + if got, want := s.id, accept.id; got != want { + t.Fatalf("conn.AcceptStream() = stream %v, want %v", got, want) + } + if got, want := s.IsReadOnly(), accept.readOnly; got != want { + t.Fatalf("stream %v: s.IsReadOnly() = %v, want %v", accept.id, got, want) + } + } + + _, err := tc.conn.AcceptStream(ctx) + if err != context.Canceled { + t.Fatalf("conn.AcceptStream() = %v, want context.Canceled", err) + } +} + +func TestStreamsStreamNotCreated(t *testing.T) { + // "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR + // if it receives a STREAM frame for a locally initiated stream that has + // not yet been created [...]" + // https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-3 + tc := newTestConn(t, serverSide) + tc.handshake() + + tc.writeFrames(packetType1RTT, + debugFrameStream{ + id: 1, // server-initiated, bidi, number 0 + }) + tc.wantFrame("peer sent STREAM frame for an uncreated local stream", + packetType1RTT, debugFrameConnectionCloseTransport{ + code: errStreamState, + }) +} + +func TestStreamsStreamSendOnly(t *testing.T) { + // "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR + // if it receives a STREAM frame for a locally initiated stream that has + // not yet been created [...]" + // https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-3 + ctx := canceledContext() + tc := newTestConn(t, serverSide) + tc.handshake() + + c, err := tc.conn.NewSendOnlyStream(ctx) + if err != nil { + t.Fatalf("NewStream: %v", err) + } + c.Write(nil) // open the stream + tc.wantFrame("created unidirectional stream 0", + packetType1RTT, debugFrameStream{ + id: 3, // server-initiated, uni, number 0 + data: []byte{}, + }) + + tc.writeFrames(packetType1RTT, + debugFrameStream{ + id: 3, // server-initiated, bidi, number 0 + }) + tc.wantFrame("peer sent STREAM frame for a send-only stream", + packetType1RTT, debugFrameConnectionCloseTransport{ + code: errStreamState, + }) +} diff --git a/internal/quic/conn_test.go b/internal/quic/conn_test.go index 317ca8f81..1fe1e7b84 100644 --- a/internal/quic/conn_test.go +++ b/internal/quic/conn_test.go @@ -725,3 +725,14 @@ func (tc *testConnListener) sendDatagram(p []byte, addr netip.AddrPort) error { tc.sentDatagrams = append(tc.sentDatagrams, append([]byte(nil), p...)) return nil } + +// canceledContext returns a canceled Context. +// +// Functions which take a context preference progress over cancelation. +// For example, a read with a canceled context will return data if any is available. +// Tests use canceled contexts to perform non-blocking operations. +func canceledContext() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx +} diff --git a/internal/quic/packet_parser.go b/internal/quic/packet_parser.go index 052007897..9a00da756 100644 --- a/internal/quic/packet_parser.go +++ b/internal/quic/packet_parser.go @@ -330,6 +330,9 @@ func consumeStreamFrame(b []byte) (id streamID, off int64, fin bool, data []byte data = b[n:] n += len(data) } + if off+int64(len(data)) >= 1<<62 { + return 0, 0, false, nil, -1 + } return streamID(idInt), off, fin, data, n } diff --git a/internal/quic/quic.go b/internal/quic/quic.go index 84ce2bda1..8cd61aed0 100644 --- a/internal/quic/quic.go +++ b/internal/quic/quic.go @@ -112,6 +112,7 @@ type streamType uint8 const ( bidiStream = streamType(iota) uniStream + streamTypeCount ) func (s streamType) String() string { diff --git a/internal/quic/stream.go b/internal/quic/stream.go new file mode 100644 index 000000000..b55f927e0 --- /dev/null +++ b/internal/quic/stream.go @@ -0,0 +1,151 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.21 + +package quic + +import ( + "context" + "errors" +) + +type Stream struct { + id streamID + conn *Conn + + // outgate's lock guards all send-related state. + // + // The gate condition is set if a write to the stream will not block, + // either because the stream has available flow control or because + // the write will fail. + outgate gate + outopened sentVal // set if we should open the stream + + prev, next *Stream // guarded by streamsState.sendMu +} + +func newStream(c *Conn, id streamID) *Stream { + s := &Stream{ + conn: c, + id: id, + outgate: newGate(), + } + + // Lock and unlock outgate to update the stream writability state. + s.outgate.lock() + s.outUnlock() + + return s +} + +// IsReadOnly reports whether the stream is read-only +// (a unidirectional stream created by the peer). +func (s *Stream) IsReadOnly() bool { + return s.id.streamType() == uniStream && s.id.initiator() != s.conn.side +} + +// IsWriteOnly reports whether the stream is write-only +// (a unidirectional stream created locally). +func (s *Stream) IsWriteOnly() bool { + return s.id.streamType() == uniStream && s.id.initiator() == s.conn.side +} + +// Read reads data from the stream. +// See ReadContext for more details. +func (s *Stream) Read(b []byte) (n int, err error) { + return s.ReadContext(context.Background(), b) +} + +// ReadContext reads data from the stream. +// +// ReadContext returns as soon as at least one byte of data is available. +// +// If the peer closes the stream cleanly, ReadContext returns io.EOF after +// returning all data sent by the peer. +// If the peer terminates reads abruptly, ReadContext returns StreamResetError. +func (s *Stream) ReadContext(ctx context.Context, b []byte) (n int, err error) { + // TODO: implement + return 0, errors.New("unimplemented") +} + +// Write writes data to the stream. +// See WriteContext for more details. +func (s *Stream) Write(b []byte) (n int, err error) { + return s.WriteContext(context.Background(), b) +} + +// WriteContext writes data to the stream. +// +// WriteContext writes data to the stream write buffer. +// Buffered data is only sent when the buffer is sufficiently full. +// Call the Flush method to ensure buffered data is sent. +// +// If the peer aborts reads on the stream, ReadContext returns StreamResetError. +func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error) { + if s.IsReadOnly() { + return 0, errors.New("write to read-only stream") + } + if len(b) > 0 { + // TODO: implement + return 0, errors.New("unimplemented") + } + if err := s.outgate.waitAndLockContext(ctx); err != nil { + return 0, err + } + defer s.outUnlock() + + // Set outopened to send a STREAM frame with no data, + // opening the stream on the peer. + s.outopened.set() + + return n, nil +} + +// outUnlock unlocks s.outgate. +// It sets the gate condition if writes to s will not block. +// If s has frames to write, it notifies the Conn. +func (s *Stream) outUnlock() { + if s.outopened.shouldSend() { + s.conn.queueStreamForSend(s) + } + canSend := true // TODO: set sendability status based on flow control + s.outgate.unlock(canSend) +} + +// handleData handles data received in a STREAM frame. +func (s *Stream) handleData(off int64, b []byte, fin bool) error { + // TODO + return nil +} + +// ackOrLossData handles the fate of a STREAM frame. +func (s *Stream) ackOrLossData(pnum packetNumber, start, end int64, fin bool, fate packetFate) { + s.outgate.lock() + defer s.outUnlock() + s.outopened.ackOrLoss(pnum, fate) +} + +func (s *Stream) appendInFrames(w *packetWriter, pnum packetNumber, pto bool) bool { + // TODO: STOP_SENDING + // TODO: MAX_STREAM_DATA + return true +} + +func (s *Stream) appendOutFrames(w *packetWriter, pnum packetNumber, pto bool) bool { + // TODO: RESET_STREAM + // TODO: STREAM_DATA_BLOCKED + // TODO: STREAM frames with data + if s.outopened.shouldSendPTO(pto) { + off := int64(0) + size := 0 + fin := false + _, added := w.appendStreamFrame(s.id, off, size, fin) + if !added { + return false + } + s.outopened.setSent(pnum) + } + return true +} diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go new file mode 100644 index 000000000..8ae9dbc82 --- /dev/null +++ b/internal/quic/stream_test.go @@ -0,0 +1,33 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.21 + +package quic + +import ( + "reflect" + "testing" +) + +func TestStreamOffsetTooLarge(t *testing.T) { + // "Receipt of a frame that exceeds [2^62-1] MUST be treated as a + // connection error of type FRAME_ENCODING_ERROR or FLOW_CONTROL_ERROR." + // https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-9 + tc := newTestConn(t, serverSide) + tc.handshake() + + tc.writeFrames(packetType1RTT, + debugFrameStream{ + id: newStreamID(clientSide, bidiStream, 0), + off: 1<<62 - 1, + data: []byte{0}, + }) + got, _ := tc.readFrame() + want1 := debugFrameConnectionCloseTransport{code: errFrameEncoding} + want2 := debugFrameConnectionCloseTransport{code: errFlowControl} + if !reflect.DeepEqual(got, want1) && !reflect.DeepEqual(got, want2) { + t.Fatalf("STREAM offset exceeds 2^62-1\ngot: %v\nwant: %v\n or: %v", got, want1, want2) + } +}