Skip to content

Commit

Permalink
Route HTTP request by http headers (#454)
Browse files Browse the repository at this point in the history
  • Loading branch information
qdongxu authored and xxx7xxxx committed Dec 5, 2019
1 parent 9517797 commit 21ae722
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 55 deletions.
1 change: 1 addition & 0 deletions pkg/object/httpserver/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type (
notFound bool
methodNotAllowed bool
backend string

}
)

Expand Down
146 changes: 92 additions & 54 deletions pkg/object/httpserver/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type (
pathRE *regexp.Regexp
methods []string
backend string
headers []*Header
}
)

Expand Down Expand Up @@ -179,6 +180,10 @@ func newMuxPath(parentIPFilters *ipfilter.IPFilters, path *Path) *muxPath {
}
}

for _, p := range path.Headers {
p.initHeaderRoute()
}

return &muxPath{
ipFilter: newIPFilter(path.IPFilter),
ipFilterChain: newIPFilterChain(parentIPFilters, path.IPFilter),
Expand All @@ -189,6 +194,7 @@ func newMuxPath(parentIPFilters *ipfilter.IPFilters, path *Path) *muxPath {
pathRE: pathRE,
methods: path.Methods,
backend: path.Backend,
headers: path.Headers,
}
}

Expand Down Expand Up @@ -228,6 +234,23 @@ func (mp *muxPath) matchMethod(ctx context.HTTPContext) bool {
return stringtool.StrInSlice(ctx.Request().Method(), mp.methods)
}

func (mp *muxPath) matchHeaders(ctx context.HTTPContext) (ci *cacheItem, ok bool) {
for _, h := range mp.headers {
v := ctx.Request().Header().Get(h.Key)
if stringtool.StrInSlice(v, h.Values){
ci = &cacheItem{ipFilterChan: mp.ipFilterChain, backend: h.Backend}
return ci, true
}

if h.Regexp != "" && h.headerRE.MatchString(v) {
ci = &cacheItem{ipFilterChan: mp.ipFilterChain, backend: h.Backend}
return ci, true
}
}

return nil, false
}

func newMux(handlers *sync.Map, httpStat *httpstat.HTTPStat, topN *topn.TopN) *mux {
m := &mux{
handlers: handlers,
Expand Down Expand Up @@ -285,81 +308,96 @@ func (m *mux) ServeHTTP(stdw http.ResponseWriter, stdr *http.Request) {
m.topN.Stat(ctx)
})

w := ctx.Response()

handleIPNotAllow := func() {
ctx.AddTag(stringtool.Cat("ip ", ctx.Request().RealIP(), " not allow"))
w.SetStatusCode(http.StatusForbidden)
}

handleCacheItem := func(ci *cacheItem) {
rules.putCacheItem(ctx, ci)

if ci.ipFilterChan != nil {
if !ci.ipFilterChan.AllowHTTPContext(ctx) {
handleIPNotAllow()
return
}
}

switch {
case ci.notFound:
w.SetStatusCode(http.StatusNotFound)
case ci.methodNotAllowed:
w.SetStatusCode(http.StatusMethodNotAllowed)
case ci.backend != "":
handler, exists := m.handlers.Load(ci.backend)
if exists {
handler, ok := handler.(scheduler.HTTPHandler)
if !ok {
ctx.AddTag(stringtool.Cat("BUG: backend ", ci.backend, " is not handler"))
} else {
handler.Handle(ctx)
}
} else {
ctx.AddTag(stringtool.Cat("backend ", ci.backend, " not found"))
w.SetStatusCode(http.StatusServiceUnavailable)
}
}
}

ci := rules.getCacheItem(ctx)
if ci != nil {
handleCacheItem(ci)
m.handleRequestWithCache(rules, ctx, ci)
return
}

if !rules.pass(ctx) {
handleIPNotAllow()
m.handleIPNotAllow(ctx)
return
}

for _, rule := range rules.rules {
if !rule.match(ctx) {
for _, host := range rules.rules {
if !host.match(ctx) {
continue
}
if !rule.pass(ctx) {
handleIPNotAllow()

if !host.pass(ctx) {
m.handleIPNotAllow(ctx)
return
}

for _, path := range rule.paths {
for _, path := range host.paths {
if !path.matchPath(ctx) {
continue
}

if path.matchMethod(ctx) {
if !path.pass(ctx) {
handleIPNotAllow()
return
}
handleCacheItem(&cacheItem{ipFilterChan: path.ipFilterChain, backend: path.backend})
} else {
handleCacheItem(&cacheItem{ipFilterChan: path.ipFilterChain, methodNotAllowed: true})
if !path.matchMethod(ctx) {
ci = &cacheItem{ipFilterChan: path.ipFilterChain, methodNotAllowed: true}
rules.putCacheItem(ctx, ci)
m.handleRequestWithCache(rules, ctx, ci)
return
}

if !path.pass(ctx) {
m.handleIPNotAllow(ctx)
return
}

ci, ok := path.matchHeaders(ctx)
if ok {
// NOTE: must not cache the route by header
m.handleRequestWithCache(rules, ctx, ci)
return
}

ci = &cacheItem{ipFilterChan: path.ipFilterChain, backend: path.backend}
rules.putCacheItem(ctx, ci)
m.handleRequestWithCache(rules, ctx, ci)
return
}
}

ci = &cacheItem{ipFilterChan: rules.ipFilterChan, notFound: true}
rules.putCacheItem(ctx, ci)
m.handleRequestWithCache(rules, ctx, ci)
}

func (m *mux) handleIPNotAllow(ctx context.HTTPContext) {
ctx.AddTag(stringtool.Cat("ip ", ctx.Request().RealIP(), " not allow"))
ctx.Response().SetStatusCode(http.StatusForbidden)
}

func (m *mux) handleRequestWithCache(rules *muxRules, ctx context.HTTPContext, ci *cacheItem) {
if ci.ipFilterChan != nil {
if !ci.ipFilterChan.AllowHTTPContext(ctx) {
m.handleIPNotAllow(ctx)
return
}
}

handleCacheItem(&cacheItem{ipFilterChan: rules.ipFilterChan, notFound: true})
switch {
case ci.notFound:
ctx.Response().SetStatusCode(http.StatusNotFound)
case ci.methodNotAllowed:
ctx.Response().SetStatusCode(http.StatusMethodNotAllowed)
case ci.backend != "":
rawHandler, exists := m.handlers.Load(ci.backend)
if !exists {
ctx.AddTag(stringtool.Cat("backend ", ci.backend, " not found"))
ctx.Response().SetStatusCode(http.StatusServiceUnavailable)
return
}

handler, ok := rawHandler.(scheduler.HTTPHandler)
if !ok {
ctx.AddTag(stringtool.Cat("BUG: backend ", ci.backend, " is not a http handler"))
ctx.Response().SetStatusCode(http.StatusServiceUnavailable)
return
}

handler.Handle(ctx)
}
}
1 change: 0 additions & 1 deletion pkg/object/httpserver/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ type (
*httpstat.Status
TopN *topn.Status `yaml:"topN"`
}

)

func newRuntime(handlers *sync.Map) *runtime {
Expand Down
27 changes: 27 additions & 0 deletions pkg/object/httpserver/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,24 @@ type (
PathRegexp string `yaml:"pathRegexp,omitempty" v:"omitempty,regexp"`
Methods []string `yaml:"methods,omitempty" v:"unique,dive,httpmethod"`
Backend string `yaml:"backend" v:"required"`
Headers []*Header `yaml:"headers" v:"dive"`

pathRE *regexp.Regexp
}

// Header is the third level entry of router. A header entry is always under a specific path entry, that is to mean
// the headers entry will only be checked after a path entry matched. However, the headers entry has a higher priority
// than the path entry itself.
Header struct {
V string `yaml:"-" v:"parent"`

Key string `yaml:"key" v:"required"`
Values []string `yaml:"values,omitempty" v:"omitempty"`
Regexp string `yaml:"regexp,omitempty" v:"omitempty,regexp"`
Backend string `yaml:"backend" v:"required"`

headerRE *regexp.Regexp
}
)

// Validate validates HTTPServerSpec.
Expand Down Expand Up @@ -81,3 +96,15 @@ func (spec *Spec) tlsConfig() (*tls.Config, error) {

return &tls.Config{Certificates: []tls.Certificate{cert}}, nil
}

func (h *Header) initHeaderRoute() {
h.headerRE = regexp.MustCompile(h.Regexp)
}

func (h Header) Validate() error {
if (h.Values == nil || len(h.Values) == 0) && h.Regexp == "" {
return fmt.Errorf("both of values and regexp are empty for key: %s", h.Key)
}

return nil
}
12 changes: 12 additions & 0 deletions test/config/http-server-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,15 @@ rules:
backend: http-proxy-example
- pathPrefix: /remote
backend: remote-pipeline
headers:
- key: X-Activity-No
values: [ "123456", "124456" ]
backend: remote-pipeline1
- key: X-Activity-No
values: [ "224456" ]
regexp: ^224.*$
backend: remote-pipeline2
- key: X-Activity-No
regexp: ^324.*$
backend: remote-pipeline2

0 comments on commit 21ae722

Please sign in to comment.