fix unexpected EOF when compression is enabled (#834)
* fix unexpected EOF when compression is enabled

* improve log

* adjust log levels
localvar committed Oct 21, 2022
1 parent c42e220 commit 39255e3
Showing 7 changed files with 35 additions and 18 deletions.
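Why the EOF happened: when a filter gzips a body on the fly, the payload size changes, but the original Content-Length header (and the ContentLength field on the Go request/response object) still advertises the uncompressed size. The peer then reads fewer bytes than promised and reports an unexpected EOF. The commit fixes this by clearing both values whenever the body becomes a stream of unknown size, and by recomputing them when the body is fully buffered. The snippet below is a minimal, standalone sketch of the streaming case using only the Go standard library; it is not the Easegress implementation (which goes through httpprot and the readers package), just an illustration of the invariant.

package sketch

import (
	"compress/gzip"
	"io"
	"net/http"
)

// compressStreaming re-encodes resp.Body as gzip without buffering it.
// Because the final size is unknown, both the ContentLength field and the
// Content-Length header must be cleared; net/http then falls back to
// chunked transfer encoding instead of announcing a stale length.
func compressStreaming(resp *http.Response) {
	pr, pw := io.Pipe()
	orig := resp.Body

	go func() {
		gw := gzip.NewWriter(pw)
		_, err := io.Copy(gw, orig) // stream-compress the original body
		gw.Close()
		orig.Close()
		pw.CloseWithError(err) // propagate any compression error to the reader
	}()

	resp.Body = pr
	resp.ContentLength = -1           // length is unknown after re-encoding
	resp.Header.Del("Content-Length") // a stale (larger) value makes the client expect bytes that never arrive
	resp.Header.Set("Content-Encoding", "gzip")
	resp.Header.Add("Vary", "Content-Encoding")
}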
1 change: 1 addition & 0 deletions pkg/filters/proxy/compression.go
@@ -64,6 +64,7 @@ func (c *compression) compress(req *http.Request, resp *http.Response) bool {
return false
}

+ resp.ContentLength = -1
resp.Header.Del(keyContentLength)
resp.Header.Set(keyContentEncoding, "gzip")
resp.Header.Add(keyVary, keyContentEncoding)
14 changes: 7 additions & 7 deletions pkg/filters/proxy/pool.go
@@ -299,7 +299,7 @@ func (sp *ServerPool) handleMirror(spCtx *serverPoolContext) {

err := spCtx.prepareRequest(svr, spCtx.req.Context(), true)
if err != nil {
logger.Debugf("%s: failed to prepare request: %v", sp.name, err)
logger.Errorf("%s: failed to prepare request: %v", sp.name, err)
return
}

@@ -377,7 +377,7 @@ func (sp *ServerPool) handle(ctx *context.Context, mirror bool) string {
// CircuitBreaker is the most outside resiliencer, if the error
// is ErrShortCircuited, we are sure the response is nil.
if err == resilience.ErrShortCircuited {
logger.Debugf("%s: short circuited by circuit break policy", sp.name)
logger.Errorf("%s: short circuited by circuit break policy", sp.name)
spCtx.AddTag("short circuited")
sp.buildFailureResponse(spCtx, http.StatusServiceUnavailable)
return resultShortCircuited
@@ -401,21 +401,21 @@ func (sp *ServerPool) doHandle(stdctx stdcontext.Context, spCtx *serverPoolConte

// if there's no available server.
if svr == nil {
logger.Debugf("%s: no available server", sp.name)
logger.Errorf("%s: no available server", sp.name)
return serverPoolError{http.StatusServiceUnavailable, resultInternalError}
}

// prepare the request to send.
statResult := &gohttpstat.Result{}
stdctx = gohttpstat.WithHTTPStat(stdctx, statResult)
if err := spCtx.prepareRequest(svr, stdctx, false); err != nil {
logger.Debugf("%s: failed to prepare request: %v", sp.name, err)
logger.Errorf("%s: failed to prepare request: %v", sp.name, err)
return serverPoolError{http.StatusInternalServerError, resultInternalError}
}

resp, err := fnSendRequest(spCtx.stdReq, sp.proxy.client)
if err != nil {
logger.Debugf("%s: failed to send request: %v", sp.name, err)
logger.Errorf("%s: failed to send request: %v", sp.name, err)

statResult.End(fasttime.Now())
spCtx.LazyAddTag(func() string {
@@ -481,7 +481,7 @@ func (sp *ServerPool) buildResponse(spCtx *serverPoolContext) (err error) {

resp, err := httpprot.NewResponse(spCtx.stdResp)
if err != nil {
logger.Debugf("%s: NewResponse returns an error: %v", sp.name, err)
logger.Errorf("%s: NewResponse returns an error: %v", sp.name, err)
body.Close()
return err
}
@@ -491,7 +491,7 @@ func (sp *ServerPool) buildResponse(spCtx *serverPoolContext) (err error) {
maxBodySize = sp.proxy.spec.ServerMaxBodySize
}
if err = resp.FetchPayload(maxBodySize); err != nil {
logger.Debugf("%s: failed to fetch response payload: %v", sp.name, err)
logger.Errorf("%s: failed to fetch response payload: %v", sp.name, err)
body.Close()
return err
}
4 changes: 2 additions & 2 deletions pkg/filters/proxy/wspool.go
@@ -148,15 +148,15 @@ func (sp *WebSocketServerPool) handle(ctx *context.Context) (result string) {

// if there's no available server.
if svr == nil {
logger.Debugf("%s: no available server", sp.name)
logger.Errorf("%s: no available server", sp.name)
sp.buildFailureResponse(ctx, http.StatusServiceUnavailable)
metric.StatusCode = http.StatusServiceUnavailable
return resultInternalError
}

stdw, _ := ctx.GetData("HTTP_RESPONSE_WRITER").(http.ResponseWriter)
if stdw == nil {
logger.Debugf("%s: cannot get response writer from context", sp.name)
logger.Errorf("%s: cannot get response writer from context", sp.name)
sp.buildFailureResponse(ctx, http.StatusInternalServerError)
metric.StatusCode = http.StatusInternalServerError
return resultInternalError
20 changes: 16 additions & 4 deletions pkg/filters/requestadaptor/requestadaptor.go
@@ -20,6 +20,7 @@ package requestadaptor
import (
"fmt"
"io"
"strconv"
"time"

"github.com/megaease/easegress/pkg/context"
@@ -40,6 +41,9 @@
resultDecompressFailed = "decompressFailed"
resultCompressFailed = "compressFailed"
resultSignFailed = "signFailed"

+ keyContentLength = "Content-Length"
+ keyContentEncoding = "Content-Encoding"
)

var kind = &filters.Kind{
@@ -295,14 +299,16 @@ func (ra *RequestAdaptor) Handle(ctx *context.Context) string {
}

func (ra *RequestAdaptor) processCompress(req *httpprot.Request) string {
- encoding := req.HTTPHeader().Get("Content-Encoding")
+ encoding := req.HTTPHeader().Get(keyContentEncoding)
if encoding != "" {
return ""
}

zr := readers.NewGZipCompressReader(req.GetPayload())
if req.IsStream() {
req.SetPayload(zr)
+ req.ContentLength = -1
+ req.HTTPHeader().Del(keyContentLength)
} else {
data, err := io.ReadAll(zr)
zr.Close()
@@ -311,14 +317,16 @@ func (ra *RequestAdaptor) processCompress(req *httpprot.Request) string {
return resultCompressFailed
}
req.SetPayload(data)
+ req.ContentLength = int64(len(data))
+ req.HTTPHeader().Set(keyContentLength, strconv.Itoa(len(data)))
}

req.HTTPHeader().Set("Content-Encoding", "gzip")
req.HTTPHeader().Set(keyContentEncoding, "gzip")
return ""
}

func (ra *RequestAdaptor) processDecompress(req *httpprot.Request) string {
- encoding := req.HTTPHeader().Get("Content-Encoding")
+ encoding := req.HTTPHeader().Get(keyContentEncoding)
if ra.spec.Decompress != "gzip" || encoding != "gzip" {
return ""
}
@@ -330,6 +338,8 @@

if req.IsStream() {
req.SetPayload(zr)
+ req.ContentLength = -1
+ req.HTTPHeader().Del(keyContentLength)
} else {
data, err := io.ReadAll(zr)
zr.Close()
@@ -338,9 +348,11 @@
return resultDecompressFailed
}
req.SetPayload(data)
+ req.ContentLength = int64(len(data))
+ req.HTTPHeader().Set(keyContentLength, strconv.Itoa(len(data)))
}

- req.HTTPHeader().Del("Content-Encoding")
+ req.HTTPHeader().Del(keyContentEncoding)
return ""
}

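Both adaptor filters now follow the same rule after re-encoding a body: if the new payload is a stream, its length is unknown, so the ContentLength field becomes -1 and the Content-Length header is removed; if the payload is buffered, both are set to the new byte count. The helper below is a hypothetical condensation of that rule, shown only to make the invariant explicit (the name syncContentLength and the plain *http.Request receiver are illustrative, not part of the Easegress API, which works on httpprot.Request).

package sketch

import (
	"net/http"
	"strconv"
)

// syncContentLength keeps the ContentLength field and the Content-Length
// header consistent with the actual body after it has been re-encoded.
// newLen < 0 means the body is now a stream whose final size is unknown.
func syncContentLength(req *http.Request, newLen int64) {
	if newLen < 0 {
		req.ContentLength = -1           // unknown size
		req.Header.Del("Content-Length") // never advertise a stale length
		return
	}
	req.ContentLength = newLen
	req.Header.Set("Content-Length", strconv.FormatInt(newLen, 10))
}

The same reasoning explains the symmetrical additions to ResponseAdaptor below.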
4 changes: 4 additions & 0 deletions pkg/filters/responseadaptor/responseadaptor.go
@@ -175,6 +175,7 @@ func (ra *ResponseAdaptor) compress(resp *httpprot.Response) string {
zr := readers.NewGZipCompressReader(resp.GetPayload())
if resp.IsStream() {
resp.SetPayload(zr)
+ resp.ContentLength = -1
resp.HTTPHeader().Del(keyContentLength)
} else {
data, err := io.ReadAll(zr)
@@ -184,6 +185,7 @@ func (ra *ResponseAdaptor) compress(resp *httpprot.Response) string {
return resultCompressFailed
}
resp.SetPayload(data)
+ resp.ContentLength = int64(len(data))
resp.HTTPHeader().Set(keyContentLength, strconv.Itoa(len(data)))
}

@@ -208,6 +210,7 @@ func (ra *ResponseAdaptor) decompress(resp *httpprot.Response) string {

if resp.IsStream() {
resp.SetPayload(zr)
+ resp.ContentLength = -1
resp.HTTPHeader().Del(keyContentLength)
} else {
data, err := io.ReadAll(zr)
@@ -217,6 +220,7 @@ func (ra *ResponseAdaptor) decompress(resp *httpprot.Response) string {
return resultDecompressFailed
}
resp.SetPayload(data)
+ resp.ContentLength = int64(len(data))
resp.HTTPHeader().Set(keyContentLength, strconv.Itoa(len(data)))
}

8 changes: 4 additions & 4 deletions pkg/object/httpserver/mux.go
@@ -525,14 +525,14 @@ func (mi *muxInstance) serveHTTP(stdw http.ResponseWriter, stdr *http.Request) {

route := mi.search(req)
if route.code != 0 {
logger.Debugf("%s: status code of result route for [%s %s]: %d", mi.superSpec.Name(), req.Method(), req.RequestURI, route.code)
logger.Errorf("%s: status code of result route for [%s %s]: %d", mi.superSpec.Name(), req.Method(), req.RequestURI, route.code)
buildFailureResponse(ctx, route.code)
return
}

handler, ok := mi.muxMapper.GetHandler(route.path.backend)
if !ok {
logger.Debugf("%s: backend(Pipeline) %q for [%s %s] not found", mi.superSpec.Name(), req.Method(), req.RequestURI, route.path.backend)
logger.Errorf("%s: backend(Pipeline) %q for [%s %s] not found", mi.superSpec.Name(), req.Method(), req.RequestURI, route.path.backend)
buildFailureResponse(ctx, http.StatusServiceUnavailable)
return
}
@@ -549,12 +549,12 @@ func (mi *muxInstance) serveHTTP(stdw http.ResponseWriter, stdr *http.Request) {
}
err := req.FetchPayload(maxBodySize)
if err == httpprot.ErrRequestEntityTooLarge {
logger.Debugf("%s: %s", mi.superSpec.Name(), err.Error())
logger.Errorf("%s: %s, you may need to increase 'clientMaxBodySize' or set it to -1", mi.superSpec.Name(), err.Error())
buildFailureResponse(ctx, http.StatusRequestEntityTooLarge)
return
}
if err != nil {
logger.Debugf("%s: failed to read request body: %v", mi.superSpec.Name(), err)
logger.Errorf("%s: failed to read request body: %v", mi.superSpec.Name(), err)
buildFailureResponse(ctx, http.StatusBadRequest)
return
}
2 changes: 1 addition & 1 deletion pkg/protocols/httpprot/response.go
@@ -46,7 +46,7 @@ type Response struct {
}

// ErrResponseEntityTooLarge means the request entity is too large.
- var ErrResponseEntityTooLarge = fmt.Errorf("response entity too large")
+ var ErrResponseEntityTooLarge = fmt.Errorf("response entity too large, you may need to increase 'serverMaxBodySize' or set it to -1")

var _ protocols.Response = (*Response)(nil)

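The reworded messages point at two size limits that operators can tune. Based only on the field names referenced in this diff, a rough sketch of where they might sit in an Easegress configuration is shown below: clientMaxBodySize on the HTTPServer spec and serverMaxBodySize on the Proxy filter spec. The object names, port, and pipeline layout are placeholders, so confirm the exact schema against the Easegress documentation; per the new messages, setting either field to -1 lifts the corresponding limit.

kind: HTTPServer
name: demo-server            # placeholder name
port: 10080
clientMaxBodySize: -1        # -1: do not reject large client request bodies
rules:
  - paths:
      - pathPrefix: /
        backend: demo-pipeline

---
kind: Pipeline
name: demo-pipeline          # placeholder name
flow:
  - filter: proxy
filters:
  - name: proxy
    kind: Proxy
    serverMaxBodySize: -1    # -1: do not reject large upstream response bodies
    pools:
      - servers:
          - url: http://127.0.0.1:9095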
