Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http3: remove Settingser, StreamCreator, return Connection from Hijacker #4425

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 3 additions & 46 deletions http3/body.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package http3
import (
"context"
"io"
"net"

"github.com/quic-go/quic-go"
)
Expand All @@ -17,33 +16,10 @@ type HTTPStreamer interface {
HTTPStream() Stream
}

type StreamCreator interface {
// Context returns a context that is cancelled when the underlying connection is closed.
Context() context.Context
OpenStream() (quic.Stream, error)
OpenStreamSync(context.Context) (quic.Stream, error)
OpenUniStream() (quic.SendStream, error)
OpenUniStreamSync(context.Context) (quic.SendStream, error)
LocalAddr() net.Addr
RemoteAddr() net.Addr
ConnectionState() quic.ConnectionState
}

var _ StreamCreator = quic.Connection(nil)

// A Hijacker allows hijacking of the stream creating part of a quic.Session from a http.Response.Body.
// It is used by WebTransport to create WebTransport streams after a session has been established.
type Hijacker interface {
StreamCreator() StreamCreator
}

// Settingser allows the server to retrieve the client's SETTINGS.
// The http.Request.Body implements this interface.
type Settingser interface {
// Settings returns the client's HTTP settings.
// It blocks until the SETTINGS frame has been received.
// Note that it is not guaranteed that this happens during the lifetime of the request.
Settings(context.Context) (*Settings, error)
Connection() Connection
}

// The body is used in the requestBody (for a http.Request) and the responseBody (for a http.Response).
Expand Down Expand Up @@ -83,7 +59,6 @@ type requestBody struct {
var (
_ io.ReadCloser = &requestBody{}
_ HTTPStreamer = &requestBody{}
_ Settingser = &requestBody{}
)

func newRequestBody(str Stream, connCtx context.Context, rcvdSettings <-chan struct{}, getSettings func() *Settings) *requestBody {
Expand All @@ -95,20 +70,8 @@ func newRequestBody(str Stream, connCtx context.Context, rcvdSettings <-chan str
}
}

func (r *requestBody) Settings(ctx context.Context) (*Settings, error) {
select {
case <-ctx.Done():
return nil, context.Cause(ctx)
case <-r.connCtx.Done():
return nil, context.Cause(r.connCtx)
case <-r.rcvdSettings:
return r.getSettings(), nil
}
}

type hijackableBody struct {
body body
conn quic.Connection // only needed to implement Hijacker

// only set for the http.Response
// The channel is closed when the user is done with this response:
Expand All @@ -117,16 +80,12 @@ type hijackableBody struct {
reqDoneClosed bool
}

var (
_ io.ReadCloser = &hijackableBody{}
_ Hijacker = &hijackableBody{}
)
var _ io.ReadCloser = &hijackableBody{}

func newResponseBody(str Stream, conn quic.Connection, done chan<- struct{}) *hijackableBody {
func newResponseBody(str Stream, done chan<- struct{}) *hijackableBody {
return &hijackableBody{
body: body{str: str},
reqDone: done,
conn: conn,
}
}

Expand Down Expand Up @@ -154,5 +113,3 @@ func (r *hijackableBody) Close() error {
r.body.str.CancelRead(quic.StreamErrorCode(ErrCodeRequestCanceled))
return nil
}

func (r *hijackableBody) StreamCreator() StreamCreator { return r.conn }
32 changes: 4 additions & 28 deletions http3/body_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package http3

import (
"context"
"errors"

"github.com/quic-go/quic-go"
Expand All @@ -12,29 +11,6 @@ import (
"go.uber.org/mock/gomock"
)

var _ = Describe("Request Body", func() {
It("makes the SETTINGS available", func() {
str := mockquic.NewMockStream(mockCtrl)
rcvdSettings := make(chan struct{})
close(rcvdSettings)
settings := &Settings{EnableExtendedConnect: true}
body := newRequestBody(str, context.Background(), rcvdSettings, func() *Settings { return settings })
s, err := body.Settings(context.Background())
Expect(err).ToNot(HaveOccurred())
Expect(s).To(Equal(settings))
})

It("unblocks Settings() when the connection is closed", func() {
str := mockquic.NewMockStream(mockCtrl)
ctx, cancel := context.WithCancelCause(context.Background())
testErr := errors.New("test error")
cancel(testErr)
body := newRequestBody(str, ctx, make(chan struct{}), func() *Settings { return nil })
_, err := body.Settings(context.Background())
Expect(err).To(MatchError(testErr))
})
})

var _ = Describe("Response Body", func() {
var reqDone chan struct{}

Expand All @@ -43,7 +19,7 @@ var _ = Describe("Response Body", func() {
It("closes the reqDone channel when Read errors", func() {
str := mockquic.NewMockStream(mockCtrl)
str.EXPECT().Read(gomock.Any()).Return(0, errors.New("test error"))
rb := newResponseBody(str, nil, reqDone)
rb := newResponseBody(str, reqDone)
_, err := rb.Read([]byte{0})
Expect(err).To(MatchError("test error"))
Expect(reqDone).To(BeClosed())
Expand All @@ -52,7 +28,7 @@ var _ = Describe("Response Body", func() {
It("allows multiple calls to Read, when Read errors", func() {
str := mockquic.NewMockStream(mockCtrl)
str.EXPECT().Read(gomock.Any()).Return(0, errors.New("test error")).Times(2)
rb := newResponseBody(str, nil, reqDone)
rb := newResponseBody(str, reqDone)
_, err := rb.Read([]byte{0})
Expect(err).To(HaveOccurred())
Expect(reqDone).To(BeClosed())
Expand All @@ -62,14 +38,14 @@ var _ = Describe("Response Body", func() {

It("closes responses", func() {
str := mockquic.NewMockStream(mockCtrl)
rb := newResponseBody(str, nil, reqDone)
rb := newResponseBody(str, reqDone)
str.EXPECT().CancelRead(quic.StreamErrorCode(ErrCodeRequestCanceled))
Expect(rb.Close()).To(Succeed())
})

It("allows multiple calls to Close", func() {
str := mockquic.NewMockStream(mockCtrl)
rb := newResponseBody(str, nil, reqDone)
rb := newResponseBody(str, reqDone)
str.EXPECT().CancelRead(quic.StreamErrorCode(ErrCodeRequestCanceled)).MaxTimes(2)
Expect(rb.Close()).To(Succeed())
Expect(reqDone).To(BeClosed())
Expand Down
8 changes: 8 additions & 0 deletions http3/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"github.com/quic-go/quic-go/quicvarint"
)

type Connection interface {
quic.Connection
// ReceivedSettings returns a channel that is closed once the client's SETTINGS frame was received.
ReceivedSettings() <-chan struct{}
// Settings returns the settings received on this connection.
Settings() *Settings
}

type connection struct {
quic.Connection

Expand Down
2 changes: 1 addition & 1 deletion http3/http_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (s *requestStream) ReadResponse() (*http.Response, error) {
} else {
httpStr = s.stream
}
respBody := newResponseBody(httpStr, s.conn, s.reqDone)
respBody := newResponseBody(httpStr, s.reqDone)

// Rules for when to set Content-Length are defined in https://tools.ietf.org/html/rfc7230#section-3.3.2.
_, hasTransferEncoding := res.Header["Transfer-Encoding"]
Expand Down
6 changes: 3 additions & 3 deletions http3/response_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (hw *headerWriter) Write(p []byte) (int, error) {

type responseWriter struct {
*headerWriter
conn quic.Connection
conn Connection
bufferedStr *bufio.Writer
buf []byte

Expand All @@ -79,7 +79,7 @@ var (
_ Hijacker = &responseWriter{}
)

func newResponseWriter(str quic.Stream, conn quic.Connection, logger utils.Logger) *responseWriter {
func newResponseWriter(str quic.Stream, conn Connection, logger utils.Logger) *responseWriter {
hw := &headerWriter{
str: str,
header: http.Header{},
Expand Down Expand Up @@ -196,7 +196,7 @@ func (w *responseWriter) Flush() {
}
}

func (w *responseWriter) StreamCreator() StreamCreator {
func (w *responseWriter) Connection() Connection {
return w.conn
}

Expand Down
15 changes: 8 additions & 7 deletions integrationtests/self/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,13 +590,13 @@ var _ = Describe("HTTP tests", func() {
settingsChan := make(chan *http3.Settings, 1)
mux.HandleFunc("/settings", func(w http.ResponseWriter, r *http.Request) {
defer GinkgoRecover()
// The http.Request.Body is guaranteed to implement the http3.Settingser interface.
settings, err := r.Body.(http3.Settingser).Settings(context.Background())
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
conn := w.(http3.Hijacker).Connection()
select {
case <-conn.ReceivedSettings():
case <-time.After(5 * time.Second):
Fail("didn't receive SETTINGS")
}
settingsChan <- settings
settingsChan <- conn.Settings()
w.WriteHeader(http.StatusOK)
})

Expand All @@ -615,7 +615,8 @@ var _ = Describe("HTTP tests", func() {
_, err = rt.RoundTrip(req)
Expect(err).ToNot(HaveOccurred())
var settings *http3.Settings
Eventually(settingsChan).Should(Receive(&settings))
Expect(settingsChan).To(Receive(&settings))
Expect(settings).ToNot(BeNil())
Expect(settings.EnableDatagram).To(BeTrue())
Expect(settings.EnableExtendedConnect).To(BeFalse())
Expect(settings.Other).To(HaveKeyWithValue(uint64(1337), uint64(42)))
Expand Down
Loading