Skip to content

Commit

Permalink
add resilience for backend proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
suchen-sci committed Apr 13, 2022
1 parent b3ab105 commit 878a49f
Show file tree
Hide file tree
Showing 13 changed files with 795 additions and 9 deletions.
6 changes: 6 additions & 0 deletions pkg/filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/resilience"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/v"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -80,6 +81,11 @@ type (
Close()
}

// Backend is the interface of filters that accept resilience policies.
Backend interface {
SetResilienceBeforeInit(policies map[string]resilience.Policy)
}

// Spec is the common interface of filter specs
Spec interface {
// Super returns supervisor
Expand Down
38 changes: 35 additions & 3 deletions pkg/filters/proxy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package proxy

import (
stdcontext "context"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -50,6 +51,7 @@ type ServerPool struct {

filter RequestMatcher
loadBalancer atomic.Value
resilience *Resilience

httpStat *httpstat.HTTPStat
memoryCache *memorycache.MemoryCache
Expand All @@ -65,6 +67,7 @@ type ServerPoolSpec struct {
ServiceName string `yaml:"serviceName" jsonschema:"omitempty"`
LoadBalance *LoadBalanceSpec `yaml:"loadBalance" jsonschema:"required"`
MemoryCache *memorycache.Spec `yaml:"memoryCache,omitempty" jsonschema:"omitempty"`
Resilience *ResilienceSpec `yaml:"resilience,omitempty" jsonschema:"omitempty"`
}

// ServerPoolStatus is the status of Pool.
Expand Down Expand Up @@ -116,6 +119,14 @@ func NewServerPool(proxy *Proxy, spec *ServerPoolSpec, name string) *ServerPool
sp.watchServers()
}

var err error
if spec.Resilience != nil {
sp.resilience, err = newResilience(spec.Resilience, proxy.resilience)
if err != nil {
panic(err)
}
}

return sp
}

Expand Down Expand Up @@ -211,15 +222,15 @@ type serverPoolContext struct {
endTime time.Time
}

func (spCtx *serverPoolContext) prepareRequest() error {
func (spCtx *serverPoolContext) prepareRequest(stdctx stdcontext.Context) error {
stdr := spCtx.req.Std()

url := spCtx.svr.URL + spCtx.req.Path()
if stdr.URL.RawQuery != "" {
url += "?" + stdr.URL.RawQuery
}

var ctx = stdr.Context()
ctx := stdctx
if !spCtx.isMirror {
spCtx.statResult = &gohttpstat.Result{}
ctx = gohttpstat.WithHTTPStat(ctx, spCtx.statResult)
Expand Down Expand Up @@ -273,6 +284,27 @@ func (spCtx *serverPoolContext) duration() time.Duration {
}

func (sp *ServerPool) handle(ctx *context.Context, isMirror bool) string {
stdctx := ctx.Request().(*httpprot.Request).Std().Context()
var handler Handler
handler = func(stdctx stdcontext.Context, ctx *context.Context) string {
return sp.doHandle(stdctx, ctx, isMirror)
}

if sp.resilience != nil {
if sp.resilience.circuitbreak != nil {
handler = sp.resilience.circuitbreak.wrap(handler)
}
if sp.resilience.timeLimit != nil {
handler = sp.resilience.timeLimit.wrap(handler)
}
if sp.resilience.retry != nil {
handler = sp.resilience.retry.wrap(handler)
}
}
return handler(stdctx, ctx)
}

func (sp *ServerPool) doHandle(stdctx stdcontext.Context, ctx *context.Context, isMirror bool) string {
/*
if sp.memoryCache != nil && sp.memoryCache.Load(ctx) {
}
Expand Down Expand Up @@ -308,7 +340,7 @@ func (sp *ServerPool) handle(ctx *context.Context, isMirror bool) string {
}

// prepare the request to send.
err := spCtx.prepareRequest()
err := spCtx.prepareRequest(stdctx)
if err != nil {
msg := "prepare request failed: " + err.Error()
logger.Errorf(msg)
Expand Down
19 changes: 17 additions & 2 deletions pkg/filters/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocols/httpprot"
"github.com/megaease/easegress/pkg/protocols/httpprot/fallback"
"github.com/megaease/easegress/pkg/resilience"
"github.com/megaease/easegress/pkg/supervisor"
)

Expand All @@ -42,6 +43,10 @@ const (
resultInternalError = "internalError"
resultClientError = "clientError"
resultServerError = "serverError"

// result for resilience
resultTimeout = "timeout"
resultShortCircuited = "shortCircuited"
)

var kind = &filters.Kind{
Expand All @@ -52,6 +57,8 @@ var kind = &filters.Kind{
resultInternalError,
resultClientError,
resultServerError,
resultTimeout,
resultShortCircuited,
},
DefaultSpec: func() filters.Spec {
return &Spec{
Expand All @@ -67,6 +74,9 @@ var kind = &filters.Kind{
},
}

var _ filters.Filter = (*Proxy)(nil)
var _ filters.Backend = (*Proxy)(nil)

func init() {
filters.Register(kind)
}
Expand All @@ -78,8 +88,9 @@ var fnSendRequest = func(r *http.Request, client *http.Client) (*http.Response,
type (
// Proxy is the filter Proxy.
Proxy struct {
super *supervisor.Supervisor
spec *Spec
super *supervisor.Supervisor
spec *Spec
resilience map[string]resilience.Policy

fallback *fallback.Fallback

Expand Down Expand Up @@ -331,6 +342,10 @@ func (p *Proxy) Handle(ctx *context.Context) (result string) {
return sp.handle(ctx, false)
}

func (p *Proxy) SetResilienceBeforeInit(policies map[string]resilience.Policy) {
p.resilience = policies
}

/*
if p.mirrorPool != nil && b.mirrorPool.filter.Filter(ctx) {
Expand Down
Loading

0 comments on commit 878a49f

Please sign in to comment.