Skip to content

Commit

Permalink
support global filter fallthrough when meet error (#1233)
Browse files Browse the repository at this point in the history
  • Loading branch information
suchen-sci committed Mar 1, 2024
1 parent 474996a commit b0490a5
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 8 deletions.
165 changes: 164 additions & 1 deletion build/test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1320,7 +1320,7 @@ func TestEgctlNamespace(t *testing.T) {
mockNamespace := `
name: mockNamespace
kind: MockNamespacer
namespace: mockNamespace
namespace: mockNamespace
httpservers:
- kind: HTTPServer
name: mock-httpserver
Expand Down Expand Up @@ -1642,3 +1642,166 @@ filters:
assert.Equal(tc.query, queryMap)
}
}

func TestGlobalFilterFallthrough(t *testing.T) {
assert := assert.New(t)

gfYamlTmpl := `
name: global-filter
kind: GlobalFilter
beforePipeline:
filters:
- name: validator
kind: Validator
headers:
Before-Pipeline:
values: ["valid"]
afterPipeline:
filters:
- name: adaptor
kind: ResponseAdaptor
header:
add:
After-Pipeline: valid
%s
`
yaml := fmt.Sprintf(gfYamlTmpl, "")
err := createResource(yaml)
assert.Nil(err)
defer deleteResource("globalfilter", "global-filter")

yaml = `
name: httpserver-gf
kind: HTTPServer
port: 10099
globalFilter: global-filter
rules:
- paths:
- pathPrefix: /health
backend: pipeline-ok
- pathPrefix: /
backend: pipeline-gf
`
err = createResource(yaml)
assert.Nil(err)
defer deleteResource("httpserver", "httpserver-gf")

makeReq := func() *http.Request {
req, err := http.NewRequest(http.MethodGet, "http:https://127.0.0.1:10099", nil)
assert.Nil(err)
return req
}

yaml = `
name: pipeline-ok
kind: Pipeline
filters:
- name: responsebuilder
kind: ResponseBuilder
protocol: http
template: |
statusCode: 200
`
err = createResource(yaml)
assert.Nil(err)
defer deleteResource("pipeline", "pipeline-ok")

// invalid url
yaml = `
name: pipeline-gf
kind: Pipeline
filters:
- name: proxy
kind: Proxy
pools:
- servers:
- url: http:https://127.0.0.1:11111
`
err = createResource(yaml)
assert.Nil(err)
defer deleteResource("pipeline", "pipeline-gf")

httpServerStarted := checkServerStart(func() *http.Request {
req, err := http.NewRequest(http.MethodGet, "http:https://127.0.0.1:10099/health", nil)
req.Header.Add("Before-Pipeline", "valid")
require.Nil(t, err)
return req
})
assert.True(httpServerStarted)
time.Sleep(3 * time.Second)

// not pass before, not exec pipeline and after
// bad request from before
// not header of after
req := makeReq()
resp, err := http.DefaultClient.Do(req)
resp.Body.Close()
assert.Nil(err)
assert.Equal(http.StatusBadRequest, resp.StatusCode, resp)
assert.Empty(resp.Header.Get("After-Pipeline"), resp)

// pass before, exec pipeline, meet error, not exec after
// status code of 503, error in filter
req = makeReq()
req.Header.Add("Before-Pipeline", "valid")
resp, err = http.DefaultClient.Do(req)
resp.Body.Close()
assert.Nil(err)
assert.Equal(http.StatusServiceUnavailable, resp.StatusCode, resp)
assert.Empty(resp.Header.Get("After-Pipeline"), resp)

// pass before, exec pipeline, exec after
req = makeReq()
req.URL.Path = "/health"
req.Header.Add("Before-Pipeline", "valid")
resp, err = http.DefaultClient.Do(req)
resp.Body.Close()
assert.Nil(err)
assert.Equal(http.StatusOK, resp.StatusCode, resp)
assert.Equal("valid", resp.Header.Get("After-Pipeline"), resp)

// update gfYaml to fallthrough before pipeline
yaml = fmt.Sprintf(gfYamlTmpl, `
fallthrough:
beforePipeline: true
`)
err = applyResource(yaml)
assert.Nil(err)
time.Sleep(3 * time.Second)

// pass before, exec pipeline, meet error, exec after
// not add header to before
req = makeReq()
resp, err = http.DefaultClient.Do(req)
resp.Body.Close()
assert.Nil(err)
assert.Equal(http.StatusServiceUnavailable, resp.StatusCode, resp)
assert.Empty(resp.Header.Get("After-Pipeline"), resp)

// pass before, exec pipeline, exec after
req = makeReq()
req.URL.Path = "/health"
resp, err = http.DefaultClient.Do(req)
resp.Body.Close()
assert.Nil(err)
assert.Equal(http.StatusOK, resp.StatusCode, resp)
assert.Equal("valid", resp.Header.Get("After-Pipeline"), resp)

// update gfYaml to fallthrough before pipeline
yaml = fmt.Sprintf(gfYamlTmpl, `
fallthrough:
beforePipeline: true
pipeline: true
`)
err = applyResource(yaml)
assert.Nil(err)
time.Sleep(3 * time.Second)

// fallthrough before, fallthrough pipeline, exec after
req = makeReq()
resp, err = http.DefaultClient.Do(req)
resp.Body.Close()
assert.Nil(err)
assert.Equal(http.StatusServiceUnavailable, resp.StatusCode, resp)
assert.Equal("valid", resp.Header.Get("After-Pipeline"), resp)
}
9 changes: 9 additions & 0 deletions docs/07.Reference/7.01.Controllers.md
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,15 @@ beforePipeline:
- name: validator
kind: Validator
...

# fallthrough controls the error handling behavior in different pipelines.
# specifying whether subsequent stages should execute despite encountering errors in earlier stages.
# By default, encountering an error in either the beforePipeline or the pipeline stages halts the entire process.
fallthrough:
# false indicates that if an error occurs in the beforePipeline, the pipeline will not execute.
beforePipeline: false
# false means that if an error occurs in the pipeline, the afterPipeline will not execute.
pipeline: false
---
name: server-example
kind: HTTPServer
Expand Down
13 changes: 12 additions & 1 deletion pkg/object/globalfilter/globalfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ type (
Spec struct {
BeforePipeline *pipeline.Spec `json:"beforePipeline,omitempty"`
AfterPipeline *pipeline.Spec `json:"afterPipeline,omitempty"`
Fallthrough Fallthrough `json:"fallthrough,omitempty"`
}

// Fallthrough describes the fallthrough behavior.
Fallthrough struct {
BeforePipeline bool `json:"beforePipeline,omitempty"`
Pipeline bool `json:"pipeline,omitempty"`
}

// pipelineSpec defines pipeline spec to create an pipeline entity.
Expand Down Expand Up @@ -197,7 +204,11 @@ func (gf *GlobalFilter) Handle(ctx *context.Context, handler context.Handler) {

before, _ := gf.beforePipeline.Load().(*pipeline.Pipeline)
after, _ := gf.afterPipeline.Load().(*pipeline.Pipeline)
p.HandleWithBeforeAfter(ctx, before, after)
option := pipeline.HandleWithBeforeAfterOption{
FallthroughBefore: gf.spec.Fallthrough.BeforePipeline,
FallthroughPipeline: gf.spec.Fallthrough.Pipeline,
}
p.HandleWithBeforeAfter(ctx, before, after, option)
}

// Close closes GlobalFilter itself.
Expand Down
14 changes: 11 additions & 3 deletions pkg/object/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,17 @@ func (p *Pipeline) getFilter(name string) filters.Filter {
return p.filters[name]
}

// HandleWithBeforeAfterOption is the option of HandleWithBeforeAfter.
// FallthroughBefore: if true, the pipeline will be executed even if the before pipeline ends.
// FallthroughPipeline: if true, the after pipeline will be executed even if the pipeline ends.
type HandleWithBeforeAfterOption struct {
FallthroughBefore bool
FallthroughPipeline bool
}

// HandleWithBeforeAfter handles the request, with additional flow defined by
// the before/after pipeline.
func (p *Pipeline) HandleWithBeforeAfter(ctx *context.Context, before, after *Pipeline) string {
func (p *Pipeline) HandleWithBeforeAfter(ctx *context.Context, before, after *Pipeline, option HandleWithBeforeAfterOption) string {
if len(p.spec.Data) > 0 {
ctx.SetData("PIPELINE", p.spec.Data)
}
Expand All @@ -331,11 +339,11 @@ func (p *Pipeline) HandleWithBeforeAfter(ctx *context.Context, before, after *Pi
result, stats, sawEnd = p.doHandle(ctx, before.flow, stats)
}

if !sawEnd {
if !sawEnd || option.FallthroughBefore {
result, stats, sawEnd = p.doHandle(ctx, p.flow, stats)
}

if !sawEnd && after != nil {
if (after != nil) && (!sawEnd || option.FallthroughPipeline) {
result, stats, _ = p.doHandle(ctx, after.flow, stats)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/object/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ filters:
ctx := context.New(tracing.NoopSpan)
ctx.SetRequest(context.DefaultNamespace, req)

pipeline.HandleWithBeforeAfter(ctx, nil, nil)
pipeline.HandleWithBeforeAfter(ctx, nil, nil, HandleWithBeforeAfterOption{})
tags := ctx.Tags()
assert.NotContains(tags, "filter1")
assert.Contains(tags, "filter2")
Expand All @@ -467,7 +467,7 @@ filters:

ctx = context.New(tracing.NoopSpan)
ctx.SetRequest(context.DefaultNamespace, req)
pipeline.HandleWithBeforeAfter(ctx, nil, after)
pipeline.HandleWithBeforeAfter(ctx, nil, after, HandleWithBeforeAfterOption{})
tags = ctx.Tags()
assert.NotContains(tags, "filter1")
assert.Contains(tags, "filter2")
Expand All @@ -492,7 +492,7 @@ filters:

ctx = context.New(tracing.NoopSpan)
ctx.SetRequest(context.DefaultNamespace, req)
pipeline.HandleWithBeforeAfter(ctx, before, after)
pipeline.HandleWithBeforeAfter(ctx, before, after, HandleWithBeforeAfterOption{})
tags = ctx.Tags()
assert.Contains(tags, "filter1")
assert.NotContains(tags, "filter2")
Expand Down

0 comments on commit b0490a5

Please sign in to comment.