diff --git a/build/test/integration_test.go b/build/test/integration_test.go index 3d66e8279f..d9ee9082e6 100644 --- a/build/test/integration_test.go +++ b/build/test/integration_test.go @@ -1320,7 +1320,7 @@ func TestEgctlNamespace(t *testing.T) { mockNamespace := ` name: mockNamespace kind: MockNamespacer -namespace: mockNamespace +namespace: mockNamespace httpservers: - kind: HTTPServer name: mock-httpserver @@ -1642,3 +1642,166 @@ filters: assert.Equal(tc.query, queryMap) } } + +func TestGlobalFilterFallthrough(t *testing.T) { + assert := assert.New(t) + + gfYamlTmpl := ` +name: global-filter +kind: GlobalFilter +beforePipeline: + filters: + - name: validator + kind: Validator + headers: + Before-Pipeline: + values: ["valid"] +afterPipeline: + filters: + - name: adaptor + kind: ResponseAdaptor + header: + add: + After-Pipeline: valid +%s +` + yaml := fmt.Sprintf(gfYamlTmpl, "") + err := createResource(yaml) + assert.Nil(err) + defer deleteResource("globalfilter", "global-filter") + + yaml = ` +name: httpserver-gf +kind: HTTPServer +port: 10099 +globalFilter: global-filter +rules: +- paths: + - pathPrefix: /health + backend: pipeline-ok + - pathPrefix: / + backend: pipeline-gf +` + err = createResource(yaml) + assert.Nil(err) + defer deleteResource("httpserver", "httpserver-gf") + + makeReq := func() *http.Request { + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:10099", nil) + assert.Nil(err) + return req + } + + yaml = ` +name: pipeline-ok +kind: Pipeline +filters: +- name: responsebuilder + kind: ResponseBuilder + protocol: http + template: | + statusCode: 200 +` + err = createResource(yaml) + assert.Nil(err) + defer deleteResource("pipeline", "pipeline-ok") + + // invalid url + yaml = ` +name: pipeline-gf +kind: Pipeline +filters: +- name: proxy + kind: Proxy + pools: + - servers: + - url: http://127.0.0.1:11111 +` + err = createResource(yaml) + assert.Nil(err) + defer deleteResource("pipeline", "pipeline-gf") + + httpServerStarted := checkServerStart(func() *http.Request { + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:10099/health", nil) + req.Header.Add("Before-Pipeline", "valid") + require.Nil(t, err) + return req + }) + assert.True(httpServerStarted) + time.Sleep(3 * time.Second) + + // not pass before, not exec pipeline and after + // bad request from before + // not header of after + req := makeReq() + resp, err := http.DefaultClient.Do(req) + resp.Body.Close() + assert.Nil(err) + assert.Equal(http.StatusBadRequest, resp.StatusCode, resp) + assert.Empty(resp.Header.Get("After-Pipeline"), resp) + + // pass before, exec pipeline, meet error, not exec after + // status code of 503, error in filter + req = makeReq() + req.Header.Add("Before-Pipeline", "valid") + resp, err = http.DefaultClient.Do(req) + resp.Body.Close() + assert.Nil(err) + assert.Equal(http.StatusServiceUnavailable, resp.StatusCode, resp) + assert.Empty(resp.Header.Get("After-Pipeline"), resp) + + // pass before, exec pipeline, exec after + req = makeReq() + req.URL.Path = "/health" + req.Header.Add("Before-Pipeline", "valid") + resp, err = http.DefaultClient.Do(req) + resp.Body.Close() + assert.Nil(err) + assert.Equal(http.StatusOK, resp.StatusCode, resp) + assert.Equal("valid", resp.Header.Get("After-Pipeline"), resp) + + // update gfYaml to fallthrough before pipeline + yaml = fmt.Sprintf(gfYamlTmpl, ` +fallthrough: + beforePipeline: true +`) + err = applyResource(yaml) + assert.Nil(err) + time.Sleep(3 * time.Second) + + // pass before, exec pipeline, meet error, exec after + // not add header to before + req = makeReq() + resp, err = http.DefaultClient.Do(req) + resp.Body.Close() + assert.Nil(err) + assert.Equal(http.StatusServiceUnavailable, resp.StatusCode, resp) + assert.Empty(resp.Header.Get("After-Pipeline"), resp) + + // pass before, exec pipeline, exec after + req = makeReq() + req.URL.Path = "/health" + resp, err = http.DefaultClient.Do(req) + resp.Body.Close() + assert.Nil(err) + assert.Equal(http.StatusOK, resp.StatusCode, resp) + assert.Equal("valid", resp.Header.Get("After-Pipeline"), resp) + + // update gfYaml to fallthrough before pipeline + yaml = fmt.Sprintf(gfYamlTmpl, ` +fallthrough: + beforePipeline: true + pipeline: true +`) + err = applyResource(yaml) + assert.Nil(err) + time.Sleep(3 * time.Second) + + // fallthrough before, fallthrough pipeline, exec after + req = makeReq() + resp, err = http.DefaultClient.Do(req) + resp.Body.Close() + assert.Nil(err) + assert.Equal(http.StatusServiceUnavailable, resp.StatusCode, resp) + assert.Equal("valid", resp.Header.Get("After-Pipeline"), resp) +} diff --git a/docs/07.Reference/7.01.Controllers.md b/docs/07.Reference/7.01.Controllers.md index 1a14f962bc..f2c9575d04 100644 --- a/docs/07.Reference/7.01.Controllers.md +++ b/docs/07.Reference/7.01.Controllers.md @@ -391,6 +391,15 @@ beforePipeline: - name: validator kind: Validator ... + +# fallthrough controls the error handling behavior in different pipelines. +# specifying whether subsequent stages should execute despite encountering errors in earlier stages. +# By default, encountering an error in either the beforePipeline or the pipeline stages halts the entire process. +fallthrough: + # false indicates that if an error occurs in the beforePipeline, the pipeline will not execute. + beforePipeline: false + # false means that if an error occurs in the pipeline, the afterPipeline will not execute. + pipeline: false --- name: server-example kind: HTTPServer diff --git a/pkg/object/globalfilter/globalfilter.go b/pkg/object/globalfilter/globalfilter.go index 1cb5d97246..1c8212c49d 100644 --- a/pkg/object/globalfilter/globalfilter.go +++ b/pkg/object/globalfilter/globalfilter.go @@ -66,6 +66,13 @@ type ( Spec struct { BeforePipeline *pipeline.Spec `json:"beforePipeline,omitempty"` AfterPipeline *pipeline.Spec `json:"afterPipeline,omitempty"` + Fallthrough Fallthrough `json:"fallthrough,omitempty"` + } + + // Fallthrough describes the fallthrough behavior. + Fallthrough struct { + BeforePipeline bool `json:"beforePipeline,omitempty"` + Pipeline bool `json:"pipeline,omitempty"` } // pipelineSpec defines pipeline spec to create an pipeline entity. @@ -197,7 +204,11 @@ func (gf *GlobalFilter) Handle(ctx *context.Context, handler context.Handler) { before, _ := gf.beforePipeline.Load().(*pipeline.Pipeline) after, _ := gf.afterPipeline.Load().(*pipeline.Pipeline) - p.HandleWithBeforeAfter(ctx, before, after) + option := pipeline.HandleWithBeforeAfterOption{ + FallthroughBefore: gf.spec.Fallthrough.BeforePipeline, + FallthroughPipeline: gf.spec.Fallthrough.Pipeline, + } + p.HandleWithBeforeAfter(ctx, before, after, option) } // Close closes GlobalFilter itself. diff --git a/pkg/object/pipeline/pipeline.go b/pkg/object/pipeline/pipeline.go index 1451759526..a1c87617d6 100644 --- a/pkg/object/pipeline/pipeline.go +++ b/pkg/object/pipeline/pipeline.go @@ -310,9 +310,17 @@ func (p *Pipeline) getFilter(name string) filters.Filter { return p.filters[name] } +// HandleWithBeforeAfterOption is the option of HandleWithBeforeAfter. +// FallthroughBefore: if true, the pipeline will be executed even if the before pipeline ends. +// FallthroughPipeline: if true, the after pipeline will be executed even if the pipeline ends. +type HandleWithBeforeAfterOption struct { + FallthroughBefore bool + FallthroughPipeline bool +} + // HandleWithBeforeAfter handles the request, with additional flow defined by // the before/after pipeline. -func (p *Pipeline) HandleWithBeforeAfter(ctx *context.Context, before, after *Pipeline) string { +func (p *Pipeline) HandleWithBeforeAfter(ctx *context.Context, before, after *Pipeline, option HandleWithBeforeAfterOption) string { if len(p.spec.Data) > 0 { ctx.SetData("PIPELINE", p.spec.Data) } @@ -331,11 +339,11 @@ func (p *Pipeline) HandleWithBeforeAfter(ctx *context.Context, before, after *Pi result, stats, sawEnd = p.doHandle(ctx, before.flow, stats) } - if !sawEnd { + if !sawEnd || option.FallthroughBefore { result, stats, sawEnd = p.doHandle(ctx, p.flow, stats) } - if !sawEnd && after != nil { + if (after != nil) && (!sawEnd || option.FallthroughPipeline) { result, stats, _ = p.doHandle(ctx, after.flow, stats) } diff --git a/pkg/object/pipeline/pipeline_test.go b/pkg/object/pipeline/pipeline_test.go index a76960e723..a8d670cdf7 100644 --- a/pkg/object/pipeline/pipeline_test.go +++ b/pkg/object/pipeline/pipeline_test.go @@ -442,7 +442,7 @@ filters: ctx := context.New(tracing.NoopSpan) ctx.SetRequest(context.DefaultNamespace, req) - pipeline.HandleWithBeforeAfter(ctx, nil, nil) + pipeline.HandleWithBeforeAfter(ctx, nil, nil, HandleWithBeforeAfterOption{}) tags := ctx.Tags() assert.NotContains(tags, "filter1") assert.Contains(tags, "filter2") @@ -467,7 +467,7 @@ filters: ctx = context.New(tracing.NoopSpan) ctx.SetRequest(context.DefaultNamespace, req) - pipeline.HandleWithBeforeAfter(ctx, nil, after) + pipeline.HandleWithBeforeAfter(ctx, nil, after, HandleWithBeforeAfterOption{}) tags = ctx.Tags() assert.NotContains(tags, "filter1") assert.Contains(tags, "filter2") @@ -492,7 +492,7 @@ filters: ctx = context.New(tracing.NoopSpan) ctx.SetRequest(context.DefaultNamespace, req) - pipeline.HandleWithBeforeAfter(ctx, before, after) + pipeline.HandleWithBeforeAfter(ctx, before, after, HandleWithBeforeAfterOption{}) tags = ctx.Tags() assert.Contains(tags, "filter1") assert.NotContains(tags, "filter2")