Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing TrafficController to manage TrafficGate and Pipeline #55

Merged
merged 4 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[supervisor]: remodel supervisor to only handle controller
  • Loading branch information
xxx7xxxx committed Jun 20, 2021
commit ddc3b0090c1b800f1e6ff38b5ab2a2b2e2be3b23
12 changes: 12 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ build_server:
CGO_ENABLED=0 go build -v -trimpath -ldflags ${GO_LD_FLAGS} \
-o ${TARGET_SERVER} ${MKFILE_DIR}cmd/server

dev_build_client:
@echo "build dev client"
cd ${MKFILE_DIR} && \
go build -v -race -ldflags ${GO_LD_FLAGS} \
-o ${TARGET_CLIENT} ${MKFILE_DIR}cmd/client

dev_build_server:
@echo "build dev server"
cd ${MKFILE_DIR} && \
go build -v -race -ldflags ${GO_LD_FLAGS} \
-o ${TARGET_SERVER} ${MKFILE_DIR}cmd/server

build_docker:
docker build -t megaease/easegress:${RELEASE} -f ./build/package/Dockerfile .

Expand Down
1 change: 0 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func main() {
os.Exit(1)
}
super := supervisor.MustNew(opt, cls)
supervisor.InitGlobalSupervisor(super)
apiServer := api.MustNewServer(opt, cls)

if graceupdate.CallOriProcessTerm(super.FirstHandleDone()) {
Expand Down
Binary file modified doc/architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
10 changes: 5 additions & 5 deletions example/config/http-pipeline-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ flow:
# - filter: fallback
- filter: rateLimiter
- filter: requestAdaptor
- filter: proxy
- filter: proxy
jumpIf: { fallback: END }
- filter: responseAdaptor

Expand Down Expand Up @@ -43,11 +43,11 @@ filters:
- name: policy-one
timeoutDuration: 1000ms
limitRefreshPeriod: 5000ms
limitForPeriod: 2
limitForPeriod: 2
- name: policy-two
timeoutDuration: 100ms
limitRefreshPeriod: 10ms
limitForPeriod: 30
limitForPeriod: 30
defaultPolicyRef: policy-one
urls:
- methods: [GET, POST, PUT, DELETE]
Expand All @@ -68,8 +68,8 @@ filters:
set:
X-Adapt-Key: goodplan
add: {}
- name: proxy
kind: Proxy
- name: proxy
kind: Proxy
failureCodes: [500, 501]
fallback:
forCodes: true
Expand Down
25 changes: 16 additions & 9 deletions pkg/filter/apiaggregator/apiaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ const (
var results = []string{resultFailed}

func init() {
httppipeline.Register(&APIAggregator{})
// FIXME: Rewrite APIAggregator becasue the HTTPProxy is eliminated
// I(@xxx7xxxx) think we should not enpower filter to cross pipelines.

// httppipeline.Register(&APIAggregator{})
}

type (
Expand All @@ -58,6 +61,8 @@ type (
super *supervisor.Supervisor
pipeSpec *httppipeline.FilterSpec
spec *Spec

muxMapper protocol.MuxMapper
}

// Spec describes APIAggregator.
Expand Down Expand Up @@ -150,6 +155,11 @@ func (aa *APIAggregator) Handle(ctx context.HTTPContext) (result string) {
return ctx.CallNextHandler(result)
}

// InjectMuxMapper injects mux mapper into APIAggregator.
func (aa *APIAggregator) InjectMuxMapper(mapper protocol.MuxMapper) {
aa.muxMapper = mapper
}

func (aa *APIAggregator) handle(ctx context.HTTPContext) (result string) {
buff := bytes.NewBuffer(nil)
if aa.spec.MaxBodyBytes > 0 {
Expand Down Expand Up @@ -186,17 +196,14 @@ func (aa *APIAggregator) handle(ctx context.HTTPContext) (result string) {
httpResps[i] = nil
return
}
ro, exists := supervisor.Global.GetRunningObject(name, supervisor.CategoryPipeline)

handler, exists := aa.muxMapper.GetHandler(name)

if !exists {
httpResps[i] = nil
} else {
handler, ok := ro.Instance().(protocol.HTTPHandler)
if !ok {
httpResps[i] = nil
} else {
handler.Handle(copyCtx)
httpResps[i] = copyCtx.Response()
}
handler.Handle(copyCtx)
httpResps[i] = copyCtx.Response()
}
}(i, proxy.HTTPProxyName, req)
}
Expand Down
23 changes: 14 additions & 9 deletions pkg/filter/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ A Bridge Filter route requests to from one pipeline to other pipelines or http p
var results = []string{resultDestinationNotFound, resultInvokeDestinationFailed}

func init() {
httppipeline.Register(&Bridge{})
// FIXME: Bridge is a temporary product for some historical reason.
// I(@xxx7xxxx) think we should not enpower filter to cross pipelines.

// httppipeline.Register(&Bridge{})
}

type (
Expand All @@ -59,6 +62,8 @@ type (
super *supervisor.Supervisor
pipeSpec *httppipeline.FilterSpec
spec *Spec

muxMapper protocol.MuxMapper
}

// Spec describes the Mock.
Expand Down Expand Up @@ -113,6 +118,11 @@ func (b *Bridge) Handle(ctx context.HTTPContext) (result string) {
return ctx.CallNextHandler(result)
}

// InjectMuxMapper injects mux mapper into Bridge.
func (b *Bridge) InjectMuxMapper(mapper protocol.MuxMapper) {
b.muxMapper = mapper
}

func (b *Bridge) handle(ctx context.HTTPContext) (result string) {
if len(b.spec.Destinations) <= 0 {
panic("not any destination defined")
Expand Down Expand Up @@ -141,21 +151,16 @@ func (b *Bridge) handle(ctx context.HTTPContext) (result string) {
return resultDestinationNotFound
}

ro, exists := supervisor.Global.GetRunningObject(dest, supervisor.CategoryPipeline)
handler, exists := b.muxMapper.GetHandler(dest)

if !exists {
logger.Errorf("failed to get running object %s", b.spec.Destinations[0])
ctx.Response().SetStatusCode(http.StatusServiceUnavailable)
return resultDestinationNotFound
}

handler, ok := ro.Instance().(protocol.HTTPHandler)
if !ok {
logger.Errorf("%s is not a handler", b.spec.Destinations[0])
ctx.Response().SetStatusCode(http.StatusServiceUnavailable)
return resultInvokeDestinationFailed
}

handler.Handle(ctx)

return ""
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/corsadaptor/corsadaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (a *CORSAdaptor) Init(pipeSpec *httppipeline.FilterSpec, super *supervisor.
a.reload()
}

// Inherit inherits previous generation of APIAggregator.
// Inherit inherits previous generation of CORSAdaptor.
func (a *CORSAdaptor) Inherit(pipeSpec *httppipeline.FilterSpec,
previousGeneration httppipeline.Filter, super *supervisor.Supervisor) {

Expand Down
4 changes: 2 additions & 2 deletions pkg/object/easemonitormetrics/easemonitormetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ func (emm *EaseMonitorMetrics) Inherit(superSpec *supervisor.Spec,
}

func (emm *EaseMonitorMetrics) reload() {
ssc, exists := emm.super.GetRunningObject((&statussynccontroller.StatusSyncController{}).Kind(),
supervisor.CategorySystemController)
ssc, exists := emm.super.GetSystemController(statussynccontroller.Kind)
if !exists {
logger.Errorf("BUG: status sync controller not found")
}
Expand Down Expand Up @@ -296,6 +295,7 @@ func (emm *EaseMonitorMetrics) record2Messages(record *statussynccontroller.Stat
Service: objectName,
}

// TODO: Change it to adapt new model.
switch status := status.ObjectStatus.(type) {
case *httppipeline.Status:
reqs, codes := emm.httpPipeline2Metrics(baseFields, status)
Expand Down
11 changes: 7 additions & 4 deletions pkg/object/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/megaease/easegress/pkg/filter/requestadaptor"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/object/httppipeline"
"github.com/megaease/easegress/pkg/protocol"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/httpheader"
"github.com/megaease/easegress/pkg/util/httpstat"
Expand Down Expand Up @@ -166,17 +167,19 @@ func (f *Function) DefaultSpec() interface{} {
}

// Init initializes Function.
func (f *Function) Init(superSpec *supervisor.Spec, super *supervisor.Supervisor) {
func (f *Function) Init(superSpec *supervisor.Spec,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

f.superSpec, f.spec, f.super = superSpec, superSpec.ObjectSpec().(*Spec), super
f.reload()
}

// Inherit inherits previous generation of Function.
func (f *Function) Inherit(superSpec *supervisor.Spec,
previousGeneration supervisor.Object, super *supervisor.Supervisor) {
func (f *Function) Inherit(superSpec *supervisor.Spec, previousGeneration supervisor.Object,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

previousGeneration.Close()
f.Init(superSpec, super)
f.Init(superSpec, super, muxMapper)
}

func (f *Function) reload() {
Expand Down
18 changes: 13 additions & 5 deletions pkg/object/httppipeline/httppipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocol"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/stringtool"
)
Expand All @@ -54,6 +55,7 @@ type (
superSpec *supervisor.Spec
spec *Spec

muxMapper protocol.MuxMapper
runningFilters []*runningFilter
ht *context.HTTPTemplate
}
Expand All @@ -77,7 +79,7 @@ type (
JumpIf map[string]string `yaml:"jumpIf" jsonschema:"omitempty"`
}

// Status contains all status gernerated by runtime, for displaying to users.
// Status is the status of HTTPPipeline.
Status struct {
Health string `yaml:"health"`

Expand Down Expand Up @@ -213,7 +215,7 @@ func convertToFilterBuffs(obj interface{}) map[string][]byte {
return rst
}

func (meta *FilterMetaSpec) validate() error {
func (meta *FilterMetaSpec) Validate() error {
if len(meta.Name) == 0 {
return fmt.Errorf("filter name is required")
}
Expand Down Expand Up @@ -317,16 +319,22 @@ func (hp *HTTPPipeline) DefaultSpec() interface{} {
}

// Init initilizes HTTPPipeline.
func (hp *HTTPPipeline) Init(superSpec *supervisor.Spec, super *supervisor.Supervisor) {
func (hp *HTTPPipeline) Init(superSpec *supervisor.Spec,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

hp.superSpec, hp.spec, hp.super = superSpec, superSpec.ObjectSpec().(*Spec), super
hp.muxMapper = muxMapper

hp.reload(nil /*no previous generation*/)
}

// Inherit inherits previous generation of HTTPPipeline.
func (hp *HTTPPipeline) Inherit(superSpec *supervisor.Spec,
previousGeneration supervisor.Object, super *supervisor.Supervisor) {
func (hp *HTTPPipeline) Inherit(superSpec *supervisor.Spec, previousGeneration supervisor.Object,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

hp.superSpec, hp.spec, hp.super = superSpec, superSpec.ObjectSpec().(*Spec), super
hp.muxMapper = muxMapper

hp.reload(previousGeneration.(*HTTPPipeline))

// NOTE: It's filters' responsibility to inherit and clean their resources.
Expand Down
46 changes: 8 additions & 38 deletions pkg/object/httpserver/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
package httpserver

import (
"time"

"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocol"
"github.com/megaease/easegress/pkg/supervisor"
)
Expand All @@ -31,8 +28,6 @@ const (

// Kind is the kind of HTTPServer.
Kind = "HTTPServer"

blockTimeout = 100 * time.Millisecond
)

func init() {
Expand All @@ -44,32 +39,8 @@ type (
HTTPServer struct {
runtime *runtime
}

// MuxMapper gets HTTP handler pipeline with mutex
MuxMapper interface {
Get(name string) (protocol.HTTPHandler, bool)
}

// SupervisorMapper calls supervisor for getting pipeline.
SupervisorMapper struct {
super *supervisor.Supervisor
}
)

// Get gets pipeline from EG's running object
func (s *SupervisorMapper) Get(name string) (protocol.HTTPHandler, bool) {
if ro, exist := s.super.GetRunningObject(name, supervisor.CategoryPipeline); exist == false {
return nil, false
} else {
if handler, ok := ro.Instance().(protocol.HTTPHandler); !ok {
logger.Errorf("BUG: %s is not a HTTPHandler", name)
return nil, false
} else {
return handler, true
}
}
}

// Category returns the category of HTTPServer.
func (hs *HTTPServer) Category() supervisor.ObjectCategory {
return Category
Expand All @@ -90,24 +61,28 @@ func (hs *HTTPServer) DefaultSpec() interface{} {
}

// Init initilizes HTTPServer.
func (hs *HTTPServer) Init(superSpec *supervisor.Spec, super *supervisor.Supervisor) {
hs.runtime = newRuntime(super)
func (hs *HTTPServer) Init(superSpec *supervisor.Spec,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

hs.runtime = newRuntime(super, muxMapper)

hs.runtime.eventChan <- &eventReload{
nextSuperSpec: superSpec,
super: super,
muxMapper: muxMapper,
}
}

// Inherit inherits previous generation of HTTPServer.
func (hs *HTTPServer) Inherit(superSpec *supervisor.Spec,
previousGeneration supervisor.Object, super *supervisor.Supervisor) {
func (hs *HTTPServer) Inherit(superSpec *supervisor.Spec, previousGeneration supervisor.Object,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

hs.runtime = previousGeneration.(*HTTPServer).runtime

hs.runtime.eventChan <- &eventReload{
nextSuperSpec: superSpec,
super: super,
muxMapper: muxMapper,
}
}

Expand All @@ -122,8 +97,3 @@ func (hs *HTTPServer) Status() *supervisor.Status {
func (hs *HTTPServer) Close() {
hs.runtime.Close()
}

// InjectMuxMapper inject a new mux mapper to route, it will cover the default map of supervisor.
func (hs *HTTPServer) InjectMuxMapper(mapper MuxMapper) {
hs.runtime.SetMuxMapper(mapper)
}
Loading