Skip to content

Commit

Permalink
support forward proxy (#1016)
Browse files Browse the repository at this point in the history
  • Loading branch information
localvar committed Jun 7, 2023
1 parent 98f7b36 commit d4ed1cc
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 33 deletions.
12 changes: 9 additions & 3 deletions doc/reference/filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,15 @@ pools:

## SimpleHTTPProxy

The SimpleHTTPProxy filter is a simplified version of the Proxy filter, designed to handle HTTP requests in a more straightforward manner while providing basic proxy functionality for backend services.

The following example demonstrates a basic configuration for `SimpleHTTPProxy`. Unlike the `Proxy` filter, the backend service's address is not specified in the `SimpleHTTPProxy` configuration. Instead, the request URL is used directly, allowing for the use of a single `SimpleHTTPProxy` instance for multiple backend services.
The `SimpleHTTPProxy` filter is a simplified version of the Proxy filter, unlike
`Proxy`, which are mainly used as reverse proxy, this filter is mainly for
forward proxies.

The following example demonstrates a basic configuration for `SimpleHTTPProxy`.
Unlike the `Proxy` filter, the backend service's address is not specified in
the `SimpleHTTPProxy` configuration. Instead, the request URL is used directly,
allowing for the use of a single `SimpleHTTPProxy` instance for any backend
services.

```yaml
name: simple-http-proxy
Expand Down
134 changes: 108 additions & 26 deletions pkg/filters/proxies/httpproxy/simplehttpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,31 @@ import (
stdctx "context"
"errors"
"fmt"
"io"
"net"
"net/http"
"sync"
"time"

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/filters"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocols/httpprot"
"github.com/megaease/easegress/pkg/protocols/httpprot/httpstat"
"github.com/megaease/easegress/pkg/resilience"
"github.com/megaease/easegress/pkg/supervisor"
"net/http"
"time"
)

const (
// SimpleHttpProxyKind is the kind of SimpleHTTPProxy.
SimpleHttpProxyKind = "SimpleHTTPProxy"
"github.com/megaease/easegress/pkg/util/fasttime"
)

var simpleHTTPProxyKind = &filters.Kind{
Name: SimpleHttpProxyKind,
Name: "SimpleHTTPProxy",
Description: "SimpleHTTPProxy sets the http proxy of proxy servers",
Results: []string{
resultInternalError,
resultClientError,
resultServerError,
resultFailureCode,
resultTimeout,
resultShortCircuited,
},
DefaultSpec: func() filters.Spec {
return &SimpleHTTPProxySpec{}
Expand All @@ -70,6 +70,7 @@ type (
super *supervisor.Supervisor
spec *SimpleHTTPProxySpec

done chan struct{}
client *http.Client
compression *compression

Expand All @@ -92,7 +93,6 @@ type (

// Validate validates SimpleHTTPProxySpec.
func (s *SimpleHTTPProxySpec) Validate() error {

// Validate MaxIdleConns
if s.MaxIdleConns < 0 {
return errors.New("maxIdleConns must be greater than or equal to 0")
Expand Down Expand Up @@ -140,6 +140,7 @@ func (shp *SimpleHTTPProxy) Inherit(previousGeneration filters.Filter) {
}

func (shp *SimpleHTTPProxy) reload() {
shp.done = make(chan struct{})
shp.timeout, _ = time.ParseDuration(shp.spec.Timeout)
if shp.spec.Compression != nil {
shp.compression = newCompression(shp.spec.Compression)
Expand All @@ -155,60 +156,141 @@ func (shp *SimpleHTTPProxy) Status() interface{} {

// Close closes SimpleHTTPProxy.
func (shp *SimpleHTTPProxy) Close() {
close(shp.done)
shp.client.CloseIdleConnections()
}

func (shp *SimpleHTTPProxy) buildFailureResponse(ctx *context.Context, statusCode int) {
resp, _ := ctx.GetOutputResponse().(*httpprot.Response)
if resp == nil {
resp, _ = httpprot.NewResponse(nil)
}

resp.SetStatusCode(statusCode)
ctx.SetOutputResponse(resp)
}

func (shp *SimpleHTTPProxy) handleConnect(ctx *context.Context) string {
metric := &httpstat.Metric{}
startTime := fasttime.Now()

host := ctx.GetInputRequest().(*httpprot.Request).Host()
w, _ := ctx.GetData("HTTP_RESPONSE_WRITER").(http.ResponseWriter)

destConn, err := net.Dial("tcp", host)
if err != nil {
shp.buildFailureResponse(ctx, http.StatusServiceUnavailable)
return resultServerError
}
w.WriteHeader(http.StatusOK)

hijacker, ok := w.(http.Hijacker)
if !ok {
shp.buildFailureResponse(ctx, http.StatusInternalServerError)
return resultInternalError
}
clientConn, _, err := hijacker.Hijack()
if err != nil {
shp.buildFailureResponse(ctx, http.StatusServiceUnavailable)
return resultServerError
}

var wg sync.WaitGroup
wg.Add(2)

stop := make(chan struct{})

go func() {
defer wg.Done()
n, _ := io.Copy(destConn, clientConn)
metric.ReqSize = uint64(n)
}()

go func() {
defer wg.Done()
n, _ := io.Copy(clientConn, destConn)
metric.RespSize = uint64(n)
}()

go func() {
select {
case <-shp.done:
case <-stop:
}
destConn.Close()
clientConn.Close()
}()

wg.Wait()
close(stop)

metric.StatusCode = http.StatusOK
metric.Duration = fasttime.Since(startTime)
ctx.SetData("HTTP_METRIC", metric)

return ""
}

// Handle handles HTTPContext.
func (shp *SimpleHTTPProxy) doRequestWithRetry(request *httpprot.Request) (*http.Response, error) {
func (shp *SimpleHTTPProxy) doRequestWithRetry(req *httpprot.Request) (*http.Response, error) {
var resp *http.Response
var err error

handler := func(ctx stdctx.Context) error {
resp, err = shp.client.Do(request.Request)
payload := req.GetPayload()
stdr, err := http.NewRequestWithContext(ctx, req.Method(), req.URL().String(), payload)
if err != nil {
return err
}
stdr.Header = req.HTTPHeader().Clone()
removeHopByHopHeaders(stdr.Header)
resp, err = shp.client.Do(stdr)
return err
}

if shp.retryWrapper != nil && !request.IsStream() {
if shp.retryWrapper != nil && !req.IsStream() {
handler = shp.retryWrapper.Wrap(handler)
}

err = handler(request.Context())
err := handler(req.Context())
return resp, err
}

// Handle handles HTTPContext.
func (shp *SimpleHTTPProxy) Handle(ctx *context.Context) (result string) {
// get request from Context
inputReq := ctx.GetInputRequest().(*httpprot.Request)
req := ctx.GetInputRequest().(*httpprot.Request)

// if method is connect, then we are working as a forward proxy
if req.Method() == http.MethodConnect {
logger.Infof("%s: handling CONNECT request", shp.Name())
return shp.handleConnect(ctx)
}

// send request with retry policy if set
resp, err := shp.doRequestWithRetry(inputReq)
resp, err := shp.doRequestWithRetry(req)
if err != nil {
return err.Error()
logger.Errorf("%s: failed to send request: %v", shp.Name(), err)
return resultServerError
}

// use httpprot.NewResponse to build ctx.SetOutputResponse
httpResp, err := httpprot.NewResponse(resp)
if err != nil {
return err.Error()
}
httpResp, _ := httpprot.NewResponse(resp)

maxBodySize := shp.spec.ServerMaxBodySize
if err = httpResp.FetchPayload(maxBodySize); err != nil {
logger.Errorf("%s: failed to fetch response payload: %v", shp.Name, err)
return err.Error()
logger.Errorf("%s: failed to fetch response payload: %v", shp.Name(), err)
return resultServerError
}

// apply compression
if shp.compression != nil {
if shp.compression.compress(inputReq.Request, resp) {
if shp.compression.compress(req.Request, resp) {
ctx.AddTag("gzip")
}
}

// set response to Context
ctx.SetOutputResponse(httpResp)

return ""
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/filters/proxies/httpproxy/simplehttpproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package httpproxy

import (
"fmt"
"io"
"net/http"
"testing"

"github.com/megaease/easegress/pkg/filters"
"github.com/megaease/easegress/pkg/protocols/httpprot"
"github.com/megaease/easegress/pkg/resilience"
"github.com/megaease/easegress/pkg/util/codectool"
"github.com/stretchr/testify/assert"
"io"
"net/http"
"testing"
)

func newTestSimpleHttpProxy(yamlConfig string, assert *assert.Assertions) *SimpleHTTPProxy {
Expand Down Expand Up @@ -117,7 +118,6 @@ maxBodySize: 1024
fmt.Println(err)
assert.Fail("read body error")
}

}

func TestSimpleHttpProxyWithRetry(t *testing.T) {
Expand Down

0 comments on commit d4ed1cc

Please sign in to comment.