Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support http1 full duplex per workload #14568

Merged
merged 10 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,13 @@ func main() {
// NOTE: MetricHandler is being used as the outermost handler of the meaty bits. We're not interested in measuring
// the healthchecks or probes.
ah = activatorhandler.NewMetricHandler(env.PodName, ah)
// We need the context handler to run first so ctx gets the revision info.
Copy link
Contributor Author

@skonto skonto Nov 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine as the handler that consumes the body is not run yet. The error we try to eliminate is btw:

2023/11/25 00:07:42 httputil: ReverseProxy read error during body copy: read tcp 10.244.0.32:37488->10.244.0.31:8012: use of closed network connection

Even without the activator wrapper the error at the activator side happens vary rarely, unlike the QP side.

ah = activatorhandler.WrapActivatorHandlerWithFullDuplex(ah, logger)
ah = activatorhandler.NewContextHandler(ctx, ah, configStore)

// Network probe handlers.
ah = &activatorhandler.ProbeHandler{NextHandler: ah}
ah = netprobe.NewHandler(ah)

// Set up our health check based on the health of stat sink and environmental factors.
sigCtx := signals.NewContext()
hc := newHealthCheck(sigCtx, logger, statSink)
Expand Down
12 changes: 12 additions & 0 deletions pkg/activator/handler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,15 @@ func RevisionFrom(ctx context.Context) *v1.Revision {
func RevIDFrom(ctx context.Context) types.NamespacedName {
return ctx.Value(revCtxKey{}).(*revCtx).revID
}

func RevAnnotation(ctx context.Context, annotation string) string {
v := ctx.Value(revCtxKey{})
if v == nil {
return ""
}
rev := v.(*revCtx).revision
if rev != nil && rev.GetAnnotations() != nil {
return rev.GetAnnotations()[annotation]
}
return ""
}
14 changes: 14 additions & 0 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"knative.dev/pkg/tracing/propagation/tracecontextb3"
"knative.dev/serving/pkg/activator"
activatorconfig "knative.dev/serving/pkg/activator/config"
apiconfig "knative.dev/serving/pkg/apis/config"
pkghttp "knative.dev/serving/pkg/http"
"knative.dev/serving/pkg/networking"
"knative.dev/serving/pkg/queue"
Expand Down Expand Up @@ -150,3 +151,16 @@ func useSecurePort(target string) string {
target = strings.Split(target, ":")[0]
return target + ":" + strconv.Itoa(networking.BackendHTTPSPort)
}

func WrapActivatorHandlerWithFullDuplex(h http.Handler, logger *zap.SugaredLogger) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
revEnableHTTP1FullDuplex := strings.EqualFold(RevAnnotation(r.Context(), apiconfig.AllowHTTPFullDuplexFeatureKey), "Enabled")
if revEnableHTTP1FullDuplex {
rc := http.NewResponseController(w)
if err := rc.EnableFullDuplex(); err != nil {
logger.Errorw("Unable to enable full duplex", zap.Error(err))
}
}
h.ServeHTTP(w, r)
})
}
164 changes: 164 additions & 0 deletions pkg/activator/handler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@ package handler

import (
"bytes"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"net/url"
"runtime"
"sync"
"testing"

netprobe "knative.dev/networking/pkg/http/probe"
"knative.dev/pkg/logging"
pkgnet "knative.dev/pkg/network"
rtesting "knative.dev/pkg/reconciler/testing"
"knative.dev/serving/pkg/activator"
apiconfig "knative.dev/serving/pkg/apis/config"
asmetrics "knative.dev/serving/pkg/autoscaler/metrics"
pkghttp "knative.dev/serving/pkg/http"
)
Expand Down Expand Up @@ -114,3 +120,161 @@ func BenchmarkHandlerChain(b *testing.B) {
})
})
}

// TestActivatorChainHandlerWithFullDuplex tests activator's chain handler with the new http1 full duplex support against the issue
// https://github.com/golang/go/issues/40747, where reverse proxy failed with read errors.
// The test uses the reproducer in https://github.com/golang/go/issues/40747#issuecomment-733552061.
// We enable full duplex by setting the annotation `features.knative.dev/http-full-duplex` at the revision level.
func TestActivatorChainHandlerWithFullDuplex(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Testing this on Mac requires to loosen some restrictions, see https://github.com/knative/serving/pull/14568#issuecomment-1893151202 for more")
}

ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t)
rev := revision(testNamespace, testRevName)
defer reset()
rev.Annotations = map[string]string{apiconfig.AllowHTTPFullDuplexFeatureKey: "Enabled"}
t.Cleanup(cancel)

logger := logging.FromContext(ctx)
configStore := setupConfigStore(t, logger)
revisionInformer(ctx, rev)

// Buffer equal to the activator.
statCh := make(chan []asmetrics.StatMessage)
concurrencyReporter := NewConcurrencyReporter(ctx, activatorPodName, statCh)
go concurrencyReporter.Run(ctx.Done())

// Just read and ignore all stat messages.
go func() {
for {
select {
case <-statCh:
case <-ctx.Done():
return
}
}
}()

// The server responding with the sent body.
echoServer := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, req *http.Request) {
body, err := io.ReadAll(req.Body)
if err != nil {
log.Printf("error reading body: %v", err)
http.Error(w, fmt.Sprintf("error reading body: %v", err), http.StatusInternalServerError)
return
}

if _, err := w.Write(body); err != nil {
log.Printf("error writing body: %v", err)
}
},
))
defer echoServer.Close()

// The server proxying requests to the echo server.
echoURL, err := url.Parse(echoServer.URL)
if err != nil {
t.Fatalf("Failed to parse echo URL: %v", err)
}

proxy := pkghttp.NewHeaderPruningReverseProxy(echoURL.Host, "", []string{}, false)
proxy.FlushInterval = 0
proxyWithMiddleware := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
proxy.ServeHTTP(w, r)
})
var ah http.Handler
ah = concurrencyReporter.Handler(proxyWithMiddleware)
ah = NewTracingHandler(ah)
ah, _ = pkghttp.NewRequestLogHandler(ah, io.Discard, "", nil, false)
ah = NewMetricHandler(activatorPodName, ah)
ah = WrapActivatorHandlerWithFullDuplex(ah, logger)
ah = NewContextHandler(ctx, ah, configStore)
ah = &ProbeHandler{NextHandler: ah}
ah = netprobe.NewHandler(ah)
ah = &HealthHandler{HealthCheck: func() error { return nil }, NextHandler: ah, Logger: logger}

bodySize := 32 * 1024
parallelism := 32

proxyServer := httptest.NewServer(ah)

defer proxyServer.Close()

transport := http.DefaultTransport.(*http.Transport).Clone()

// Turning on this will hide the issue
transport.DisableKeepAlives = false
c := &http.Client{
Transport: transport,
}

body := make([]byte, bodySize)
for i := 0; i < cap(body); i++ {
body[i] = 42
}

for i := 0; i < 10; i++ {
var wg sync.WaitGroup
wg.Add(parallelism)
for i := 0; i < parallelism; i++ {
go func(i int) {
defer wg.Done()

for i := 0; i < 1000; i++ {
if err := send(c, proxyServer.URL, body, "test-host"); err != nil {
t.Errorf("error during request: %v", err)
}
}
}(i)
}

wg.Wait()
}

}

func send(client *http.Client, url string, body []byte, rHost string) error {
r := bytes.NewBuffer(body)
req, err := http.NewRequest("POST", url, r)

if rHost != "" {
req.Host = rHost
}

req.Header.Set(activator.RevisionHeaderNamespace, testNamespace)
req.Header.Set(activator.RevisionHeaderName, testRevName)

if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to execute request: %w", err)
}
defer resp.Body.Close()

bd := io.Reader(resp.Body)

rec, err := io.ReadAll(bd)

if err != nil {
return fmt.Errorf("failed to read body: %w", err)
}

if _, err = io.Copy(io.Discard, resp.Body); err != nil {
return fmt.Errorf("failed to discard body: %w", err)
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

if len(rec) != len(body) {
return fmt.Errorf("unexpected body length: %d", len(rec))
}

return nil
}
3 changes: 3 additions & 0 deletions pkg/apis/config/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ const (

// DryRunFeatureKey gates the podspec dryrun feature and runs with the value 'enabled'
DryRunFeatureKey = "features.knative.dev/podspec-dryrun"

// AllowHTTPFullDuplexFeatureKey gates the use of http1 full duplex per workload
AllowHTTPFullDuplexFeatureKey = "features.knative.dev/http-full-duplex"
)

func defaultFeaturesConfig() *Features {
Expand Down
5 changes: 5 additions & 0 deletions pkg/http/handler/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ type timeoutWriter struct {
var _ http.Flusher = (*timeoutWriter)(nil)
var _ http.ResponseWriter = (*timeoutWriter)(nil)

// Unwrap returns the underlying writer
func (tw *timeoutWriter) Unwrap() http.ResponseWriter {
return tw.w
}

func (tw *timeoutWriter) Flush() {
// The inner handler of timeoutHandler can call Flush at any time including after
// timeoutHandler.ServeHTTP has returned. Forwarding this call to the inner
Expand Down
5 changes: 5 additions & 0 deletions pkg/http/response_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func NewResponseRecorder(w http.ResponseWriter, responseCode int) *ResponseRecor
}
}

// Unwrap returns the underlying writer
func (rr *ResponseRecorder) Unwrap() http.ResponseWriter {
return rr.writer
}

// Flush flushes the buffer to the client.
func (rr *ResponseRecorder) Flush() {
rr.writer.(http.Flusher).Flush()
Expand Down
15 changes: 15 additions & 0 deletions pkg/queue/sharedmain/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func mainHandler(
composedHandler = tracing.HTTPSpanMiddleware(composedHandler)
}

composedHandler = withFullDuplex(composedHandler, env.EnableHTTPFullDuplex, logger)

drainer := &pkghandler.Drainer{
QuietPeriod: drainSleepDuration,
// Add Activator probe header to the drainer so it can handle probes directly from activator
Expand Down Expand Up @@ -124,3 +126,16 @@ func adminHandler(ctx context.Context, logger *zap.SugaredLogger, drainer *pkgha

return mux
}

func withFullDuplex(h http.Handler, enableFullDuplex bool, logger *zap.SugaredLogger) http.Handler {
if !enableFullDuplex {
return h
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rc := http.NewResponseController(w)
if err := rc.EnableFullDuplex(); err != nil {
logger.Errorw("Unable to enable full duplex", zap.Error(err))
}
h.ServeHTTP(w, r)
})
}
12 changes: 7 additions & 5 deletions pkg/queue/sharedmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,13 @@ type config struct {
QueueServingTLSPort string `split_words:"true" required:"true"`
UserPort string `split_words:"true" required:"true"`
RevisionTimeoutSeconds int `split_words:"true" required:"true"`
RevisionResponseStartTimeoutSeconds int `split_words:"true"` // optional
RevisionIdleTimeoutSeconds int `split_words:"true"` // optional
ServingReadinessProbe string `split_words:"true"` // optional
EnableProfiling bool `split_words:"true"` // optional
EnableHTTP2AutoDetection bool `envconfig:"ENABLE_HTTP2_AUTO_DETECTION"` // optional
RevisionResponseStartTimeoutSeconds int `split_words:"true"` // optional
RevisionIdleTimeoutSeconds int `split_words:"true"` // optional
ServingReadinessProbe string `split_words:"true"` // optional
EnableProfiling bool `split_words:"true"` // optional
// See https://github.com/knative/serving/issues/12387
EnableHTTPFullDuplex bool `split_words:"true"` // optional
EnableHTTP2AutoDetection bool `envconfig:"ENABLE_HTTP2_AUTO_DETECTION"` // optional

// Logging configuration
ServingLoggingConfig string `split_words:"true" required:"true"`
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/revision/resources/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ var (
}, {
Name: "ENABLE_HTTP2_AUTO_DETECTION",
Value: "false",
}, {
Name: "ENABLE_HTTP_FULL_DUPLEX",
Value: "false",
}, {
Name: "ROOT_CA",
Value: "",
Expand Down
6 changes: 6 additions & 0 deletions pkg/reconciler/revision/resources/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math"
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -303,6 +304,8 @@ func makeQueueContainer(rev *v1.Revision, cfg *config.Config) (*corev1.Container
}
}

fullDuplexFeature, fullDuplexExists := rev.Annotations[apicfg.AllowHTTPFullDuplexFeatureKey]

useQPResourceDefaults := cfg.Features.QueueProxyResourceDefaults == apicfg.Enabled
c := &corev1.Container{
Name: QueueContainerName,
Expand Down Expand Up @@ -418,6 +421,9 @@ func makeQueueContainer(rev *v1.Revision, cfg *config.Config) (*corev1.Container
}, {
Name: "ENABLE_HTTP2_AUTO_DETECTION",
Value: strconv.FormatBool(cfg.Features.AutoDetectHTTP2 == apicfg.Enabled),
}, {
Name: "ENABLE_HTTP_FULL_DUPLEX",
Value: strconv.FormatBool(fullDuplexExists && strings.EqualFold(fullDuplexFeature, string(apicfg.Enabled))),
}, {
Name: "ROOT_CA",
Value: cfg.Deployment.QueueSidecarRootCA,
Expand Down
11 changes: 11 additions & 0 deletions pkg/reconciler/revision/resources/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,16 @@ func TestMakeQueueContainer(t *testing.T) {
"ENABLE_HTTP2_AUTO_DETECTION": "true",
})
}),
}, {
name: "HTTP1 full duplex enabled",
rev: revision("bar", "foo",
withContainers(containers),
WithRevisionAnnotations(map[string]string{apicfg.AllowHTTPFullDuplexFeatureKey: string(apicfg.Enabled)})),
want: queueContainer(func(c *corev1.Container) {
c.Env = env(map[string]string{
"ENABLE_HTTP_FULL_DUPLEX": "true",
})
}),
}, {
name: "set root ca",
rev: revision("bar", "foo",
Expand Down Expand Up @@ -1041,6 +1051,7 @@ func TestTCPProbeGeneration(t *testing.T) {
var defaultEnv = map[string]string{
"CONTAINER_CONCURRENCY": "0",
"ENABLE_HTTP2_AUTO_DETECTION": "false",
"ENABLE_HTTP_FULL_DUPLEX": "false",
"ENABLE_PROFILING": "false",
"METRICS_DOMAIN": metrics.Domain(),
"METRICS_COLLECTOR_ADDRESS": "",
Expand Down
Loading