Fix GitHub warnings #931

Merged
merged 15 commits into from
Feb 17, 2023
refactor load balancer (WIP)
localvar committed Feb 3, 2023
commit 5d4d882eceadab4741e780453e5207cf911be124
78 changes: 48 additions & 30 deletions pkg/filters/proxies/basepool.go
@@ -17,17 +17,33 @@
 
 package proxies
 
-/*
-// BaseServerPool defines a server pool.
-type BaseServerPool struct {
-	name string
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+
+	"github.com/megaease/easegress/pkg/logger"
+	"github.com/megaease/easegress/pkg/object/serviceregistry"
+	"github.com/megaease/easegress/pkg/supervisor"
+	"github.com/megaease/easegress/pkg/util/stringtool"
+)
+
+// ServerPoolImpl is the interface for server pool.
+type ServerPoolImpl interface {
+	CreateLoadBalancer(spec *LoadBalanceSpec, servers []*Server) LoadBalancer
+}
+
+// ServerPoolBase defines a base server pool.
+type ServerPoolBase struct {
+	spImpl       ServerPoolImpl
+	Name         string
 	done         chan struct{}
 	wg           sync.WaitGroup
 	loadBalancer atomic.Value
 }
 
-// BaseServerPoolSpec is the spec for a base server pool.
-type BaseServerPoolSpec struct {
+// ServerPoolBaseSpec is the spec for a base server pool.
+type ServerPoolBaseSpec struct {
 	ServerTags      []string         `json:"serverTags" jsonschema:"omitempty,uniqueItems=true"`
 	Servers         []*Server        `json:"servers" jsonschema:"omitempty"`
 	ServiceRegistry string           `json:"serviceRegistry" jsonschema:"omitempty"`
@@ -36,7 +52,7 @@ type BaseServerPoolSpec struct {
 }
 
 // Validate validates ServerPoolSpec.
-func (sps *BaseServerPoolSpec) Validate() error {
+func (sps *ServerPoolBaseSpec) Validate() error {
 	if sps.ServiceName == "" && len(sps.Servers) == 0 {
 		return fmt.Errorf("both serviceName and servers are empty")
 	}
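This pair of hunks is the heart of the refactor: the concrete BaseServerPool becomes a reusable ServerPoolBase, and protocol-specific pools plug in through the new ServerPoolImpl interface, which the base calls back whenever it needs a fresh load balancer. Below is a minimal sketch of a concrete pool under the new contract; MyServerPool is a hypothetical stand-in, not part of this PR, and its CreateLoadBalancer returns nil just like the WIP gRPC pool further down.

```go
package myproxy

import (
	"github.com/megaease/easegress/pkg/filters/proxies"
	"github.com/megaease/easegress/pkg/supervisor"
)

// MyServerPool embeds ServerPoolBase and supplies the protocol-specific
// load-balancer factory required by the ServerPoolImpl interface.
type MyServerPool struct {
	proxies.ServerPoolBase
}

// CreateLoadBalancer is invoked by ServerPoolBase.Init and again on every
// service-registry update, so balancer construction stays in one place.
func (sp *MyServerPool) CreateLoadBalancer(spec *proxies.LoadBalanceSpec, servers []*proxies.Server) proxies.LoadBalancer {
	// A real pool would build its protocol-specific balancer here; the WIP
	// gRPC pool in this commit returns nil the same way.
	return nil
}

func NewMyServerPool(super *supervisor.Supervisor, name string, spec *proxies.ServerPoolBaseSpec) *MyServerPool {
	sp := &MyServerPool{}
	// Passing sp as the ServerPoolImpl wires the callback; Init then builds
	// the first balancer from static servers or the service registry.
	sp.Init(sp, super, name, spec)
	return sp
}
```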
@@ -60,12 +76,12 @@ func (sps *BaseServerPoolSpec) Validate() error {
 }
 
 // Init initialize the base server pool according to the spec.
-func (bsp *BaseServerPool) Init(super *supervisor.Supervisor, name string, spec *BaseServerPoolSpec) {
-	bsp.name = name
-	bsp.done = make(chan struct{})
+func (spb *ServerPoolBase) Init(spImpl ServerPoolImpl, super *supervisor.Supervisor, name string, spec *ServerPoolBaseSpec) {
+	spb.Name = name
+	spb.done = make(chan struct{})
 
 	if spec.ServiceRegistry == "" || spec.ServiceName == "" {
-		bsp.createLoadBalancer(spec.LoadBalance, spec.Servers)
+		spb.createLoadBalancer(spec.LoadBalance, spec.Servers)
 		return
 	}
 
@@ -77,36 +93,36 @@ func (bsp *BaseServerPool) Init(super *supervisor.Supervisor, name string, spec
 	if err != nil {
 		msgFmt := "first try to use service %s/%s failed(will try again): %v"
 		logger.Warnf(msgFmt, spec.ServiceRegistry, spec.ServiceName, err)
-		bsp.createLoadBalancer(spec.LoadBalance, spec.Servers)
+		spb.createLoadBalancer(spec.LoadBalance, spec.Servers)
 	}
 
-	bsp.useService(spec, instances)
+	spb.useService(spec, instances)
 
 	watcher := registry.NewServiceWatcher(spec.ServiceRegistry, spec.ServiceName)
-	bsp.wg.Add(1)
+	spb.wg.Add(1)
 	go func() {
 		for {
 			select {
-			case <-bsp.done:
+			case <-spb.done:
 				watcher.Stop()
-				bsp.wg.Done()
+				spb.wg.Done()
 				return
 			case event := <-watcher.Watch():
-				bsp.useService(spec, event.Instances)
+				spb.useService(spec, event.Instances)
 			}
 		}
 	}()
 }
 
 // LoadBalancer returns the load balancer of the server pool.
-func (bsp *BaseServerPool) LoadBalancer() LoadBalancer {
-	if v := bsp.loadBalancer.Load(); v != nil {
+func (spb *ServerPoolBase) LoadBalancer() LoadBalancer {
+	if v := spb.loadBalancer.Load(); v != nil {
 		return v.(LoadBalancer)
 	}
 	return nil
 }
 
-func (bsp *BaseServerPool) createLoadBalancer(spec *LoadBalanceSpec, servers []*Server) {
+func (spb *ServerPoolBase) createLoadBalancer(spec *LoadBalanceSpec, servers []*Server) {
 	for _, server := range servers {
 		server.CheckAddrPattern()
 	}
@@ -115,13 +131,13 @@ func (bsp *BaseServerPool) createLoadBalancer(spec *LoadBalanceSpec, servers []*
 		spec = &LoadBalanceSpec{}
 	}
 
-	lb := NewLoadBalancer(spec, servers)
-	if old := bsp.loadBalancer.Swap(lb); old != nil {
+	lb := spb.spImpl.CreateLoadBalancer(spec, servers)
+	if old := spb.loadBalancer.Swap(lb); old != nil {
 		old.(LoadBalancer).Close()
 	}
 }
 
-func (bsp *BaseServerPool) useService(spec *BaseServerPoolSpec, instances map[string]*serviceregistry.ServiceInstanceSpec) {
+func (spb *ServerPoolBase) useService(spec *ServerPoolBaseSpec, instances map[string]*serviceregistry.ServiceInstanceSpec) {
 	servers := make([]*Server, 0)
 
 	for _, instance := range instances {
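Two details matter in this hunk: the balancer now comes from the spImpl callback instead of a hardcoded NewLoadBalancer, and publication goes through atomic.Value.Swap, so readers never observe a torn update and the previous balancer is closed exactly once. A stripped-down, runnable illustration of that swap-and-close pattern (the balancer type is invented for the demo):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type balancer struct{ gen int }

func (b *balancer) Close() { fmt.Printf("closed balancer generation %d\n", b.gen) }

func main() {
	var current atomic.Value

	current.Store(&balancer{gen: 1})

	// Hot-swap: readers that Load() before the Swap keep using the old
	// value; the writer alone is responsible for closing it.
	if old := current.Swap(&balancer{gen: 2}); old != nil {
		old.(*balancer).Close()
	}

	fmt.Printf("current generation: %d\n", current.Load().(*balancer).gen)
}
```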
@@ -149,15 +165,17 @@ func (bsp *BaseServerPool) useService(spec *BaseServerPoolSpec, instances map[st
 		servers = spec.Servers
 	}
 
-	bsp.createLoadBalancer(spec.LoadBalance, servers)
+	spb.createLoadBalancer(spec.LoadBalance, servers)
 }
 
-func (bsp *BaseServerPool) close() {
-	close(bsp.done)
-	bsp.wg.Wait()
-	if lb := bsp.LoadBalancer(); lb != nil {
+func (spb *ServerPoolBase) Done() <-chan struct{} {
+	return spb.done
+}
+
+func (spb *ServerPoolBase) Close() {
+	close(spb.done)
+	spb.wg.Wait()
+	if lb := spb.LoadBalancer(); lb != nil {
 		lb.Close()
 	}
 }
-*/
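The unexported close becomes an exported Close, and the new Done accessor exposes the shutdown channel so goroutines owned by an embedding pool can stop together with the base pool's own watcher. A self-contained sketch of that usage, with a placeholder task standing in for real work:

```go
package main

import (
	"fmt"
	"time"
)

// watch mimics a goroutine an embedding pool might run: it exits when the
// done channel, which is what ServerPoolBase.Done() returns, is closed.
func watch(done <-chan struct{}) {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			fmt.Println("pool closed, watcher exiting")
			return
		case <-ticker.C:
			fmt.Println("periodic work") // placeholder task
		}
	}
}

func main() {
	done := make(chan struct{})
	go watch(done)
	time.Sleep(120 * time.Millisecond)
	close(done) // ServerPoolBase.Close() does this, then waits on its WaitGroup
	time.Sleep(20 * time.Millisecond)
}
```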
15 changes: 2 additions & 13 deletions pkg/filters/proxies/grpcproxy/loadbalance.go
@@ -17,19 +17,7 @@
 
 package grpcproxy
 
-import (
-	"fmt"
-	"hash/fnv"
-	"math/rand"
-	"net"
-	"sync"
-	"sync/atomic"
-
-	"github.com/megaease/easegress/pkg/protocols/grpcprot"
-
-	"github.com/megaease/easegress/pkg/logger"
-)
-
+/*
 // LoadBalancer is the interface of an gRPC load balancer.
 type LoadBalancer interface {
 	ChooseServer(req *grpcprot.Request) *Server
@@ -250,3 +238,4 @@ func (f *forwardLoadBalancer) ChooseServer(req *grpcprot.Request) *Server {
 func (f *forwardLoadBalancer) ReturnServer(s *Server) {
 	f.server.Put(s)
 }
+*/
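Rather than deleting the gRPC-specific balancer outright, the commit fences the whole file in a block comment, keeping it easy to diff against while the protocol-agnostic version takes shape in pkg/filters/proxies. Judging from the call sites that survive in pool.go below (ChooseServer, an unconditional ReturnServer(svr, req, resp), and the Close used by basepool.go), the shared interface plausibly ends up like the sketch below; the exact names and parameter types are assumptions, not code from this commit.

```go
package proxies

import "github.com/megaease/easegress/pkg/protocols"

// LoadBalancer as it plausibly looks once generalized: the gRPC-specific
// request type is replaced by the protocol-agnostic protocols interfaces.
type LoadBalancer interface {
	ChooseServer(req protocols.Request) *Server
	// ReturnServer releases a server picked by ChooseServer; stateless
	// policies can implement it as a no-op.
	ReturnServer(s *Server, req protocols.Request, resp protocols.Response)
	Close()
}
```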
118 changes: 11 additions & 107 deletions pkg/filters/proxies/grpcproxy/pool.go
@@ -21,8 +21,6 @@ import (
 	stdcontext "context"
 	"fmt"
 	"io"
-	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/megaease/easegress/pkg/protocols/grpcprot"
@@ -36,9 +34,7 @@ import (
 	"github.com/megaease/easegress/pkg/context"
 
 	"github.com/megaease/easegress/pkg/logger"
-	"github.com/megaease/easegress/pkg/object/serviceregistry"
 	"github.com/megaease/easegress/pkg/resilience"
-	"github.com/megaease/easegress/pkg/util/stringtool"
 )
 
 const (
@@ -99,14 +95,12 @@
 
 // ServerPool defines a server pool.
 type ServerPool struct {
+	BaseServerPool
+
 	proxy *Proxy
 	spec  *ServerPoolSpec
-	done  chan struct{}
-	wg    sync.WaitGroup
-	name  string
 
 	filter                RequestMatcher
-	loadBalancer          atomic.Value
 	timeout               time.Duration
 	connectTimeout        time.Duration
 	circuitBreakerWrapper resilience.Wrapper
@@ -115,13 +109,10 @@
 
 // ServerPoolSpec is the spec for a server pool.
 type ServerPoolSpec struct {
+	BaseServerPoolSpec `json:",inline"`
+
 	SpanName             string              `json:"spanName" jsonschema:"omitempty"`
 	Filter               *RequestMatcherSpec `json:"filter" jsonschema:"omitempty"`
-	ServerTags           []string            `json:"serverTags" jsonschema:"omitempty,uniqueItems=true"`
-	Servers              []*Server           `json:"servers" jsonschema:"omitempty"`
-	ServiceRegistry      string              `json:"serviceRegistry" jsonschema:"omitempty"`
-	ServiceName          string              `json:"serviceName" jsonschema:"omitempty"`
-	LoadBalance          *LoadBalanceSpec    `json:"loadBalance" jsonschema:"omitempty"`
 	Timeout              string              `json:"timeout" jsonschema:"omitempty,format=duration"`
 	ConnectTimeout       string              `json:"connectTimeout" jsonschema:"omitempty,format=duration"`
 	CircuitBreakerPolicy string              `json:"circuitBreakerPolicy" jsonschema:"omitempty"`
@@ -158,19 +149,13 @@ func NewServerPool(proxy *Proxy, spec *ServerPoolSpec, name string) *ServerPool
 	sp := &ServerPool{
 		proxy: proxy,
 		spec:  spec,
-		done:  make(chan struct{}),
-		name:  name,
 	}
 
 	if spec.Filter != nil {
 		sp.filter = NewRequestMatcher(spec.Filter)
 	}
 
-	if spec.ServiceRegistry == "" || spec.ServiceName == "" {
-		sp.createLoadBalancer(sp.spec.Servers)
-	} else {
-		sp.watchServers()
-	}
+	sp.BaseServerPool.Init(sp, proxy.super, name, &spec.BaseServerPoolSpec)
 
 	if spec.Timeout != "" {
 		sp.timeout, _ = time.ParseDuration(spec.Timeout)
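NewServerPool now hands sp itself to BaseServerPool.Init, which registers the pool as the ServerPoolImpl callback. Note the naming is mid-flight in this WIP commit: basepool.go declares ServerPoolBase and ServerPoolBaseSpec, while this file embeds BaseServerPool and BaseServerPoolSpec. Aliases like the following would bridge the gap until a later commit settles on one name; this is conjecture, not code from the PR.

```go
package grpcproxy

import "github.com/megaease/easegress/pkg/filters/proxies"

// Hypothetical bridge aliases; either something like this exists in the
// package, or a later commit renames one side to match the other.
type (
	BaseServerPool     = proxies.ServerPoolBase
	BaseServerPoolSpec = proxies.ServerPoolBaseSpec
)
```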
@@ -185,83 +170,8 @@
 	return sp
 }
 
-// LoadBalancer returns the load balancer of the server pool.
-func (sp *ServerPool) LoadBalancer() LoadBalancer {
-	return sp.loadBalancer.Load().(LoadBalancer)
-}
-
-func (sp *ServerPool) createLoadBalancer(servers []*Server) {
-	for _, server := range servers {
-		server.CheckAddrPattern()
-	}
-
-	spec := sp.spec.LoadBalance
-	if spec == nil {
-		spec = &LoadBalanceSpec{}
-	}
-
-	lb := NewLoadBalancer(spec, servers)
-	sp.loadBalancer.Store(lb)
-}
-
-func (sp *ServerPool) watchServers() {
-	entity := sp.proxy.super.MustGetSystemController(serviceregistry.Kind)
-	registry := entity.Instance().(*serviceregistry.ServiceRegistry)
-
-	instances, err := registry.ListServiceInstances(sp.spec.ServiceRegistry, sp.spec.ServiceName)
-	if err != nil {
-		msgFmt := "first try to use service %s/%s failed(will try again): %v"
-		logger.Warnf(msgFmt, sp.spec.ServiceRegistry, sp.spec.ServiceName, err)
-		sp.createLoadBalancer(sp.spec.Servers)
-	}
-
-	sp.useService(instances)
-
-	watcher := registry.NewServiceWatcher(sp.spec.ServiceRegistry, sp.spec.ServiceName)
-	sp.wg.Add(1)
-	go func() {
-		for {
-			select {
-			case <-sp.done:
-				watcher.Stop()
-				sp.wg.Done()
-				return
-			case event := <-watcher.Watch():
-				sp.useService(event.Instances)
-			}
-		}
-	}()
-}
-
-func (sp *ServerPool) useService(instances map[string]*serviceregistry.ServiceInstanceSpec) {
-	servers := make([]*Server, 0)
-
-	for _, instance := range instances {
-		// default to true in case of sp.spec.ServerTags is empty
-		match := true
-
-		for _, tag := range sp.spec.ServerTags {
-			if match = stringtool.StrInSlice(tag, instance.Tags); match {
-				break
-			}
-		}
-
-		if match {
-			servers = append(servers, &Server{
-				URL:    instance.URL(),
-				Tags:   instance.Tags,
-				Weight: instance.Weight,
-			})
-		}
-	}
-
-	if len(servers) == 0 {
-		msgFmt := "%s/%s: no service instance satisfy tags: %v"
-		logger.Warnf(msgFmt, sp.spec.ServiceRegistry, sp.spec.ServiceName, sp.spec.ServerTags)
-		servers = sp.spec.Servers
-	}
-
-	sp.createLoadBalancer(servers)
+func (sp *ServerPool) CreateLoadBalancer(spec *LoadBalanceSpec, servers []*Server) LoadBalancer {
+	return nil
 }
 
 // InjectResiliencePolicy injects resilience policies to the server pool.
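This hunk deletes the pool's private copies of LoadBalancer, createLoadBalancer, watchServers, and useService, all of which now live once in ServerPoolBase. With loadbalance.go commented out there is nothing for the gRPC pool to construct yet, so CreateLoadBalancer returning nil reads as a deliberate WIP placeholder. Once the shared balancer lands, the method would presumably delegate to a generic constructor, mirroring the deleted call above; NewLoadBalancer here is assumed from that deleted code, not from this commit.

```go
// Sketch of the eventual implementation inside package grpcproxy.
func (sp *ServerPool) CreateLoadBalancer(spec *LoadBalanceSpec, servers []*Server) LoadBalancer {
	// Delegate to the shared constructor instead of returning nil.
	return NewLoadBalancer(spec, servers)
}
```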
@@ -326,7 +236,7 @@ func (sp *ServerPool) handle(ctx *context.Context) string {
 		// CircuitBreaker is the most outside resiliencer, if the error
 		// is ErrShortCircuited, we are sure the response is nil.
 		if err == resilience.ErrShortCircuited {
-			logger.Debugf("%s: short circuited by circuit break policy", sp.name)
+			logger.Debugf("%s: short circuited by circuit break policy", sp.Name)
 			spCtx.AddTag("short circuited")
 			sp.buildOutputResponse(spCtx, status.Newf(codes.Unavailable, "short circuited by circuit break policy"))
 			return resultShortCircuited
@@ -348,12 +258,11 @@ func (sp *ServerPool) doHandle(ctx stdcontext.Context, spCtx *serverPoolContext)
 	svr := lb.ChooseServer(spCtx.req)
 	// if there's no available server.
 	if svr == nil {
-		logger.Debugf("%s: no available server", sp.name)
+		logger.Debugf("%s: no available server", sp.Name)
 		return serverPoolError{status.New(codes.InvalidArgument, "no available server"), resultClientError}
 	}
-	if f, ok := lb.(ReusableServerLB); ok {
-		defer f.ReturnServer(svr)
-	}
+	defer lb.ReturnServer(svr, spCtx.req, spCtx.resp)
 
 	// maybe be rewrite by grpcserver.MuxPath#rewrite
 	fullMethodName := spCtx.req.FullMethod()
 	if fullMethodName == "" {
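The ReusableServerLB type assertion is gone: returning a server is now part of the base contract and carries the request and response, so stateful policies (such as the forward balancer commented out above) get the context they need, while stateless ones simply no-op. A minimal no-op under that assumed contract; the receiver and parameter types are illustrative only.

```go
// Hypothetical stateless balancer complying with the widened contract;
// the parameter types are assumed from the call site above.
func (rb *randomLoadBalancer) ReturnServer(s *Server, req *grpcprot.Request, resp *grpcprot.Response) {
	// Stateless policy: nothing to release.
}
```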
@@ -428,11 +337,6 @@ func (sp *ServerPool) buildOutputResponse(spCtx *serverPoolContext, s *status.St
 	spCtx.SetOutputResponse(spCtx.resp)
 }
 
-func (sp *ServerPool) close() {
-	close(sp.done)
-	sp.wg.Wait()
-}
-
 func (sp *ServerPool) forwardE2E(src grpc.Stream, dst grpc.Stream, header *grpcprot.Header) chan error {
 	ret := make(chan error, 1)
 	go func() {