Skip to content

Commit

Permalink
update loadbalancer
Browse files Browse the repository at this point in the history
  • Loading branch information
suchen-sci committed Apr 6, 2022
1 parent eec0116 commit f11532a
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 98 deletions.
80 changes: 29 additions & 51 deletions pkg/protocols/httpprot/loadbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ package httpprot

import (
"fmt"
"math/rand"

"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocols"
"github.com/megaease/easegress/pkg/util/hashtool"
)
Expand All @@ -33,71 +31,51 @@ const (
PolicyHeaderHash = "headerHash"
)

type LoadBalancer struct {
general protocols.LoadBalancer

spec *LoadBalancerSpec
servers []protocols.Server
}

type LoadBalancerSpec struct {
Policy string `yaml:"policy" jsonschema:"required,enum=roundRobin,enum=random,enum=weightedRandom,enum=ipHash,enum=headerHash"`
protocols.LoadBalancerSpec
HeaderHashKey string `yaml:"headerHashKey" jsonschema:"omitempty"`
}

func (s LoadBalancerSpec) validate() error {
if s.Policy == PolicyHeaderHash && len(s.HeaderHashKey) == 0 {
return fmt.Errorf("headerHash needs to specify headerHashKey")
}

return nil
}

var _ protocols.LoadBalancer = (*LoadBalancer)(nil)

func NewLoadBalancer(spec interface{}, servers []protocols.Server) (protocols.LoadBalancer, error) {
s := spec.(*LoadBalancerSpec)
if err := s.validate(); err != nil {
return nil, err
}
lb := &LoadBalancer{
spec: s,
servers: servers,
}
p := s.Policy
if p == protocols.PolicyRandom || p == protocols.PolicyWeightedRandom || p == protocols.PolicyRoundRobin {
glb, err := protocols.NewLoadBalancer(spec, servers)
if err != nil {
return nil, err
}
lb.general = glb
return lb, nil
}
return lb, nil
}

func (lb *LoadBalancer) ChooseServer(req protocols.Request) protocols.Server {
r := req.(*Request)
switch lb.spec.Policy {
switch s.Policy {
case protocols.PolicyRoundRobin, protocols.PolicyRandom, protocols.PolicyWeightedRandom:
return lb.general.ChooseServer(req)
return protocols.NewLoadBalancer(spec, servers)
case PolicyIPHash:
return lb.chooseIPHash(r)
return newIPHashLoadBalancer(servers), nil
case PolicyHeaderHash:
return lb.chooseHeaderHash(r)
return newHeaderHashLoadBalancer(servers, s.HeaderHashKey), nil
default:
logger.Errorf("unsupported load balancing policy: %s", lb.spec.Policy)
return lb.servers[rand.Intn(len(lb.servers))]
return nil, fmt.Errorf("unsupported load balancing policy: %s", s.Policy)
}
}

func (lb *LoadBalancer) chooseIPHash(req *Request) protocols.Server {
sum32 := int(hashtool.Hash32(req.RealIP()))
type IPHashLoadBalancer struct {
servers []protocols.Server
}

func newIPHashLoadBalancer(servers []protocols.Server) *IPHashLoadBalancer {
return &IPHashLoadBalancer{servers: servers}
}

func (lb *IPHashLoadBalancer) ChooseServer(req protocols.Request) protocols.Server {
r := req.(*Request)
sum32 := int(hashtool.Hash32(r.RealIP()))
return lb.servers[sum32%len(lb.servers)]
}

func (lb *LoadBalancer) chooseHeaderHash(req *Request) protocols.Server {
value := req.HTTPHeader().Get(lb.spec.HeaderHashKey)
type HeaderHashLoadBalancer struct {
servers []protocols.Server
key string
}

func newHeaderHashLoadBalancer(servers []protocols.Server, key string) *HeaderHashLoadBalancer {
return &HeaderHashLoadBalancer{servers: servers, key: key}
}

func (lb *HeaderHashLoadBalancer) ChooseServer(req protocols.Request) protocols.Server {
r := req.(*Request)
value := r.HTTPHeader().Get(lb.key)
sum32 := int(hashtool.Hash32(value))
return lb.servers[sum32%len(lb.servers)]
}
97 changes: 50 additions & 47 deletions pkg/protocols/loadbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"fmt"
"math/rand"
"sync/atomic"

"github.com/megaease/easegress/pkg/logger"
)

const (
Expand All @@ -34,70 +32,75 @@ const (
PolicyWeightedRandom = "weightedRandom"
)

type GeneralLoadBalancer struct {
spec *GeneralLoadBalancerSpec
servers []Server
count int32
weightsSum int
type LoadBalancerSpec struct {
Policy string `yaml:"policy" jsonschema:"required"`
}

type GeneralLoadBalancerSpec struct {
Policy string `yaml:"policy" jsonschema:"required,enum=roundRobin,enum=random,enum=weightedRandom"`
}

var _ LoadBalancer = (*GeneralLoadBalancer)(nil)
var _ LoadBalancer = (*RandomLoadBalancer)(nil)
var _ LoadBalancer = (*RoundRobinLoadBalancer)(nil)
var _ LoadBalancer = (*WeightedRandomLoadBalancer)(nil)

func NewLoadBalancer(spec interface{}, servers []Server) (LoadBalancer, error) {
lb := &GeneralLoadBalancer{}
lb.spec = spec.(*GeneralLoadBalancerSpec)
lb.servers = servers

p := lb.spec.Policy
if p != PolicyRoundRobin && p != PolicyRandom && p != PolicyWeightedRandom {
return nil, fmt.Errorf("unsupported load balancing policy: %s", p)
}

for _, s := range servers {
lb.weightsSum += s.Weight()
}
return lb, nil

}

func (lb *GeneralLoadBalancer) ChooseServer(req Request) Server {
switch lb.spec.Policy {
sepc := spec.(*LoadBalancerSpec)
switch sepc.Policy {
case PolicyRoundRobin:
return lb.chooseRoundRobin()
return newRoundRobinLoadBalancer(servers), nil
case PolicyRandom:
return lb.chooseRandom()
return newRandomLoadBalancer(servers), nil
case PolicyWeightedRandom:
return lb.chooseWeightedRandom()
return newWeightedRandomLoadBalancer(servers), nil
default:
logger.Errorf("unsupported load balancing policy: %s", lb.spec.Policy)
return lb.chooseRoundRobin()
return nil, fmt.Errorf("unsupported load balancing policy: %s", sepc.Policy)
}
}

func (lb *GeneralLoadBalancer) chooseRoundRobin() Server {
count := atomic.AddInt32(&lb.count, 1)
type RandomLoadBalancer struct {
servers []Server
}

func newRandomLoadBalancer(servers []Server) *RandomLoadBalancer {
return &RandomLoadBalancer{servers: servers}
}

func (rlb *RandomLoadBalancer) ChooseServer(req Request) Server {
return rlb.servers[rand.Intn(len(rlb.servers))]
}

type RoundRobinLoadBalancer struct {
servers []Server
count uint64
}

func newRoundRobinLoadBalancer(servers []Server) *RoundRobinLoadBalancer {
return &RoundRobinLoadBalancer{servers: servers}
}

func (rrlb *RoundRobinLoadBalancer) ChooseServer(req Request) Server {
count := atomic.AddUint64(&rrlb.count, 1)
count--
return lb.servers[count%int32(len(lb.servers))]
return rrlb.servers[int(count)%len(rrlb.servers)]
}

func (lb *GeneralLoadBalancer) chooseRandom() Server {
return lb.servers[rand.Intn(len(lb.servers))]
type WeightedRandomLoadBalancer struct {
servers []Server
weightsSum int
}

func (lb *GeneralLoadBalancer) chooseWeightedRandom() Server {
randomWeight := rand.Intn(lb.weightsSum)
for _, server := range lb.servers {
func newWeightedRandomLoadBalancer(servers []Server) *WeightedRandomLoadBalancer {
wrlb := &WeightedRandomLoadBalancer{servers: servers}
for _, server := range servers {
wrlb.weightsSum += server.Weight()
}
return wrlb
}

func (wrlb *WeightedRandomLoadBalancer) ChooseServer(req Request) Server {
randomWeight := rand.Intn(wrlb.weightsSum)
for _, server := range wrlb.servers {
randomWeight -= server.Weight()
if randomWeight < 0 {
return server
}
}

logger.Errorf("BUG: weighted random can't pick a server: sum(%d) servers(%+v)",
lb.weightsSum, lb.servers)
return lb.chooseRandom()
return wrlb.servers[rand.Intn(len(wrlb.servers))]
}

0 comments on commit f11532a

Please sign in to comment.