Skip to content

Commit

Permalink
http3: remove Settingser, StreamCreator, return Connection from Hijacker
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Apr 10, 2024
1 parent ee698b3 commit 9397f32
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 85 deletions.
49 changes: 3 additions & 46 deletions http3/body.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package http3
import (
"context"
"io"
"net"

"github.com/quic-go/quic-go"
)
Expand All @@ -17,33 +16,10 @@ type HTTPStreamer interface {
HTTPStream() Stream
}

type StreamCreator interface {
// Context returns a context that is cancelled when the underlying connection is closed.
Context() context.Context
OpenStream() (quic.Stream, error)
OpenStreamSync(context.Context) (quic.Stream, error)
OpenUniStream() (quic.SendStream, error)
OpenUniStreamSync(context.Context) (quic.SendStream, error)
LocalAddr() net.Addr
RemoteAddr() net.Addr
ConnectionState() quic.ConnectionState
}

var _ StreamCreator = quic.Connection(nil)

// A Hijacker allows hijacking of the stream creating part of a quic.Session from a http.Response.Body.
// It is used by WebTransport to create WebTransport streams after a session has been established.
type Hijacker interface {
StreamCreator() StreamCreator
}

// Settingser allows the server to retrieve the client's SETTINGS.
// The http.Request.Body implements this interface.
type Settingser interface {
// Settings returns the client's HTTP settings.
// It blocks until the SETTINGS frame has been received.
// Note that it is not guaranteed that this happens during the lifetime of the request.
Settings(context.Context) (*Settings, error)
Connection() Connection
}

// The body is used in the requestBody (for a http.Request) and the responseBody (for a http.Response).
Expand Down Expand Up @@ -83,7 +59,6 @@ type requestBody struct {
var (
_ io.ReadCloser = &requestBody{}
_ HTTPStreamer = &requestBody{}
_ Settingser = &requestBody{}
)

func newRequestBody(str Stream, connCtx context.Context, rcvdSettings <-chan struct{}, getSettings func() *Settings) *requestBody {
Expand All @@ -95,20 +70,8 @@ func newRequestBody(str Stream, connCtx context.Context, rcvdSettings <-chan str
}
}

func (r *requestBody) Settings(ctx context.Context) (*Settings, error) {
select {
case <-ctx.Done():
return nil, context.Cause(ctx)
case <-r.connCtx.Done():
return nil, context.Cause(r.connCtx)
case <-r.rcvdSettings:
return r.getSettings(), nil
}
}

type hijackableBody struct {
body body
conn quic.Connection // only needed to implement Hijacker

// only set for the http.Response
// The channel is closed when the user is done with this response:
Expand All @@ -117,16 +80,12 @@ type hijackableBody struct {
reqDoneClosed bool
}

var (
_ io.ReadCloser = &hijackableBody{}
_ Hijacker = &hijackableBody{}
)
var _ io.ReadCloser = &hijackableBody{}

func newResponseBody(str Stream, conn quic.Connection, done chan<- struct{}) *hijackableBody {
func newResponseBody(str Stream, done chan<- struct{}) *hijackableBody {
return &hijackableBody{
body: body{str: str},
reqDone: done,
conn: conn,
}
}

Expand Down Expand Up @@ -154,5 +113,3 @@ func (r *hijackableBody) Close() error {
r.body.str.CancelRead(quic.StreamErrorCode(ErrCodeRequestCanceled))
return nil
}

func (r *hijackableBody) StreamCreator() StreamCreator { return r.conn }
32 changes: 4 additions & 28 deletions http3/body_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package http3

import (
"context"
"errors"

"github.com/quic-go/quic-go"
Expand All @@ -12,29 +11,6 @@ import (
"go.uber.org/mock/gomock"
)

var _ = Describe("Request Body", func() {
It("makes the SETTINGS available", func() {
str := mockquic.NewMockStream(mockCtrl)
rcvdSettings := make(chan struct{})
close(rcvdSettings)
settings := &Settings{EnableExtendedConnect: true}
body := newRequestBody(str, context.Background(), rcvdSettings, func() *Settings { return settings })
s, err := body.Settings(context.Background())
Expect(err).ToNot(HaveOccurred())
Expect(s).To(Equal(settings))
})

It("unblocks Settings() when the connection is closed", func() {
str := mockquic.NewMockStream(mockCtrl)
ctx, cancel := context.WithCancelCause(context.Background())
testErr := errors.New("test error")
cancel(testErr)
body := newRequestBody(str, ctx, make(chan struct{}), func() *Settings { return nil })
_, err := body.Settings(context.Background())
Expect(err).To(MatchError(testErr))
})
})

var _ = Describe("Response Body", func() {
var reqDone chan struct{}

Expand All @@ -43,7 +19,7 @@ var _ = Describe("Response Body", func() {
It("closes the reqDone channel when Read errors", func() {
str := mockquic.NewMockStream(mockCtrl)
str.EXPECT().Read(gomock.Any()).Return(0, errors.New("test error"))
rb := newResponseBody(str, nil, reqDone)
rb := newResponseBody(str, reqDone)
_, err := rb.Read([]byte{0})
Expect(err).To(MatchError("test error"))
Expect(reqDone).To(BeClosed())
Expand All @@ -52,7 +28,7 @@ var _ = Describe("Response Body", func() {
It("allows multiple calls to Read, when Read errors", func() {
str := mockquic.NewMockStream(mockCtrl)
str.EXPECT().Read(gomock.Any()).Return(0, errors.New("test error")).Times(2)
rb := newResponseBody(str, nil, reqDone)
rb := newResponseBody(str, reqDone)
_, err := rb.Read([]byte{0})
Expect(err).To(HaveOccurred())
Expect(reqDone).To(BeClosed())
Expand All @@ -62,14 +38,14 @@ var _ = Describe("Response Body", func() {

It("closes responses", func() {
str := mockquic.NewMockStream(mockCtrl)
rb := newResponseBody(str, nil, reqDone)
rb := newResponseBody(str, reqDone)
str.EXPECT().CancelRead(quic.StreamErrorCode(ErrCodeRequestCanceled))
Expect(rb.Close()).To(Succeed())
})

It("allows multiple calls to Close", func() {
str := mockquic.NewMockStream(mockCtrl)
rb := newResponseBody(str, nil, reqDone)
rb := newResponseBody(str, reqDone)
str.EXPECT().CancelRead(quic.StreamErrorCode(ErrCodeRequestCanceled)).MaxTimes(2)
Expect(rb.Close()).To(Succeed())
Expect(reqDone).To(BeClosed())
Expand Down
8 changes: 8 additions & 0 deletions http3/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"github.com/quic-go/quic-go/quicvarint"
)

type Connection interface {
quic.Connection
// ReceivedSettings returns a channel that is closed once the client's SETTINGS frame was received.
ReceivedSettings() <-chan struct{}
// Settings returns the settings received on this connection.
Settings() *Settings
}

type connection struct {
quic.Connection

Expand Down
2 changes: 1 addition & 1 deletion http3/http_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (s *requestStream) ReadResponse() (*http.Response, error) {
} else {
httpStr = s.stream
}
respBody := newResponseBody(httpStr, s.conn, s.reqDone)
respBody := newResponseBody(httpStr, s.reqDone)

// Rules for when to set Content-Length are defined in https://tools.ietf.org/html/rfc7230#section-3.3.2.
_, hasTransferEncoding := res.Header["Transfer-Encoding"]
Expand Down
6 changes: 3 additions & 3 deletions http3/response_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (hw *headerWriter) Write(p []byte) (int, error) {

type responseWriter struct {
*headerWriter
conn quic.Connection
conn Connection
bufferedStr *bufio.Writer
buf []byte

Expand All @@ -79,7 +79,7 @@ var (
_ Hijacker = &responseWriter{}
)

func newResponseWriter(str quic.Stream, conn quic.Connection, logger utils.Logger) *responseWriter {
func newResponseWriter(str quic.Stream, conn Connection, logger utils.Logger) *responseWriter {
hw := &headerWriter{
str: str,
header: http.Header{},
Expand Down Expand Up @@ -196,7 +196,7 @@ func (w *responseWriter) Flush() {
}
}

func (w *responseWriter) StreamCreator() StreamCreator {
func (w *responseWriter) Connection() Connection {
return w.conn
}

Expand Down
15 changes: 8 additions & 7 deletions integrationtests/self/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,13 +590,13 @@ var _ = Describe("HTTP tests", func() {
settingsChan := make(chan *http3.Settings, 1)
mux.HandleFunc("/settings", func(w http.ResponseWriter, r *http.Request) {
defer GinkgoRecover()
// The http.Request.Body is guaranteed to implement the http3.Settingser interface.
settings, err := r.Body.(http3.Settingser).Settings(context.Background())
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
conn := w.(http3.Hijacker).Connection()
select {
case <-conn.ReceivedSettings():
case <-time.After(5 * time.Second):
Fail("didn't receive SETTINGS")
}
settingsChan <- settings
settingsChan <- conn.Settings()
w.WriteHeader(http.StatusOK)
})

Expand All @@ -615,7 +615,8 @@ var _ = Describe("HTTP tests", func() {
_, err = rt.RoundTrip(req)
Expect(err).ToNot(HaveOccurred())
var settings *http3.Settings
Eventually(settingsChan).Should(Receive(&settings))
Expect(settingsChan).To(Receive(&settings))
Expect(settings).ToNot(BeNil())
Expect(settings.EnableDatagram).To(BeTrue())
Expect(settings.EnableExtendedConnect).To(BeFalse())
Expect(settings.Other).To(HaveKeyWithValue(uint64(1337), uint64(42)))
Expand Down

0 comments on commit 9397f32

Please sign in to comment.