Skip to content

Commit

Permalink
move traffic matcher from protocols to proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
localvar committed Apr 7, 2022
1 parent 6930252 commit a6bec09
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 114 deletions.
4 changes: 4 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package main

import (
"log"
"math/rand"
"os"
"sync"
"time"

"github.com/megaease/easegress/pkg/api"
"github.com/megaease/easegress/pkg/cluster"
Expand All @@ -37,6 +39,8 @@ import (
)

func main() {
rand.Seed(time.Now().UnixNano())

opt := option.New()
msg, err := opt.Parse()
if err != nil {
Expand Down
28 changes: 14 additions & 14 deletions pkg/filters/proxy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/object/serviceregistry"
"github.com/megaease/easegress/pkg/protocols"
"github.com/megaease/easegress/pkg/protocols/httpprot"
"github.com/megaease/easegress/pkg/protocols/httpprot/httpheader"
"github.com/megaease/easegress/pkg/protocols/httpprot/httpstat"
Expand All @@ -49,7 +48,7 @@ type ServerPool struct {
wg sync.WaitGroup
name string

filter protocols.TrafficMatcher
filter RequestMatcher
loadBalancer atomic.Value

httpStat *httpstat.HTTPStat
Expand All @@ -58,14 +57,14 @@ type ServerPool struct {

// ServerPoolSpec is the spec for a server pool.
type ServerPoolSpec struct {
SpanName string `yaml:"spanName" jsonschema:"omitempty"`
Filter *httpprot.TrafficMatcherSpec `yaml:"filter" jsonschema:"omitempty"`
ServersTags []string `yaml:"serversTags" jsonschema:"omitempty,uniqueItems=true"`
Servers []*Server `yaml:"servers" jsonschema:"omitempty"`
ServiceRegistry string `yaml:"serviceRegistry" jsonschema:"omitempty"`
ServiceName string `yaml:"serviceName" jsonschema:"omitempty"`
LoadBalance *LoadBalanceSpec `yaml:"loadBalance" jsonschema:"required"`
MemoryCache *memorycache.Spec `yaml:"memoryCache,omitempty" jsonschema:"omitempty"`
SpanName string `yaml:"spanName" jsonschema:"omitempty"`
Filter *RequestMatcherSpec `yaml:"filter" jsonschema:"omitempty"`
ServersTags []string `yaml:"serversTags" jsonschema:"omitempty,uniqueItems=true"`
Servers []*Server `yaml:"servers" jsonschema:"omitempty"`
ServiceRegistry string `yaml:"serviceRegistry" jsonschema:"omitempty"`
ServiceName string `yaml:"serviceName" jsonschema:"omitempty"`
LoadBalance *LoadBalanceSpec `yaml:"loadBalance" jsonschema:"required"`
MemoryCache *memorycache.Spec `yaml:"memoryCache,omitempty" jsonschema:"omitempty"`
}

// ServerPoolStatus is the status of Pool.
Expand Down Expand Up @@ -102,7 +101,7 @@ func newPool(proxy *Proxy, spec *ServerPoolSpec, name string, failureCodes []int
}

if spec.Filter != nil {
sp.filter, _ = httpprot.NewTrafficMatcher(spec.Filter)
sp.filter, _ = NewRequestMatcher(spec.Filter)
}

if spec.MemoryCache != nil {
Expand All @@ -121,6 +120,7 @@ func newPool(proxy *Proxy, spec *ServerPoolSpec, name string, failureCodes []int
// httpStat: httpstat.New(),
}

// LoadBalancer returns the load balancer of the server pool.
func (sp *ServerPool) LoadBalancer() LoadBalancer {
return sp.loadBalancer.Load().(LoadBalancer)
}
Expand Down Expand Up @@ -175,9 +175,9 @@ func (sp *ServerPool) useService(instances map[string]*serviceregistry.ServiceIn
for _, tag := range sp.spec.ServersTags {
if stringtool.StrInSlice(tag, instance.Tags) {
servers = append(servers, &Server{
URL: instance.URL(),
Tags: instance.Tags,
W: instance.Weight,
URL: instance.URL(),
Tags: instance.Tags,
Weight: instance.Weight,
})
break
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/filters/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (p *Proxy) fallbackForCodes(ctx context.Context) bool {

// Handle handles HTTPContext.
func (p *Proxy) Handle(ctx context.Context) (result string) {
req := ctx.Request()
req := ctx.Request().(*httpprot.Request)

if p.mirrorPool != nil && p.mirrorPool.filter.Match(req) {
p.mirrorPool.handle(ctx, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,56 +15,43 @@
* limitations under the License.
*/

package httpprot
package proxy

import (
"fmt"
"hash/fnv"
"math/rand"
"regexp"
"strings"
"time"

"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocols"
"github.com/megaease/easegress/pkg/protocols/httpprot"
"github.com/megaease/easegress/pkg/util/stringtool"

yaml "gopkg.in/yaml.v2"
)

func init() {
rand.Seed(time.Now().UnixNano())
// RequestMatcher is the interface to match requests.
type RequestMatcher interface {
Match(req *httpprot.Request) bool
}

const (
policyIPHash = "ipHash"
policyHeaderHash = "headerHash"
policyRandom = "random"
)

// TrafficMatcherSpec describe TrafficMatcher
type TrafficMatcherSpec struct {
// RequestMatcherSpec describe RequestMatcher
type RequestMatcherSpec struct {
Policy string `yaml:"policy" jsonschema:"omitempty,enum=general,enum=ipHash,enum=headerHash,enum=random"`
MatchAllHeaders bool `yaml:"matchAllHeaders" jsonschema:"omitempty"`
Headers map[string]*StringMatcher `yaml:"headers" jsonschema:"omitempty"`
URLs []*MethodAndURLMatcher `yaml:"urls" jsonschema:"omitempty"`
Probability *ProbabilitySpec `yaml:"probability,omitempty" jsonschema:"omitempty"`
Permil uint32 `yaml:"permil" jsonschema:"omitempty,minimum=1,maximum=1000"`
HeaderHashKey string `yaml:"headerHashKey" jsonschema:"omitempty"`
}

// ProbabilitySpec defines rule to match traffic by probability.
type ProbabilitySpec struct {
PerMill uint32 `yaml:"perMill" jsonschema:"required,minimum=1,maximum=1000"`
Policy string `yaml:"policy" jsonschema:"required,enum=ipHash,enum=headerHash,enum=random"`
HeaderHashKey string `yaml:"headerHashKey" jsonschema:"omitempty"`
}

// Validate validtes the TrafficMatcherSpec.
func (s *TrafficMatcherSpec) Validate() error {
if len(s.Headers) == 0 && s.Probability == nil {
return fmt.Errorf("none of headers and probability is specified")
}

if len(s.Headers) > 0 && s.Probability != nil {
return fmt.Errorf("both headers and probability are specified")
// Validate validtes the RequestMatcherSpec.
func (s *RequestMatcherSpec) Validate() error {
if s.Policy == "general" || s.Policy == "" {
if len(s.Headers) == 0 {
return fmt.Errorf("headers is not specified")
}
} else if s.Permil == 0 {
return fmt.Errorf("permil is not specified")
}

for _, v := range s.Headers {
Expand All @@ -79,64 +66,37 @@ func (s *TrafficMatcherSpec) Validate() error {
}
}

if s.Probability != nil {
return s.Probability.Validate()
}

return nil
}

// Validate validates the ProbabilitySpec.
func (ps *ProbabilitySpec) Validate() error {
if ps.Policy == policyHeaderHash && ps.HeaderHashKey == "" {
return fmt.Errorf("headerHash needs to speficy headerHashKey")
if s.Policy == "headerHash" && s.HeaderHashKey == "" {
return fmt.Errorf("headerHash needs to specify headerHashKey")
}

return nil
}

// NewTrafficMatcher creates a new traffic matcher according to spec.
func NewTrafficMatcher(spec interface{}) (protocols.TrafficMatcher, error) {
tms, ok := spec.(*TrafficMatcherSpec)
if !ok {
data, err := yaml.Marshal(spec)
if err != nil {
return nil, err
}
tms = &TrafficMatcherSpec{}
if err = yaml.Unmarshal(data, tms); err != nil {
return nil, err
}
}

if err := tms.Validate(); err != nil {
return nil, err
}

if len(tms.Headers) > 0 {
// NewRequestMatcher creates a new traffic matcher according to spec.
func NewRequestMatcher(spec *RequestMatcherSpec) (RequestMatcher, error) {
switch spec.Policy {
case "", "general":
matcher := &generalMatcher{
matchAllHeaders: tms.MatchAllHeaders,
headers: tms.Headers,
urls: tms.URLs,
matchAllHeaders: spec.MatchAllHeaders,
headers: spec.Headers,
urls: spec.URLs,
}
matcher.init()
return matcher, nil
}

switch tms.Probability.Policy {
case policyIPHash:
return &ipHashMatcher{permill: tms.Probability.PerMill}, nil
case policyHeaderHash:
case "ipHash":
return &ipHashMatcher{permill: spec.Permil}, nil
case "headerHash":
return &headerHashMatcher{
permill: tms.Probability.PerMill,
headerHashKey: tms.Probability.HeaderHashKey,
permill: spec.Permil,
headerHashKey: spec.HeaderHashKey,
}, nil
case policyRandom:
return &randomMatcher{permill: tms.Probability.PerMill}, nil
case "random":
return &randomMatcher{permill: spec.Permil}, nil
}

logger.Errorf("BUG: unsupported probability policy: %s", tms.Probability.Policy)
return &ipHashMatcher{permill: tms.Probability.PerMill}, nil
logger.Errorf("BUG: unsupported probability policy: %s", spec.Policy)
return &ipHashMatcher{permill: spec.Permil}, nil
}

// randomMatcher implements random request matcher.
Expand All @@ -146,7 +106,7 @@ type randomMatcher struct {
}

// Match implements protocols.Matcher.
func (rm randomMatcher) Match(req protocols.Request) bool {
func (rm randomMatcher) Match(req *httpprot.Request) bool {
return rand.Uint32()%1000 < rm.permill
}

Expand All @@ -157,8 +117,8 @@ type headerHashMatcher struct {
}

// Match implements protocols.Matcher.
func (hhm headerHashMatcher) Match(req protocols.Request) bool {
v := req.(*Request).HTTPHeader().Get(hhm.headerHashKey)
func (hhm headerHashMatcher) Match(req *httpprot.Request) bool {
v := req.HTTPHeader().Get(hhm.headerHashKey)
hash := fnv.New32()
hash.Write([]byte(v))
return hash.Sum32()%1000 < hhm.permill
Expand All @@ -170,8 +130,8 @@ type ipHashMatcher struct {
}

// Match implements protocols.Matcher.
func (iphm ipHashMatcher) Match(req protocols.Request) bool {
ip := req.(*Request).RealIP()
func (iphm ipHashMatcher) Match(req *httpprot.Request) bool {
ip := req.RealIP()
hash := fnv.New32()
hash.Write([]byte(ip))
return hash.Sum32()%1000 < iphm.permill
Expand All @@ -195,9 +155,7 @@ func (gm *generalMatcher) init() {
}

// Match implements protocols.Matcher.
func (gm *generalMatcher) Match(r protocols.Request) bool {
req := r.(*Request)

func (gm *generalMatcher) Match(req *httpprot.Request) bool {
matched := false
if gm.matchAllHeaders {
matched = gm.matchOneHeader(req)
Expand All @@ -212,7 +170,7 @@ func (gm *generalMatcher) Match(r protocols.Request) bool {
return matched
}

func (gm *generalMatcher) matchOneHeader(req *Request) bool {
func (gm *generalMatcher) matchOneHeader(req *httpprot.Request) bool {
h := req.HTTPHeader()

for key, rule := range gm.headers {
Expand All @@ -235,7 +193,7 @@ func (gm *generalMatcher) matchOneHeader(req *Request) bool {
return false
}

func (gm *generalMatcher) matchAllHeader(req *Request) bool {
func (gm *generalMatcher) matchAllHeader(req *httpprot.Request) bool {
h := req.HTTPHeader()

OUTER_LOOP:
Expand All @@ -261,9 +219,9 @@ OUTER_LOOP:
return true
}

func (gm *generalMatcher) matchURL(req *Request) bool {
func (gm *generalMatcher) matchURL(req *httpprot.Request) bool {
for _, url := range gm.urls {
if url.match(req) {
if url.Match(req) {
return true
}
}
Expand Down Expand Up @@ -291,11 +249,7 @@ func (r *MethodAndURLMatcher) init() {
}

// Match matches a request.
func (r *MethodAndURLMatcher) Match(req protocols.Request) bool {
return r.match(req.(*Request))
}

func (r *MethodAndURLMatcher) match(req *Request) bool {
func (r *MethodAndURLMatcher) Match(req *httpprot.Request) bool {
if len(r.Methods) > 0 {
if !stringtool.StrInSlice(req.Method(), r.Methods) {
return false
Expand Down
6 changes: 0 additions & 6 deletions pkg/protocols/protocols.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,8 @@ type Header interface {
Clone() Header
}

// TrafficMatcher is the protocol independent interface to match traffics.
type TrafficMatcher interface {
Match(req Request) bool
}

// Protocol is the interface of a protocol.
type Protocol interface {
CreateRequest(req interface{}) Request
CreateResponse(resp interface{}) Response
CreateTrafficMatcher(spec interface{}) (TrafficMatcher, error)
}

0 comments on commit a6bec09

Please sign in to comment.