Skip to content

Commit

Permalink
http3: refactor the client's and server's unidirectional stream handl…
Browse files Browse the repository at this point in the history
…ing (#4387)

The logic is almost identical, so it makes sense to refactor it into
a shared implementation.
  • Loading branch information
marten-seemann committed Mar 24, 2024
1 parent 49b9965 commit 268208f
Show file tree
Hide file tree
Showing 6 changed files with 440 additions and 584 deletions.
104 changes: 19 additions & 85 deletions http3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ type client struct {
dialer dialFunc
handshakeErr error

receivedSettings chan struct{} // closed once the server's SETTINGS frame was processed
settings *Settings // set once receivedSettings is closed
hconn *connection

requestWriter *requestWriter

Expand Down Expand Up @@ -115,15 +114,14 @@ func newClient(hostname string, tlsConf *tls.Config, opts *roundTripperOpts, con
tlsConf.NextProtos = []string{versionToALPN(conf.Versions[0])}

return &client{
hostname: authorityAddr("https", hostname),
tlsConf: tlsConf,
requestWriter: newRequestWriter(logger),
receivedSettings: make(chan struct{}),
decoder: qpack.NewDecoder(func(hf qpack.HeaderField) {}),
config: conf,
opts: opts,
dialer: dialer,
logger: logger,
hostname: authorityAddr("https", hostname),
tlsConf: tlsConf,
requestWriter: newRequestWriter(logger),
decoder: qpack.NewDecoder(func(hf qpack.HeaderField) {}),
config: conf,
opts: opts,
dialer: dialer,
logger: logger,
}, nil
}

Expand Down Expand Up @@ -151,7 +149,14 @@ func (c *client) dial(ctx context.Context) error {
if c.opts.StreamHijacker != nil {
go c.handleBidirectionalStreams(conn)
}
go c.handleUnidirectionalStreams(conn)
c.hconn = newConnection(
conn,
c.opts.EnableDatagram,
c.opts.UniStreamHijacker,
protocol.PerspectiveClient,
c.logger,
)
go c.hconn.HandleUnidirectionalStreams()
return nil
}

Expand Down Expand Up @@ -191,77 +196,6 @@ func (c *client) handleBidirectionalStreams(conn quic.EarlyConnection) {
}
}

func (c *client) handleUnidirectionalStreams(conn quic.EarlyConnection) {
var rcvdControlStream atomic.Bool

for {
str, err := conn.AcceptUniStream(context.Background())
if err != nil {
c.logger.Debugf("accepting unidirectional stream failed: %s", err)
return
}

go func(str quic.ReceiveStream) {
streamType, err := quicvarint.Read(quicvarint.NewReader(str))
if err != nil {
if c.opts.UniStreamHijacker != nil && c.opts.UniStreamHijacker(StreamType(streamType), conn, str, err) {
return
}
c.logger.Debugf("reading stream type on stream %d failed: %s", str.StreamID(), err)
return
}
// We're only interested in the control stream here.
switch streamType {
case streamTypeControlStream:
case streamTypeQPACKEncoderStream, streamTypeQPACKDecoderStream:
// Our QPACK implementation doesn't use the dynamic table yet.
// TODO: check that only one stream of each type is opened.
return
case streamTypePushStream:
// We never increased the Push ID, so we don't expect any push streams.
conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeIDError), "")
return
default:
if c.opts.UniStreamHijacker != nil && c.opts.UniStreamHijacker(StreamType(streamType), conn, str, nil) {
return
}
str.CancelRead(quic.StreamErrorCode(ErrCodeStreamCreationError))
return
}
// Only a single control stream is allowed.
if isFirstControlStr := rcvdControlStream.CompareAndSwap(false, true); !isFirstControlStr {
conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeStreamCreationError), "duplicate control stream")
return
}
f, err := parseNextFrame(str, nil)
if err != nil {
conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameError), "")
return
}
sf, ok := f.(*settingsFrame)
if !ok {
conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeMissingSettings), "")
return
}
c.settings = &Settings{
EnableDatagram: sf.Datagram,
EnableExtendedConnect: sf.ExtendedConnect,
Other: sf.Other,
}
close(c.receivedSettings)
if !sf.Datagram {
return
}
// If datagram support was enabled on our side as well as on the server side,
// we can expect it to have been negotiated both on the transport and on the HTTP/3 layer.
// Note: ConnectionState() will block until the handshake is complete (relevant when using 0-RTT).
if c.opts.EnableDatagram && !conn.ConnectionState().SupportsDatagrams {
conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeSettingsError), "missing QUIC Datagram support")
}
}(str)
}
}

func (c *client) Close() error {
conn := c.conn.Load()
if conn == nil {
Expand Down Expand Up @@ -326,11 +260,11 @@ func (c *client) roundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Respon
if opt.CheckSettings != nil {
// wait for the server's SETTINGS frame to arrive
select {
case <-c.receivedSettings:
case <-c.hconn.ReceivedSettings():
case <-conn.Context().Done():
return nil, context.Cause(conn.Context())
}
if err := opt.CheckSettings(*c.settings); err != nil {
if err := opt.CheckSettings(*c.hconn.Settings()); err != nil {
return nil, err
}
}
Expand Down
Loading

0 comments on commit 268208f

Please sign in to comment.