# Pipeline Explained

Easegress offers a rich set of filters tailored for diverse use cases, including validation, request building, and more. At its core, the `Pipeline` serves as an integrated framework within Easegress, designed specifically to coordinate and sequence filters for processing requests.

* It supports calling multiple backend services for one input request.
* It supports conditional forward jumps in the execution flow.
* The `HTTPServer` receives incoming traffic and routes it to one dedicated `Pipeline` in Easegress according to the HTTP header, path matching, etc.

- [Details](#details)
  - [Sequences executing](#sequences-executing)
  - [JumpIf](#jumpif)
  - [Built-in Filter `END`](#built-in-filter-end)
  - [Alias](#alias)
  - [Namespace](#namespace)
- [Usage](#usage)
  - [GlobalFilter](#globalfilter)
  - [Load Balancer](#load-balancer)
    - [Traffic Adaptor: Change Something of Two-Way Traffic](#traffic-adaptor-change-something-of-two-way-traffic)
  - [Health Check](#health-check)
  - [API Aggregation](#api-aggregation)
    - [Example 1: Simple aggregation](#example-1-simple-aggregation)
    - [Example 2: Merge response body](#example-2-merge-response-body)
    - [Example 3: Handle Failures](#example-3-handle-failures)
- [References](#references)

## Details

### Sequences executing

* The basic model of Pipeline execution is a sequence. Filters are executed one by one in the order described by the `flow` field in the Pipeline's spec.

```
        Request
           │
┌──────────┼──────────┐
│ Pipeline │          │
│          │          │
│  ┌───────┴───────┐  │
│  │requestAdaptor │  │
│  └───────┬───────┘  │
│          │          │
│          │          │
│  ┌───────▼───────┐  │
│  │     Proxy     │  │
│  └───────┬───────┘  │
│          │          │
└──────────┼──────────┘
           │
           ▼
        Response
```

```bash
$ echo '
name: pipeline-demo
kind: Pipeline
flow:
- filter: requestAdaptor
- filter: proxy
filters:
- name: requestAdaptor
  kind: RequestAdaptor
  header:
    set:
      X-Adapt-Key: goodplan
- name: proxy
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
' | egctl create -f -
```

* The `pipeline-demo` above executes the `requestAdaptor` filter first, then `proxy`. So the `proxy` filter can forward the request with the header `X-Adapt-Key`, which was set by `requestAdaptor`.

### JumpIf

* Every Easegress filter returns a string message as its execution result. If the result is empty, the filter handled the request without failure. Otherwise, Easegress uses the non-empty result string as the basis for the `JumpIf` mechanism.
* The Pipeline supports the `JumpIf` mechanism [2]. We use it to skip the execution of chosen filters when something goes wrong.

```
             Request
                │
                │
┌───────────────┼─────────────────┐
│ Pipeline      │                 │
│               ▼                 │
│      ┌──────────────────┐       │
│      │    Validator     ├────┐  │
│      └────────┬─────────┘    │  │
│               │              │  │
│               ▼              │  │
│      ┌──────────────────┐    │  │
│      │  RequestAdaptor  │    │  │
│      └────────┬─────────┘    │  │
│               │       Invalid│  │
│               ▼              │  │
│      ┌──────────────────┐    │  │
│      │      Proxy       │    │  │
│      └────────┬─────────┘    │  │
│               │              │  │
│               ◄──────────────┘  │
└───────────────┼─────────────────┘
                │
                ▼
             Response
```

```bash
$ echo '
name: pipeline-demo
kind: Pipeline
flow:
- filter: validator
  jumpIf: { invalid: END }
- filter: requestAdaptor
- filter: proxy
filters:
- name: validator
  kind: Validator
  headers:
    Content-Type:
      values:
      - application/json
- name: requestAdaptor
  kind: RequestAdaptor
  header:
    set:
      X-Adapt-Key: goodplan
- name: proxy
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
' | egctl apply -f -
```

* As we can see above, `pipeline-demo` jumps to the end of the pipeline execution when the `validator`'s execution result is `invalid`.
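* To actually receive traffic, `pipeline-demo` must be bound to a traffic gate such as an `HTTPServer`, as mentioned at the top of this page. Below is a minimal sketch; the port `10080` and the path prefix `/pipeline` are illustrative assumptions, not part of the pipeline spec above.

```yaml
kind: HTTPServer
name: server-demo
port: 10080                    # illustrative listening port
keepAlive: true
https: false
rules:
- paths:
  - pathPrefix: /pipeline      # hypothetical path prefix
    backend: pipeline-demo     # route matching requests to the pipeline above
```

* With this in place, a request whose `Content-Type` is not `application/json` fails the `validator` and jumps straight to `END`, skipping `requestAdaptor` and `proxy`.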
### Built-in Filter `END`

* `END` is a built-in filter; that is, we can use it without defining it. From the above example, we have already seen that `END` can be used as the target of `jumpIf`, and we can also use it directly in the flow.

```bash
$ echo '
name: pipeline-demo
kind: Pipeline
flow:
- filter: validator
  jumpIf: { invalid: buildFailureResponse }
- filter: requestAdaptor
- filter: proxy
- filter: END
- filter: buildFailureResponse
filters:
- name: validator
  kind: Validator
  headers:
    Content-Type:
      values:
      - application/json
- name: requestAdaptor
  kind: RequestAdaptor
  header:
    set:
      X-Adapt-Key: goodplan
- name: proxy
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
- name: buildFailureResponse
  kind: ResponseBuilder
  template: |
    statusCode: 400
    body: the request is invalid.
' | egctl apply -f -
```

* By using the `END` filter, we can now build a custom failure response.

### Alias

* We assign a name to a filter when we define it, but a filter can be used more than once in the flow; in this case, we can assign an alias to each appearance.

```bash
$ echo '
name: pipeline-demo
kind: Pipeline
flow:
- filter: validator
  jumpIf:
    invalid: mockRequestForProxy2
- filter: mockRequest
- filter: proxy1
- filter: END
- filter: mockRequest
  alias: mockRequestForProxy2
- filter: proxy2
filters:
- name: validator
  kind: Validator
  headers:
    Content-Type:
      values:
      - application/json
- name: mockRequest
  kind: RequestBuilder
  template: |
    method: GET
    url: /hello
- name: proxy1
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
- name: proxy2
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.2:9095
' | egctl apply -f -
```

### Namespace

* The pipeline can handle multiple requests/responses. Requests/responses are grouped by namespaces, and each namespace can have at most one request and one response.
* We can set the `namespace` attribute of a filter to let it know which request/response it should use.
* `RequestBuilder` and `ResponseBuilder` can access all requests and responses; they output the resulting request/response to the namespace they are assigned to.
* The default namespace is `DEFAULT`.

```bash
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
  namespace: demo1
- filter: copyRequest
  namespace: demo2
- filter: copyRequest
  namespace: demo3
- filter: proxy-demo1
  namespace: demo1
- filter: proxy-demo2
  namespace: demo2
- filter: proxy-demo3
  namespace: demo3
- filter: buildResponse
filters:
- name: copyRequest
  kind: RequestBuilder
  sourceNamespace: DEFAULT
- name: proxy-demo1
  kind: Proxy
  pools:
  - servers:
    - url: https://demo1
- name: proxy-demo2
  kind: Proxy
  pools:
  - servers:
    - url: https://demo2
- name: proxy-demo3
  kind: Proxy
  pools:
  - servers:
    - url: https://demo3
- name: buildResponse
  kind: ResponseBuilder
  template: |
    statusCode: 200
    body: [{{.responses.demo1.Body}}, {{.responses.demo2.Body}}, {{.responses.demo3.Body}}]
' | egctl create -f -
```

## Usage

### GlobalFilter

`GlobalFilter` is a special kind of pipeline that can be executed before and/or after all pipelines in a server. For example:

```yaml
name: globalFilter-example
kind: GlobalFilter
beforePipeline:
  flow:
  - filter: validator
  filters:
  - name: validator
    kind: Validator
    ...
afterPipeline:
  ...

---

name: server-example
kind: HTTPServer
globalFilter: globalFilter-example
...
```

In this case, all requests in the `HTTPServer` `server-example` go through the `GlobalFilter` `globalFilter-example` before and after the execution of any other pipeline.
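A fuller sketch of the same idea is shown below, assuming we want to validate JSON requests before every pipeline and stamp a header on every response afterwards; the concrete filter specs and the `X-Served-By` header value are illustrative assumptions that reuse filters introduced earlier, not a prescribed configuration.

```yaml
name: globalFilter-example
kind: GlobalFilter
beforePipeline:
  flow:
  - filter: validator
  filters:
  - name: validator              # runs before any backend pipeline
    kind: Validator
    headers:
      Content-Type:
        values:
        - application/json
afterPipeline:
  flow:
  - filter: responseAdaptor
  filters:
  - name: responseAdaptor        # runs after the backend pipeline
    kind: ResponseAdaptor
    header:
      set:
        X-Served-By: easegress   # illustrative header value
```

An `HTTPServer` opts in by referencing it with `globalFilter: globalFilter-example`, exactly as in the skeleton above.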
### Load Balancer

A reverse proxy is a common middleware that accepts requests from clients and forwards them to backend servers, and it is a core role played by Easegress. `Proxy` is the filter that sends requests to backend servers. It contains groups of servers under load balancing, whose policies support `roundRobin`, `random`, `weightedRandom`, `ipHash`, and `headerHash`. For more information, please check [load balance](../07.Reference/7.02.Filters.md#proxyloadbalancespec).

```yaml
name: pipeline-reverse-proxy
kind: Pipeline
flow:
- filter: proxy
filters:
- name: proxy
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
    - url: http://127.0.0.1:9096
    - url: http://127.0.0.1:9097
    loadBalance:
      policy: roundRobin
```

#### Traffic Adaptor: Change Something of Two-Way Traffic

Sometimes backend applications can't adapt to quickly changing traffic requirements. Easegress can act as an adaptor between the new traffic and old applications. There are two phases of adaptation in the reverse proxy: request adaptation and response adaptation. `RequestAdaptor` supports adapting the method, path, header, and body; `ResponseAdaptor` supports adapting the header and body. As you can see, the `flow` in the spec plays a critical role.

```yaml
name: pipeline-reverse-proxy
kind: Pipeline
flow:
- filter: requestAdaptor
- filter: proxy
- filter: responseAdaptor
filters:
- name: requestAdaptor
  kind: RequestAdaptor
  host: easegress.megaease.com
  method: POST
  path:
    trimPrefix: /apis/v2
  header:
    set:
      X-Api-Version: v2
- name: responseAdaptor
  kind: ResponseAdaptor
  header:
    set:
      Server: Easegress v1.0.0
    add:
      X-Easegress-Pipeline: pipeline-reverse-proxy
- name: proxy
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
    - url: http://127.0.0.1:9096
    - url: http://127.0.0.1:9097
    loadBalance:
      policy: roundRobin
```

### Health Check

Easegress can perform health checks on the servers in a pool. If a server fails the check, it is marked as unhealthy, and requests are rerouted to other healthy servers until it regains health. More configuration details can be found in [Proxy](../07.Reference/7.02.Filters.md#health-check).

```yaml
name: pipeline-reverse-proxy
kind: Pipeline
flow:
- filter: proxy
filters:
- name: proxy
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
    - url: http://127.0.0.1:9096
    - url: http://127.0.0.1:9097
    loadBalance:
      policy: roundRobin
    healthCheck:
      # interval between health checks (default: 60s)
      interval: 60s
      # uri for the health check http request
      uri: /health
      # http request headers for the health check
      headers:
        X-Health-Check: easegress
      # username for basic authentication
      username: admin
      # password for basic authentication
      password: xxxxxx
      # response validation criteria (default: 2xx and 3xx status codes)
      match:
        # acceptable status code ranges
        statusCodes:
        - [200, 299] # 2xx
        - [300, 399] # 3xx
```

### API Aggregation

API aggregation is a pattern that aggregates multiple individual requests into a single request. This pattern is useful when a client must make multiple calls to different backend systems to complete an operation. Easegress provides the filters [RequestBuilder](../07.Reference/7.02.Filters.md#requestbuilder) & [ResponseBuilder](../07.Reference/7.02.Filters.md#responsebuilder) for this powerful feature.

#### Example 1: Simple aggregation

1. Suppose we have three backend services called `demo1`, `demo2`, and `demo3`. We want to call these three services and combine their responses into one. `demo1` returns `{"mega":"ease"}` as the HTTP response body, `demo2` returns `{"hello":"world"}`, and `demo3` returns `{"hello":"new world"}`.
```bash
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
  namespace: demo1
- filter: copyRequest
  namespace: demo2
- filter: copyRequest
  namespace: demo3
- filter: proxy-demo1
  namespace: demo1
- filter: proxy-demo2
  namespace: demo2
- filter: proxy-demo3
  namespace: demo3
- filter: buildResponse
filters:
- name: copyRequest
  kind: RequestBuilder
  sourceNamespace: DEFAULT
- name: proxy-demo1
  kind: Proxy
  pools:
  - servers:
    - url: https://demo1
- name: proxy-demo2
  kind: Proxy
  pools:
  - servers:
    - url: https://demo2
- name: proxy-demo3
  kind: Proxy
  pools:
  - servers:
    - url: https://demo3
- name: buildResponse
  kind: ResponseBuilder
  template: |
    statusCode: 200
    body: |
      [{{.responses.demo1.Body}}, {{.responses.demo2.Body}}, {{.responses.demo3.Body}}]
' | egctl create -f -
```

2. Create an `HTTPServer` to forward traffic to this pipeline.

```bash
echo '
kind: HTTPServer
name: server-demo
port: 10080
keepAlive: true
https: false
rules:
- paths:
  - pathPrefix: /api
    backend: pipeline-api
' | egctl create -f -
```

3. Send a request to the pipeline.

```bash
$ curl -X GET http://127.0.0.1:10080/api -v
[{"hello":"world"},{"hello":"new world"},{"mega":"ease"}]
```

#### Example 2: Merge response body

* In Example 1, the responses of `demo2` and `demo3` share the same JSON key. This time we want to merge the response bodies by JSON key; if there are duplicated keys, we use the last value.

1. Update the pipeline spec

```bash
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
  namespace: demo1
- filter: copyRequest
  namespace: demo2
- filter: copyRequest
  namespace: demo3
- filter: proxy-demo1
  namespace: demo1
- filter: proxy-demo2
  namespace: demo2
- filter: proxy-demo3
  namespace: demo3
- filter: buildResponse
filters:
- name: copyRequest
  kind: RequestBuilder
  sourceNamespace: DEFAULT
- name: proxy-demo1
  kind: Proxy
  pools:
  - servers:
    - url: https://demo1
- name: proxy-demo2
  kind: Proxy
  pools:
  - servers:
    - url: https://demo2
- name: proxy-demo3
  kind: Proxy
  pools:
  - servers:
    - url: https://demo3
- name: buildResponse
  kind: ResponseBuilder
  template: |
    statusCode: 200
    body: |
      {{mergeObject .responses.demo1.JSONBody .responses.demo2.JSONBody .responses.demo3.JSONBody | toRawJson}}
' | egctl apply -f -
```

2. Send a request to the pipeline

```bash
$ curl -X GET http://127.0.0.1:10080/api -v
{"hello":"new world","mega":"ease"}
```

#### Example 3: Handle Failures

In Example 1, if one of the backend services fails, the pipeline returns a wrong result. We can improve the pipeline to handle this kind of failure.
1. Update the pipeline spec

```bash
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
  namespace: demo1
- filter: copyRequest
  namespace: demo2
- filter: copyRequest
  namespace: demo3
- filter: proxy-demo1
  namespace: demo1
  jumpIf: { serverError: proxy-demo2 }
- filter: proxy-demo2
  namespace: demo2
- filter: proxy-demo3
  namespace: demo3
- filter: buildResponse
filters:
- name: copyRequest
  kind: RequestBuilder
  sourceNamespace: DEFAULT
- name: proxy-demo1
  kind: Proxy
  pools:
  - servers:
    - url: https://demo1
- name: proxy-demo2
  kind: Proxy
  pools:
  - servers:
    - url: https://demo2
- name: proxy-demo3
  kind: Proxy
  pools:
  - servers:
    - url: https://demo3
- name: buildResponse
  kind: ResponseBuilder
  template: |
    statusCode: {{$code := 503}}{{range $resp := .responses}}{{if eq $resp.StatusCode 200}}{{$code = 200}}{{break}}{{end}}{{end}}{{$code}}
    body: |
      {{$first := true -}}
      [{{range $resp := .responses -}}
      {{if eq $resp.StatusCode 200 -}}
      {{if not $first}},{{end}}{{$resp.Body}}{{$first = false -}}
      {{- end}}
      {{- end}}]
' | egctl apply -f -
```

2. Stop service `demo1` and send a request to the pipeline

```bash
$ curl -X GET http://127.0.0.1:10080/api -v
[{"hello":"world"},{"hello":"new world"}]
```

## References

1. [Chain-of-responsibility pattern](https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern)
2. [JumpIf mechanism in pipeline](../06.Development-for-Easegress/6.1.Developer-Guide.md#jumpif-mechanism-in-pipeline)