Skip to content

Commit

Permalink
put some http util into protocols/httpprot and update httpprot reques…
Browse files Browse the repository at this point in the history
…t and response
  • Loading branch information
suchen-sci committed Mar 24, 2022
1 parent e8bab2d commit 10361dd
Show file tree
Hide file tree
Showing 14 changed files with 79 additions and 42 deletions.
2 changes: 1 addition & 1 deletion pkg/filters/fallback/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/filters"
"github.com/megaease/easegress/pkg/protocols/httpprot"
"github.com/megaease/easegress/pkg/util/fallback"
"github.com/megaease/easegress/pkg/protocols/httpprot/fallback"
)

const (
Expand Down
4 changes: 2 additions & 2 deletions pkg/filters/requestadaptor/requestadaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/megaease/easegress/pkg/filters"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocols/httpprot"
"github.com/megaease/easegress/pkg/util/httpheader"
"github.com/megaease/easegress/pkg/protocols/httpprot/httpheader"
"github.com/megaease/easegress/pkg/util/pathadaptor"
"github.com/megaease/easegress/pkg/util/stringtool"
)
Expand Down Expand Up @@ -128,7 +128,7 @@ func (ra *RequestAdaptor) Handle(ctx context.Context) string {
}

func (ra *RequestAdaptor) handle(ctx context.Context) string {
r := ctx.Request().(httpprot.Request)
r := ctx.Request().(*httpprot.Request)
method, path, header := r.Method(), r.Path(), r.Header()

if ra.spec.Method != "" && ra.spec.Method != method {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"strconv"

"github.com/megaease/easegress/pkg/protocols/httpprot"
"github.com/megaease/easegress/pkg/util/httpheader"
)

type (
Expand Down Expand Up @@ -54,7 +53,7 @@ func New(spec *Spec) *Fallback {
// Fallback fallbacks HTTPContext.
func (f *Fallback) Fallback(w *httpprot.Response) {
w.SetStatusCode(f.spec.MockCode)
w.Header().Set(httpheader.KeyContentLength, f.bodyLength)
w.Header().Set(httpprot.KeyContentLength, f.bodyLength)
for key, value := range f.spec.MockHeaders {
w.Header().Set(key, value)
}
Expand Down
31 changes: 16 additions & 15 deletions pkg/protocols/httpprot/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package httpprot

import (
"bytes"
"io"
"net/http"

"github.com/megaease/easegress/pkg/protocols"
"github.com/megaease/easegress/pkg/util/readers"
)

func init() {
Expand All @@ -35,8 +35,7 @@ type (
}

payload struct {
reader io.Reader
data []byte
readerAt *readers.ReaderAt
}
)

Expand Down Expand Up @@ -79,31 +78,33 @@ func (h *header) Iter(f func(key string, values []string)) {
var _ protocols.Payload = (*payload)(nil)

func newPayload(r io.Reader) *payload {
data, err := io.ReadAll(r)
// here how to deal with readall error??????
// return nil when error happens????
if err != nil {
panic(err)
if r != nil {
return &payload{
readerAt: readers.NewReaderAt(r),
}
}
return &payload{reader: r, data: data}
return &payload{}
}

func (p *payload) NewReader() io.Reader {
return bytes.NewReader(p.data)
if p.readerAt != nil {
return readers.NewReaderAtReader(p.readerAt, 0)
}
return nil
}

func (p *payload) SetReader(reader io.Reader, closePreviousReader bool) {
if closePreviousReader {
if closer, ok := p.reader.(io.Closer); ok {
closer.Close()
if p.readerAt != nil {
p.readerAt.Close()
}
}
p.reader = reader
p.readerAt = readers.NewReaderAt(reader)
}

func (p *payload) Close() {
if closer, ok := p.reader.(io.Closer); ok {
closer.Close()
if p.readerAt != nil {
p.readerAt.Close()
}
}

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package httpheader
package httpprot

const (
// KeyCacheControl is the key of Cache-Control.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocols"
"github.com/megaease/easegress/pkg/protocols/httpprot"
"github.com/megaease/easegress/pkg/util/httpheader"
"github.com/megaease/easegress/pkg/util/stringtool"
)

Expand Down Expand Up @@ -98,7 +97,7 @@ func (mc *MemoryCache) Load(r *httpprot.Request, w *httpprot.Response) (loaded b
return false
}

for _, value := range r.Header().Values(httpheader.KeyCacheControl) {
for _, value := range r.Header().Values(httpprot.KeyCacheControl) {
if strings.Contains(value, "no-cache") {
return false
}
Expand Down Expand Up @@ -144,13 +143,13 @@ func (mc *MemoryCache) Store(r *httpprot.Request, w *httpprot.Response) {
return
}

for _, value := range r.Header().Values(httpheader.KeyCacheControl) {
for _, value := range r.Header().Values(httpprot.KeyCacheControl) {
if strings.Contains(value, "no-store") ||
strings.Contains(value, "no-cache") {
return
}
}
for _, value := range w.Header().Values(httpheader.KeyCacheControl) {
for _, value := range w.Header().Values(httpprot.KeyCacheControl) {
if strings.Contains(value, "no-store") ||
strings.Contains(value, "no-cache") ||
strings.Contains(value, "must-revalidate") {
Expand Down
9 changes: 7 additions & 2 deletions pkg/protocols/httpprot/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package httpprot

import (
"context"
"io"
"net/http"
"net/url"

Expand Down Expand Up @@ -59,8 +60,10 @@ func NewRequest(r *http.Request) *Request {
req := &Request{}
req.std = r
req.realIP = realip.FromRequest(r)
req.payload = newPayload(r.Body)
req.header = newHeader(r.Header)

req.payload = newPayload(r.Body)
req.std.Body = io.NopCloser(req.payload.NewReader())
return req
}

Expand Down Expand Up @@ -129,7 +132,9 @@ func (r *Request) SetHost(host string) {
}

func (r *Request) Clone() protocols.Request {
return nil
req := r.std.Clone(context.Background())
req.Body = io.NopCloser(r.payload.NewReader())
return NewRequest(req)
}

func (r *Request) Path() string {
Expand Down
62 changes: 48 additions & 14 deletions pkg/protocols/httpprot/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package httpprot

import (
"bytes"
"io"
"net/http"
"os"

"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocols"
Expand Down Expand Up @@ -48,13 +50,16 @@ type (
)

var _ protocols.Response = (*Response)(nil)
var bodyFlushBuffSize = 8 * int64(os.Getpagesize())

// NewResponse creates a new response from a standard response writer.
func NewResponse(w http.ResponseWriter) *Response {
return &Response{
std: w,
code: http.StatusOK,
header: newHeader(w.Header()),
std: w,
code: http.StatusOK,
header: newHeader(w.Header()),
payload: newPayload(nil),
bodyFlushFuncs: []BodyFlushFunc{},
}
}

Expand Down Expand Up @@ -92,21 +97,50 @@ func (resp *Response) Finish() {
if reader == nil {
return
}
defer resp.Payload().Close()

copyToClient := func(src io.Reader) (succeed bool) {
written, err := io.Copy(resp.std, src)
if err != nil {
logger.Warnf("copy body failed: %v", err)
return false
}
resp.bodyWritten += uint64(written)
return true
}

defer func() {
resp.payload.Close()
}()

written, err := io.Copy(resp.std, resp.payload.NewReader())
if err != nil {
logger.Warnf("copy body failed: %v", err)
if len(resp.bodyFlushFuncs) == 0 {
copyToClient(reader)
return
}
resp.bodyWritten += uint64(written)
}

func (resp *Response) Clone() protocols.Response {
return nil
buff := bytes.NewBuffer(nil)
for {
buff.Reset()
_, err := io.CopyN(buff, reader, bodyFlushBuffSize)
body := buff.Bytes()

switch err {
case nil:
for _, fn := range resp.bodyFlushFuncs {
body = fn(body, false)
}
if !copyToClient(bytes.NewReader(body)) {
return
}
case io.EOF:
for _, fn := range resp.bodyFlushFuncs {
body = fn(body, true)
}

copyToClient(bytes.NewReader(body))
return
default:
resp.std.WriteHeader(http.StatusInternalServerError)
return
}
}

}

func (resp *Response) OnFlushBody(fn BodyFlushFunc) {
Expand Down
1 change: 0 additions & 1 deletion pkg/protocols/protocols.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type Response interface {
Payload() Payload

Finish()
Clone() Response
}

type Header interface {
Expand Down

0 comments on commit 10361dd

Please sign in to comment.