Skip to content

Commit

Permalink
update filter
Browse files Browse the repository at this point in the history
  • Loading branch information
suchen-sci committed Mar 25, 2022
1 parent c823a66 commit 4718258
Show file tree
Hide file tree
Showing 20 changed files with 264 additions and 127 deletions.
5 changes: 5 additions & 0 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,8 @@ func (ctx *context) Finish() {
fn()
}
}

func (ctx *context) Span() tracing.Span {
// TODO: add span
return nil
}
57 changes: 31 additions & 26 deletions pkg/filters/apiaggregator/apiaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ import (
"github.com/megaease/easegress/pkg/filters"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/object/rawconfigtrafficcontroller"
"github.com/megaease/easegress/pkg/protocols/httpprot"
"github.com/megaease/easegress/pkg/protocols/httpprot/httpheader"
"github.com/megaease/easegress/pkg/tracing"
"github.com/megaease/easegress/pkg/util/httpheader"
"github.com/megaease/easegress/pkg/util/pathadaptor"
)

Expand Down Expand Up @@ -174,30 +175,32 @@ func (aa *APIAggregator) reload() {

// Handle limits HTTPContext.
func (aa *APIAggregator) Handle(ctx context.Context) (result string) {
httpreq := ctx.Request().(*httpprot.Request)
httpresp := ctx.Response().(*httpprot.Response)
buff := bytes.NewBuffer(nil)
if aa.spec.MaxBodyBytes > 0 {
written, err := io.CopyN(buff, ctx.Request().Body(), aa.spec.MaxBodyBytes+1)
written, err := io.CopyN(buff, httpreq.Payload().NewReader(), aa.spec.MaxBodyBytes+1)
if written > aa.spec.MaxBodyBytes {
ctx.AddTag(fmt.Sprintf("apiAggregator: request body exceed %dB", aa.spec.MaxBodyBytes))
ctx.Response().SetStatusCode(http.StatusRequestEntityTooLarge)
httpresp.SetStatusCode(http.StatusRequestEntityTooLarge)
return resultFailed
}
if err != io.EOF {
ctx.AddTag(fmt.Sprintf("apiAggregator: read request body failed: %v", err))
ctx.Response().SetStatusCode(http.StatusBadRequest)
httpresp.SetStatusCode(http.StatusBadRequest)
return resultFailed
}
}

wg := &sync.WaitGroup{}
wg.Add(len(aa.spec.Pipelines))

httpResps := make([]context.HTTPResponse, len(aa.spec.Pipelines))
httpResps := make([]*httpprot.Response, len(aa.spec.Pipelines))
for i, p := range aa.spec.Pipelines {
req, err := aa.newHTTPReq(ctx, p, buff)
if err != nil {
logger.Errorf("BUG: new HTTP request failed: %v, pipelinename: %s", err, aa.spec.Pipelines[i].Name)
ctx.Response().SetStatusCode(http.StatusBadRequest)
httpresp.SetStatusCode(http.StatusBadRequest)
return resultFailed
}

Expand All @@ -209,9 +212,9 @@ func (aa *APIAggregator) Handle(ctx context.Context) (result string) {
return
}
w := httptest.NewRecorder()
copyCtx := context.New(w, req, tracing.NoopTracing, "no trace")
copyCtx := context.New(httpprot.NewRequest(req), httpprot.NewResponse(w), tracing.NoopTracing, "no trace")
handler.Handle(copyCtx)
rsp := copyCtx.Response()
rsp := copyCtx.Response().(*httpprot.Response)

if rsp != nil && rsp.StatusCode() == http.StatusOK {
httpResps[i] = rsp
Expand All @@ -225,9 +228,7 @@ func (aa *APIAggregator) Handle(ctx context.Context) (result string) {
_resp := resp

if resp != nil {
if body, ok := _resp.Body().(io.ReadCloser); ok {
defer body.Close()
}
_resp.Payload().Close()
}
}

Expand All @@ -236,14 +237,14 @@ func (aa *APIAggregator) Handle(ctx context.Context) (result string) {
// Get all Pipeline response' body
for i, resp := range httpResps {
if resp == nil && !aa.spec.PartialSucceed {
ctx.Response().Std().Header().Set("X-EG-Aggregator", fmt.Sprintf("failed-in-%s",
httpresp.Header().Set("X-EG-Aggregator", fmt.Sprintf("failed-in-%s",
aa.spec.Pipelines[i].Name))
ctx.Response().SetStatusCode(http.StatusServiceUnavailable)
httpresp.SetStatusCode(http.StatusServiceUnavailable)
return resultFailed
}

if resp != nil && resp.Body() != nil {
if res := aa.copyHTTPBody2Map(resp.Body(), ctx, data, aa.spec.Pipelines[i].Name); len(res) != 0 {
if resp != nil && resp.Payload().NewReader() != nil {
if res := aa.copyHTTPBody2Map(resp.Payload().NewReader(), ctx, data, aa.spec.Pipelines[i].Name); len(res) != 0 {
return res
}
}
Expand All @@ -253,18 +254,20 @@ func (aa *APIAggregator) Handle(ctx context.Context) (result string) {
}

func (aa *APIAggregator) newHTTPReq(ctx context.Context, p *Pipeline, buff *bytes.Buffer) (*http.Request, error) {
var stdctx stdcontext.Context = ctx
httpreq := ctx.Request().(*httpprot.Request)

var stdctx stdcontext.Context = httpreq.Context()
if aa.spec.timeout != nil {
// NOTE: Cancel function could be omitted here.
stdctx, _ = stdcontext.WithTimeout(stdctx, *aa.spec.timeout)
}

method := ctx.Request().Method()
method := httpreq.Method()
if p.Method != "" {
method = p.Method
}

url := ctx.Request().Std().URL
url := httpreq.Std().URL
if p.pa != nil {
url.Path = p.pa.Adapt(url.Path)
}
Expand All @@ -278,17 +281,18 @@ func (aa *APIAggregator) newHTTPReq(ctx context.Context, p *Pipeline, buff *byte
}

func (aa *APIAggregator) copyHTTPBody2Map(body io.Reader, ctx context.Context, data map[string][]byte, name string) string {
httpresp := ctx.Response().(*httpprot.Response)
respBody := bytes.NewBuffer(nil)

written, err := io.CopyN(respBody, body, aa.spec.MaxBodyBytes)
if written > aa.spec.MaxBodyBytes {
ctx.AddTag(fmt.Sprintf("apiAggregator: response body exceed %dB", aa.spec.MaxBodyBytes))
ctx.Response().SetStatusCode(http.StatusInsufficientStorage)
httpresp.SetStatusCode(http.StatusInsufficientStorage)
return resultFailed
}
if err != io.EOF {
ctx.AddTag(fmt.Sprintf("apiAggregator: read response body failed: %v", err))
ctx.Response().SetStatusCode(http.StatusInternalServerError)
httpresp.SetStatusCode(http.StatusInternalServerError)
return resultFailed
}

Expand All @@ -298,14 +302,15 @@ func (aa *APIAggregator) copyHTTPBody2Map(body io.Reader, ctx context.Context, d
}

func (aa *APIAggregator) formatResponse(ctx context.Context, data map[string][]byte) string {
httpresp := ctx.Response().(*httpprot.Response)
if aa.spec.MergeResponse {
result := map[string]interface{}{}
for _, resp := range data {
err := jsoniter.Unmarshal(resp, &result)
if err != nil {
ctx.AddTag(fmt.Sprintf("apiAggregator: unmarshal %s to json object failed: %v",
resp, err))
ctx.Response().SetStatusCode(context.EGStatusBadResponse)
httpresp.SetStatusCode(context.EGStatusBadResponse)
return resultFailed
}
}
Expand All @@ -314,11 +319,11 @@ func (aa *APIAggregator) formatResponse(ctx context.Context, data map[string][]b
ctx.AddTag(fmt.Sprintf("apiAggregator: marshal %#v to json failed: %v",
result, err))
logger.Errorf("apiAggregator: marshal %#v to json failed: %v", result, err)
ctx.Response().SetStatusCode(http.StatusInternalServerError)
httpresp.SetStatusCode(http.StatusInternalServerError)
return resultFailed
}

ctx.Response().SetBody(bytes.NewReader(buff))
httpresp.Payload().SetReader(bytes.NewReader(buff), true)
} else {
result := []map[string]interface{}{}
for _, resp := range data {
Expand All @@ -327,7 +332,7 @@ func (aa *APIAggregator) formatResponse(ctx context.Context, data map[string][]b
if err != nil {
ctx.AddTag(fmt.Sprintf("apiAggregator: unmarshal %s to json object failed: %v",
resp, err))
ctx.Response().SetStatusCode(context.EGStatusBadResponse)
httpresp.SetStatusCode(context.EGStatusBadResponse)
return resultFailed
}
result = append(result, ele)
Expand All @@ -336,11 +341,11 @@ func (aa *APIAggregator) formatResponse(ctx context.Context, data map[string][]b
if err != nil {
ctx.AddTag(fmt.Sprintf("apiAggregator: marshal %#v to json failed: %v",
result, err))
ctx.Response().SetStatusCode(http.StatusInternalServerError)
httpresp.SetStatusCode(http.StatusInternalServerError)
return resultFailed
}

ctx.Response().SetBody(bytes.NewReader(buff))
httpresp.Payload().SetReader(bytes.NewReader(buff), true)
}

return ""
Expand Down
6 changes: 4 additions & 2 deletions pkg/filters/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/filters"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocols/httpprot"
)

const (
Expand Down Expand Up @@ -118,6 +119,7 @@ func (b *Bridge) InjectMuxMapper(mapper context.MuxMapper) {

// Handle builds a bridge for pipeline.
func (b *Bridge) Handle(ctx context.Context) (result string) {
httpresp := ctx.Response().(*httpprot.Response)
if len(b.spec.Destinations) <= 0 {
panic("not any destination defined")
}
Expand All @@ -141,15 +143,15 @@ func (b *Bridge) Handle(ctx context.Context) (result string) {

if !found {
logger.Errorf("dest not found: %s", dest)
ctx.Response().SetStatusCode(http.StatusServiceUnavailable)
httpresp.SetStatusCode(http.StatusServiceUnavailable)
return resultDestinationNotFound
}

handler, exists := b.muxMapper.GetHandler(dest)

if !exists {
logger.Errorf("failed to get running object %s", b.spec.Destinations[0])
ctx.Response().SetStatusCode(http.StatusServiceUnavailable)
httpresp.SetStatusCode(http.StatusServiceUnavailable)
return resultDestinationNotFound
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/filters/canary/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
yaml "gopkg.in/yaml.v2"

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/protocols/httpprot"
)

// Rule is the A/B testing rule specified by user.
Expand Down Expand Up @@ -85,9 +86,10 @@ func (r *Rule) parse(p []byte) (err error) {
// if true, add tag.
// if false, do nothing.
func (r *Rule) doMatch(ctx context.Context) {
httpreq := ctx.Request().(*httpprot.Request)
if r.isMatch(&sourceData{
req: ctx.Request().Std(),
clientIP: ctx.Request().RealIP(),
req: httpreq.Std(),
clientIP: httpreq.RealIP(),
}) {
ctx.Request().Header().Set(r.TagKey, r.TagValue)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/filters/httprequestbuilder/httprequestbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package requestbuilder
import (
"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/filters"
"github.com/megaease/easegress/pkg/protocols"
)

const (
Expand Down Expand Up @@ -92,7 +91,8 @@ func (rb *HTTPRequestBuilder) reload() {

// Handle builds request.
func (rb *HTTPRequestBuilder) Handle(ctx context.Context) string {
r := protocols.HTTP().CreateRequest().(*protocols.HTTPRequest)
// TODO: finish it later
panic("finish this when ready")
return ""
}

Expand Down
27 changes: 27 additions & 0 deletions pkg/filters/httprequestbuilder/httprequestbuilder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2017, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package requestbuilder

import "testing"

func TestRequestBuilder(t *testing.T) {
// this test will panic 100% update it when ready
// don't forget TODO
rb := &HTTPRequestBuilder{}
rb.Handle(nil)
}
15 changes: 9 additions & 6 deletions pkg/filters/meshadaptor/meshadaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ package meshadaptor
import (
"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/filters"
"github.com/megaease/easegress/pkg/util/httpfilter"
"github.com/megaease/easegress/pkg/util/httpheader"
"github.com/megaease/easegress/pkg/protocols/httpprot"
"github.com/megaease/easegress/pkg/protocols/httpprot/httpfilter"
"github.com/megaease/easegress/pkg/protocols/httpprot/httpheader"
"github.com/megaease/easegress/pkg/util/pathadaptor"
)

Expand Down Expand Up @@ -104,14 +105,16 @@ func (ra *MeshAdaptor) reload() {

// Handle adapts request.
func (ra *MeshAdaptor) Handle(ctx context.Context) string {
httpreq := ctx.Request().(*httpprot.Request)
for _, serviceCanary := range ra.spec.ServiceCanaries {
if serviceCanary.filter.Filter(ctx) {
ctx.Request().Header().Adapt(serviceCanary.Header, ctx.Template())
if serviceCanary.filter.Filter(httpreq) {
// ctx.Request().Header().Adapt(serviceCanary.Header, ctx.Template())
// TODO: add context template here!
panic("")
}
}

result := ""
return ctx.CallNextHandler(result)
return ""
}

// Status returns status.
Expand Down
26 changes: 26 additions & 0 deletions pkg/filters/meshadaptor/meshadaptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2017, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package meshadaptor

import "testing"

func TestMeshAdaptor(t *testing.T) {
// this test will panic 100% only update this test when finish context template
ra := &MeshAdaptor{}
ra.Handle(nil)
}
3 changes: 2 additions & 1 deletion pkg/filters/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (m *Mock) match(ctx context.Context) *Rule {
}

func (m *Mock) mock(ctx context.Context, rule *Rule) {
httpreq := ctx.Request().(*httpprot.Request)
w := ctx.Response().(*httpprot.Response)
w.SetStatusCode(rule.Code)
for key, value := range rule.Headers {
Expand All @@ -211,7 +212,7 @@ func (m *Mock) mock(ctx context.Context, rule *Rule) {

logger.Debugf("delay for %v ...", rule.delay)
select {
case <-ctx.Done():
case <-httpreq.Context().Done():
logger.Debugf("request cancelled in the middle of delay mocking")
case <-time.After(rule.delay):
}
Expand Down
Loading

0 comments on commit 4718258

Please sign in to comment.