Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support http1 full duplex per workload #14568

Merged
merged 10 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
single call
  • Loading branch information
skonto committed Jan 15, 2024
commit 5b21d8e1a4c3773e4381b7795dfb89b7b8a0a4b8
18 changes: 11 additions & 7 deletions cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -209,7 +210,7 @@ func main() {
// Create activation handler chain
// Note: innermost handlers are specified first, ie. the last handler in the chain will be executed first
ah := activatorhandler.New(ctx, throttler, transport, networkConfig.EnableMeshPodAddressability, logger, tlsEnabled)
ah = wrapActivatorHandlerWithFullDuplex(handler.NewTimeoutHandler(ah, "activator request timeout", func(r *http.Request) (time.Duration, time.Duration, time.Duration) {
ah = handler.NewTimeoutHandler(ah, "activator request timeout", func(r *http.Request) (time.Duration, time.Duration, time.Duration) {
if rev := activatorhandler.RevisionFrom(r.Context()); rev != nil {
var responseStartTimeout = 0 * time.Second
if rev.Spec.ResponseStartTimeoutSeconds != nil {
Expand All @@ -224,7 +225,7 @@ func main() {
return apiconfig.DefaultRevisionTimeoutSeconds * time.Second,
apiconfig.DefaultRevisionResponseStartTimeoutSeconds * time.Second,
apiconfig.DefaultRevisionIdleTimeoutSeconds * time.Second
}))
})
ah = concurrencyReporter.Handler(ah)
ah = activatorhandler.NewTracingHandler(ah)
reqLogHandler, err := pkghttp.NewRequestLogHandler(ah, logging.NewSyncFileWriter(os.Stdout), "",
Expand All @@ -236,18 +237,19 @@ func main() {

// NOTE: MetricHandler is being used as the outermost handler of the meaty bits. We're not interested in measuring
// the healthchecks or probes.
ah = wrapActivatorHandlerWithFullDuplex(activatorhandler.NewMetricHandler(env.PodName, ah))
ah = activatorhandler.NewMetricHandler(env.PodName, ah)
ah = activatorhandler.NewContextHandler(ctx, ah, configStore)

// Network probe handlers.
ah = &activatorhandler.ProbeHandler{NextHandler: ah}
ah = netprobe.NewHandler(ah)

// Set up our health check based on the health of stat sink and environmental factors.
sigCtx := signals.NewContext()
hc := newHealthCheck(sigCtx, logger, statSink)
ah = &activatorhandler.HealthHandler{HealthCheck: hc, NextHandler: ah, Logger: logger}

ah = wrapActivatorHandlerWithFullDuplex(ah, logger)

profilingHandler := profiling.NewHandler(logger, false)
// Watch the logging config map and dynamically update logging levels.
configMapWatcher.Watch(pkglogging.ConfigMapName(), pkglogging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
Expand Down Expand Up @@ -340,12 +342,14 @@ func flush(logger *zap.SugaredLogger) {
metrics.FlushExporter()
}

func wrapActivatorHandlerWithFullDuplex(h http.Handler) http.HandlerFunc {
func wrapActivatorHandlerWithFullDuplex(h http.Handler, logger *zap.SugaredLogger) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
revEnableHTTP1FullDuplex := activatorhandler.RevAnnotations(r.Context(), apiconfig.AllowHTTPFullDuplexFeatureKey) == "Enabled"
revEnableHTTP1FullDuplex := strings.EqualFold(activatorhandler.RevAnnotation(r.Context(), apiconfig.AllowHTTPFullDuplexFeatureKey), "Enabled")
if revEnableHTTP1FullDuplex {
rc := http.NewResponseController(w)
_ = rc.EnableFullDuplex()
if err := rc.EnableFullDuplex(); err != nil {
logger.Errorw("Unable to enable full duplex", zap.Error(err))
}
}
h.ServeHTTP(w, r)
})
Expand Down
8 changes: 6 additions & 2 deletions pkg/activator/handler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ func RevIDFrom(ctx context.Context) types.NamespacedName {
return ctx.Value(revCtxKey{}).(*revCtx).revID
}

func RevAnnotations(ctx context.Context, annotation string) string {
rev := ctx.Value(revCtxKey{}).(*revCtx).revision
func RevAnnotation(ctx context.Context, annotation string) string {
v := ctx.Value(revCtxKey{})
if v == nil {
return ""
}
rev := v.(*revCtx).revision
if rev != nil && rev.GetAnnotations() != nil {
return rev.GetAnnotations()[annotation]
}
Expand Down
25 changes: 4 additions & 21 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"knative.dev/pkg/tracing/propagation/tracecontextb3"
"knative.dev/serving/pkg/activator"
activatorconfig "knative.dev/serving/pkg/activator/config"
apicfg "knative.dev/serving/pkg/apis/config"
pkghttp "knative.dev/serving/pkg/http"
"knative.dev/serving/pkg/networking"
"knative.dev/serving/pkg/queue"
Expand Down Expand Up @@ -87,15 +86,14 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

revID := RevIDFrom(r.Context())
revEnableHTTP1FullDuplex := RevAnnotations(r.Context(), apicfg.AllowHTTPFullDuplexFeatureKey) == "Enabled"

if err := a.throttler.Try(tryContext, revID, func(dest string) error {
trySpan.End()

proxyCtx, proxySpan := r.Context(), (*trace.Span)(nil)
if tracingEnabled {
proxyCtx, proxySpan = trace.StartSpan(r.Context(), "activator_proxy")
}
a.proxyRequest(revID, revEnableHTTP1FullDuplex, w, r.WithContext(proxyCtx), dest, tracingEnabled, a.usePassthroughLb)
a.proxyRequest(revID, w, r.WithContext(proxyCtx), dest, tracingEnabled, a.usePassthroughLb)
proxySpan.End()

return nil
Expand All @@ -114,7 +112,7 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

func (a *activationHandler) proxyRequest(revID types.NamespacedName, enableHTTP1FullDuplex bool, w http.ResponseWriter,
func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.ResponseWriter,
r *http.Request, target string, tracingEnabled bool, usePassthroughLb bool) {
netheader.RewriteHostIn(r)
r.Header.Set(netheader.ProxyKey, activator.Name)
Expand All @@ -126,27 +124,12 @@ func (a *activationHandler) proxyRequest(revID types.NamespacedName, enableHTTP1
}

var proxy *httputil.ReverseProxy
var proxyHandler http.Handler
if a.tls {
proxy = pkghttp.NewHeaderPruningReverseProxy(useSecurePort(target), hostOverride, activator.RevisionHeaders, true /* uss HTTPS */)
} else {
proxy = pkghttp.NewHeaderPruningReverseProxy(target, hostOverride, activator.RevisionHeaders, false /* use HTTPS */)
}

if enableHTTP1FullDuplex {
proxyWithFullDuplex := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rc := http.NewResponseController(w)
_ = rc.EnableFullDuplex()
proxy.ServeHTTP(w, r)
})
tmpHandler := struct{ http.Handler }{}
tmpHandler.Handler = proxyWithFullDuplex
proxyHandler = tmpHandler

} else {
proxyHandler = proxy
}

proxy.BufferPool = a.bufferPool
proxy.Transport = a.transport
if tracingEnabled {
Expand All @@ -157,7 +140,7 @@ func (a *activationHandler) proxyRequest(revID types.NamespacedName, enableHTTP1
pkghandler.Error(a.logger.With(zap.String(logkey.Key, revID.String())))(w, req, err)
}

proxyHandler.ServeHTTP(w, r)
proxy.ServeHTTP(w, r)
}

// useSecurePort replaces the default port with HTTPS port (8112).
Expand Down
4 changes: 4 additions & 0 deletions pkg/http/handler/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ type timeoutWriter struct {
lastWriteTime time.Time
}

func (tw *timeoutWriter) Unwrap() http.ResponseWriter {
return tw.w
}

var _ http.Flusher = (*timeoutWriter)(nil)
var _ http.ResponseWriter = (*timeoutWriter)(nil)

Expand Down
4 changes: 4 additions & 0 deletions pkg/http/response_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func NewResponseRecorder(w http.ResponseWriter, responseCode int) *ResponseRecor
}
}

func (rr *ResponseRecorder) Unwrap() http.ResponseWriter {
return rr.writer
}

// Flush flushes the buffer to the client.
func (rr *ResponseRecorder) Flush() {
rr.writer.(http.Flusher).Flush()
Expand Down
32 changes: 11 additions & 21 deletions pkg/queue/sharedmain/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,6 @@ func mainHandler(
target := net.JoinHostPort("127.0.0.1", env.UserPort)

httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders, false /* use HTTP */)
var proxyHandler http.Handler
if env.EnableHTTPFullDuplex {
proxyWithFullDuplex := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rc := http.NewResponseController(w)
_ = rc.EnableFullDuplex()
httpProxy.ServeHTTP(w, r)
})
tmpHandler := struct{ http.Handler }{}
tmpHandler.Handler = proxyWithFullDuplex
proxyHandler = tmpHandler
} else {
proxyHandler = httpProxy
}

httpProxy.Transport = transport
httpProxy.ErrorHandler = pkghandler.Error(logger)
httpProxy.BufferPool = netproxy.NewBufferPool()
Expand All @@ -79,25 +65,27 @@ func mainHandler(
}
// Create queue handler chain.
// Note: innermost handlers are specified first, ie. the last handler in the chain will be executed first.
composedHandler := proxyHandler
var composedHandler http.Handler = httpProxy

metricsSupported := supportsMetrics(ctx, logger, env)
if metricsSupported {
composedHandler = wrapQPHandlerWithFullDuplex(requestAppMetricsHandler(logger, composedHandler, breaker, env), env.EnableHTTPFullDuplex)
composedHandler = requestAppMetricsHandler(logger, composedHandler, breaker, env)
}
composedHandler = queue.ProxyHandler(breaker, stats, false, composedHandler)
composedHandler = queue.ForwardedShimHandler(composedHandler)
composedHandler = wrapQPHandlerWithFullDuplex(handler.NewTimeoutHandler(composedHandler, "request timeout", func(r *http.Request) (time.Duration, time.Duration, time.Duration) {
composedHandler = handler.NewTimeoutHandler(composedHandler, "request timeout", func(r *http.Request) (time.Duration, time.Duration, time.Duration) {
return timeout, responseStartTimeout, idleTimeout
}), env.EnableHTTPFullDuplex)
})

if metricsSupported {
composedHandler = wrapQPHandlerWithFullDuplex(requestMetricsHandler(logger, composedHandler, env), env.EnableHTTPFullDuplex)
composedHandler = requestMetricsHandler(logger, composedHandler, env)
}
if tracingEnabled {
composedHandler = tracing.HTTPSpanMiddleware(composedHandler)
}

composedHandler = wrapQPHandlerWithFullDuplex(composedHandler, env.EnableHTTPFullDuplex, logger)

drainer := &pkghandler.Drainer{
QuietPeriod: drainSleepDuration,
// Add Activator probe header to the drainer so it can handle probes directly from activator
Expand Down Expand Up @@ -139,11 +127,13 @@ func adminHandler(ctx context.Context, logger *zap.SugaredLogger, drainer *pkgha
return mux
}

func wrapQPHandlerWithFullDuplex(h http.Handler, enableFullDuplex bool) http.HandlerFunc {
func wrapQPHandlerWithFullDuplex(h http.Handler, enableFullDuplex bool, logger *zap.SugaredLogger) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if enableFullDuplex {
rc := http.NewResponseController(w)
_ = rc.EnableFullDuplex()
if err := rc.EnableFullDuplex(); err != nil {
logger.Errorw("Unable to enable full duplex", zap.Error(err))
}
}
h.ServeHTTP(w, r)
})
Expand Down