Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http3: refactor the client's and server's unidirectional stream handling #4387

Merged
merged 1 commit into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
http3: refactor the client's and server's unidirectional stream handling
The logic is almost identical, so it makes sense to refactor it into
a shared implemenation.
  • Loading branch information
marten-seemann committed Mar 23, 2024
commit 89b0c8d7514292df54cd2ad98a4a805cf3cb23d1
104 changes: 19 additions & 85 deletions http3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ type client struct {
dialer dialFunc
handshakeErr error

receivedSettings chan struct{} // closed once the server's SETTINGS frame was processed
settings *Settings // set once receivedSettings is closed
hconn *connection

requestWriter *requestWriter

Expand Down Expand Up @@ -115,15 +114,14 @@ func newClient(hostname string, tlsConf *tls.Config, opts *roundTripperOpts, con
tlsConf.NextProtos = []string{versionToALPN(conf.Versions[0])}

return &client{
hostname: authorityAddr("https", hostname),
tlsConf: tlsConf,
requestWriter: newRequestWriter(logger),
receivedSettings: make(chan struct{}),
decoder: qpack.NewDecoder(func(hf qpack.HeaderField) {}),
config: conf,
opts: opts,
dialer: dialer,
logger: logger,
hostname: authorityAddr("https", hostname),
tlsConf: tlsConf,
requestWriter: newRequestWriter(logger),
decoder: qpack.NewDecoder(func(hf qpack.HeaderField) {}),
config: conf,
opts: opts,
dialer: dialer,
logger: logger,
}, nil
}

Expand Down Expand Up @@ -151,7 +149,14 @@ func (c *client) dial(ctx context.Context) error {
if c.opts.StreamHijacker != nil {
go c.handleBidirectionalStreams(conn)
}
go c.handleUnidirectionalStreams(conn)
c.hconn = newConnection(
conn,
c.opts.EnableDatagram,
c.opts.UniStreamHijacker,
protocol.PerspectiveClient,
c.logger,
)
go c.hconn.HandleUnidirectionalStreams()
return nil
}

Expand Down Expand Up @@ -191,77 +196,6 @@ func (c *client) handleBidirectionalStreams(conn quic.EarlyConnection) {
}
}

func (c *client) handleUnidirectionalStreams(conn quic.EarlyConnection) {
var rcvdControlStream atomic.Bool

for {
str, err := conn.AcceptUniStream(context.Background())
if err != nil {
c.logger.Debugf("accepting unidirectional stream failed: %s", err)
return
}

go func(str quic.ReceiveStream) {
streamType, err := quicvarint.Read(quicvarint.NewReader(str))
if err != nil {
if c.opts.UniStreamHijacker != nil && c.opts.UniStreamHijacker(StreamType(streamType), conn, str, err) {
return
}
c.logger.Debugf("reading stream type on stream %d failed: %s", str.StreamID(), err)
return
}
// We're only interested in the control stream here.
switch streamType {
case streamTypeControlStream:
case streamTypeQPACKEncoderStream, streamTypeQPACKDecoderStream:
// Our QPACK implementation doesn't use the dynamic table yet.
// TODO: check that only one stream of each type is opened.
return
case streamTypePushStream:
// We never increased the Push ID, so we don't expect any push streams.
conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeIDError), "")
return
default:
if c.opts.UniStreamHijacker != nil && c.opts.UniStreamHijacker(StreamType(streamType), conn, str, nil) {
return
}
str.CancelRead(quic.StreamErrorCode(ErrCodeStreamCreationError))
return
}
// Only a single control stream is allowed.
if isFirstControlStr := rcvdControlStream.CompareAndSwap(false, true); !isFirstControlStr {
conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeStreamCreationError), "duplicate control stream")
return
}
f, err := parseNextFrame(str, nil)
if err != nil {
conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameError), "")
return
}
sf, ok := f.(*settingsFrame)
if !ok {
conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeMissingSettings), "")
return
}
c.settings = &Settings{
EnableDatagram: sf.Datagram,
EnableExtendedConnect: sf.ExtendedConnect,
Other: sf.Other,
}
close(c.receivedSettings)
if !sf.Datagram {
return
}
// If datagram support was enabled on our side as well as on the server side,
// we can expect it to have been negotiated both on the transport and on the HTTP/3 layer.
// Note: ConnectionState() will block until the handshake is complete (relevant when using 0-RTT).
if c.opts.EnableDatagram && !conn.ConnectionState().SupportsDatagrams {
conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeSettingsError), "missing QUIC Datagram support")
}
}(str)
}
}

func (c *client) Close() error {
conn := c.conn.Load()
if conn == nil {
Expand Down Expand Up @@ -326,11 +260,11 @@ func (c *client) roundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Respon
if opt.CheckSettings != nil {
// wait for the server's SETTINGS frame to arrive
select {
case <-c.receivedSettings:
case <-c.hconn.ReceivedSettings():
case <-conn.Context().Done():
return nil, context.Cause(conn.Context())
}
if err := opt.CheckSettings(*c.settings); err != nil {
if err := opt.CheckSettings(*c.hconn.Settings()); err != nil {
return nil, err
}
}
Expand Down
Loading
Loading