Skip to content

Commit

Permalink
Introducing TrafficController to manage TrafficGate and Pipeline (#55)
Browse files Browse the repository at this point in the history
* [supervisor]: remodel supervisor to only handle controller

* [easemonitormetrics]: Update EaseMonitorMetrics for adapting to new model

* [api]: Clean some message of api

* [trafficcontroller]: Clean naming and typo
  • Loading branch information
xxx7xxxx committed Jun 24, 2021
1 parent fe0cdc9 commit d0d9c8b
Show file tree
Hide file tree
Showing 29 changed files with 1,712 additions and 687 deletions.
12 changes: 12 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ build_server:
CGO_ENABLED=0 go build -v -trimpath -ldflags ${GO_LD_FLAGS} \
-o ${TARGET_SERVER} ${MKFILE_DIR}cmd/server

dev_build_client:
@echo "build dev client"
cd ${MKFILE_DIR} && \
go build -v -race -ldflags ${GO_LD_FLAGS} \
-o ${TARGET_CLIENT} ${MKFILE_DIR}cmd/client

dev_build_server:
@echo "build dev server"
cd ${MKFILE_DIR} && \
go build -v -race -ldflags ${GO_LD_FLAGS} \
-o ${TARGET_SERVER} ${MKFILE_DIR}cmd/server

build_docker:
docker build -t megaease/easegress:${RELEASE} -f ./build/package/Dockerfile .

Expand Down
1 change: 0 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func main() {
os.Exit(1)
}
super := supervisor.MustNew(opt, cls)
supervisor.InitGlobalSupervisor(super)
apiServer := api.MustNewServer(opt, cls)

if graceupdate.CallOriProcessTerm(super.FirstHandleDone()) {
Expand Down
Binary file modified doc/architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
34 changes: 17 additions & 17 deletions example/config/http-pipeline-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ flow:
# - filter: fallback
- filter: rateLimiter
- filter: requestAdaptor
- filter: proxy
- filter: proxy
jumpIf: { fallback: END }
- filter: responseAdaptor

Expand Down Expand Up @@ -43,11 +43,11 @@ filters:
- name: policy-one
timeoutDuration: 1000ms
limitRefreshPeriod: 5000ms
limitForPeriod: 2
limitForPeriod: 2
- name: policy-two
timeoutDuration: 100ms
limitRefreshPeriod: 10ms
limitForPeriod: 30
limitForPeriod: 30
defaultPolicyRef: policy-one
urls:
- methods: [GET, POST, PUT, DELETE]
Expand All @@ -68,8 +68,8 @@ filters:
set:
X-Adapt-Key: goodplan
add: {}
- name: proxy
kind: Proxy
- name: proxy
kind: Proxy
failureCodes: [500, 501]
fallback:
forCodes: true
Expand All @@ -88,18 +88,18 @@ filters:
headers:
X-Filter:
exact: mirror
candidatePool:
servers:
- url: https://127.0.0.1:9093
- url: https://127.0.0.1:9094
loadBalance:
policy: random
headerHashKey: ""
memoryCache: null
filter:
headers:
X-Filter:
exact: candidate
candidatePools:
- servers:
- url: https://127.0.0.1:9093
- url: https://127.0.0.1:9094
loadBalance:
policy: random
headerHashKey: ""
memoryCache: null
filter:
headers:
X-Filter:
exact: candidate
mainPool:
serversTags: ["v2"]
servers:
Expand Down
6 changes: 2 additions & 4 deletions pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"github.com/go-chi/chi/v5"
"go.uber.org/zap"

"github.com/megaease/easegress/pkg/cluster"
"github.com/megaease/easegress/pkg/logger"
Expand Down Expand Up @@ -91,15 +90,14 @@ func MustNewServer(opt *option.Options, cluster cluster.Cluster) *Server {
func (s *Server) Close(wg *sync.WaitGroup) {
defer wg.Done()

// Give the server a bit to close connections
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := s.srv.Shutdown(ctx); err != nil {
logger.Errorf("Could not gracefully shutdown the server", zap.Error(err))
logger.Errorf("gracefully shutdown the server failed: %v", err)
}

logger.Infof("Server stopped")
logger.Infof("server stopped")
}

func (s *Server) getMutex() (cluster.Mutex, error) {
Expand Down
25 changes: 16 additions & 9 deletions pkg/filter/apiaggregator/apiaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ const (
var results = []string{resultFailed}

func init() {
httppipeline.Register(&APIAggregator{})
// FIXME: Rewrite APIAggregator becasue the HTTPProxy is eliminated
// I(@xxx7xxxx) think we should not enpower filter to cross pipelines.

// httppipeline.Register(&APIAggregator{})
}

type (
Expand All @@ -58,6 +61,8 @@ type (
super *supervisor.Supervisor
pipeSpec *httppipeline.FilterSpec
spec *Spec

muxMapper protocol.MuxMapper
}

// Spec describes APIAggregator.
Expand Down Expand Up @@ -150,6 +155,11 @@ func (aa *APIAggregator) Handle(ctx context.HTTPContext) (result string) {
return ctx.CallNextHandler(result)
}

// InjectMuxMapper injects mux mapper into APIAggregator.
func (aa *APIAggregator) InjectMuxMapper(mapper protocol.MuxMapper) {
aa.muxMapper = mapper
}

func (aa *APIAggregator) handle(ctx context.HTTPContext) (result string) {
buff := bytes.NewBuffer(nil)
if aa.spec.MaxBodyBytes > 0 {
Expand Down Expand Up @@ -186,17 +196,14 @@ func (aa *APIAggregator) handle(ctx context.HTTPContext) (result string) {
httpResps[i] = nil
return
}
ro, exists := supervisor.Global.GetRunningObject(name, supervisor.CategoryPipeline)

handler, exists := aa.muxMapper.GetHandler(name)

if !exists {
httpResps[i] = nil
} else {
handler, ok := ro.Instance().(protocol.HTTPHandler)
if !ok {
httpResps[i] = nil
} else {
handler.Handle(copyCtx)
httpResps[i] = copyCtx.Response()
}
handler.Handle(copyCtx)
httpResps[i] = copyCtx.Response()
}
}(i, proxy.HTTPProxyName, req)
}
Expand Down
23 changes: 14 additions & 9 deletions pkg/filter/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ A Bridge Filter route requests to from one pipeline to other pipelines or http p
var results = []string{resultDestinationNotFound, resultInvokeDestinationFailed}

func init() {
httppipeline.Register(&Bridge{})
// FIXME: Bridge is a temporary product for some historical reason.
// I(@xxx7xxxx) think we should not enpower filter to cross pipelines.

// httppipeline.Register(&Bridge{})
}

type (
Expand All @@ -59,6 +62,8 @@ type (
super *supervisor.Supervisor
pipeSpec *httppipeline.FilterSpec
spec *Spec

muxMapper protocol.MuxMapper
}

// Spec describes the Mock.
Expand Down Expand Up @@ -113,6 +118,11 @@ func (b *Bridge) Handle(ctx context.HTTPContext) (result string) {
return ctx.CallNextHandler(result)
}

// InjectMuxMapper injects mux mapper into Bridge.
func (b *Bridge) InjectMuxMapper(mapper protocol.MuxMapper) {
b.muxMapper = mapper
}

func (b *Bridge) handle(ctx context.HTTPContext) (result string) {
if len(b.spec.Destinations) <= 0 {
panic("not any destination defined")
Expand Down Expand Up @@ -141,21 +151,16 @@ func (b *Bridge) handle(ctx context.HTTPContext) (result string) {
return resultDestinationNotFound
}

ro, exists := supervisor.Global.GetRunningObject(dest, supervisor.CategoryPipeline)
handler, exists := b.muxMapper.GetHandler(dest)

if !exists {
logger.Errorf("failed to get running object %s", b.spec.Destinations[0])
ctx.Response().SetStatusCode(http.StatusServiceUnavailable)
return resultDestinationNotFound
}

handler, ok := ro.Instance().(protocol.HTTPHandler)
if !ok {
logger.Errorf("%s is not a handler", b.spec.Destinations[0])
ctx.Response().SetStatusCode(http.StatusServiceUnavailable)
return resultInvokeDestinationFailed
}

handler.Handle(ctx)

return ""
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/corsadaptor/corsadaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (a *CORSAdaptor) Init(pipeSpec *httppipeline.FilterSpec, super *supervisor.
a.reload()
}

// Inherit inherits previous generation of APIAggregator.
// Inherit inherits previous generation of CORSAdaptor.
func (a *CORSAdaptor) Inherit(pipeSpec *httppipeline.FilterSpec,
previousGeneration httppipeline.Filter, super *supervisor.Supervisor) {

Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type (
// Status is the status of Proxy.
Status struct {
MainPool *PoolStatus `yaml:"mainPool"`
CandidatePools []*PoolStatus `yaml:"candidatePool,omitempty"`
CandidatePools []*PoolStatus `yaml:"candidatePools,omitempty"`
MirrorPool *PoolStatus `yaml:"mirrorPool,omitempty"`
}
)
Expand Down
28 changes: 16 additions & 12 deletions pkg/object/easemonitormetrics/easemonitormetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/megaease/easegress/pkg/object/httppipeline"
"github.com/megaease/easegress/pkg/object/httpserver"
"github.com/megaease/easegress/pkg/object/statussynccontroller"
"github.com/megaease/easegress/pkg/object/trafficcontroller"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/httpstat"
)
Expand Down Expand Up @@ -176,8 +177,7 @@ func (emm *EaseMonitorMetrics) Inherit(superSpec *supervisor.Spec,
}

func (emm *EaseMonitorMetrics) reload() {
ssc, exists := emm.super.GetRunningObject((&statussynccontroller.StatusSyncController{}).Kind(),
supervisor.CategorySystemController)
ssc, exists := emm.super.GetSystemController(statussynccontroller.Kind)
if !exists {
logger.Errorf("BUG: status sync controller not found")
}
Expand Down Expand Up @@ -297,14 +297,19 @@ func (emm *EaseMonitorMetrics) record2Messages(record *statussynccontroller.Stat
}

switch status := status.ObjectStatus.(type) {
case *httppipeline.Status:
reqs, codes := emm.httpPipeline2Metrics(baseFields, status)
reqMetrics = append(reqMetrics, reqs...)
codeMetrics = append(codeMetrics, codes...)
case *httpserver.Status:
reqs, codes := emm.httpServer2Metrics(baseFields, status)
reqMetrics = append(reqMetrics, reqs...)
codeMetrics = append(codeMetrics, codes...)
case *trafficcontroller.StatusInSameNamespace:
for name, server := range status.HTTPServers {
baseFields.Service = fmt.Sprintf("%s/%s", baseFields.Service, name)
reqs, codes := emm.httpServer2Metrics(baseFields, server)
reqMetrics = append(reqMetrics, reqs...)
codeMetrics = append(codeMetrics, codes...)
}
for name, pipeline := range status.HTTPPipelines {
baseFields.Service = fmt.Sprintf("%s/%s", baseFields.Service, name)
reqs, codes := emm.httpPipeline2Metrics(baseFields, pipeline)
reqMetrics = append(reqMetrics, reqs...)
codeMetrics = append(codeMetrics, codes...)
}
default:
continue
}
Expand Down Expand Up @@ -338,8 +343,7 @@ func (emm *EaseMonitorMetrics) record2Messages(record *statussynccontroller.Stat
return messages
}

func (emm *EaseMonitorMetrics) httpPipeline2Metrics(
baseFields *GlobalFields, pipelineStatus *httppipeline.Status) (
func (emm *EaseMonitorMetrics) httpPipeline2Metrics(baseFields *GlobalFields, pipelineStatus *httppipeline.Status) (
reqMetrics []*RequestMetrics, codeMetrics []*StatusCodeMetrics) {

for filterName, filterStatus := range pipelineStatus.Filters {
Expand Down
11 changes: 7 additions & 4 deletions pkg/object/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/megaease/easegress/pkg/filter/requestadaptor"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/object/httppipeline"
"github.com/megaease/easegress/pkg/protocol"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/httpheader"
"github.com/megaease/easegress/pkg/util/httpstat"
Expand Down Expand Up @@ -166,17 +167,19 @@ func (f *Function) DefaultSpec() interface{} {
}

// Init initializes Function.
func (f *Function) Init(superSpec *supervisor.Spec, super *supervisor.Supervisor) {
func (f *Function) Init(superSpec *supervisor.Spec,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

f.superSpec, f.spec, f.super = superSpec, superSpec.ObjectSpec().(*Spec), super
f.reload()
}

// Inherit inherits previous generation of Function.
func (f *Function) Inherit(superSpec *supervisor.Spec,
previousGeneration supervisor.Object, super *supervisor.Supervisor) {
func (f *Function) Inherit(superSpec *supervisor.Spec, previousGeneration supervisor.Object,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

previousGeneration.Close()
f.Init(superSpec, super)
f.Init(superSpec, super, muxMapper)
}

func (f *Function) reload() {
Expand Down
18 changes: 13 additions & 5 deletions pkg/object/httppipeline/httppipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocol"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/stringtool"
)
Expand All @@ -54,6 +55,7 @@ type (
superSpec *supervisor.Spec
spec *Spec

muxMapper protocol.MuxMapper
runningFilters []*runningFilter
ht *context.HTTPTemplate
}
Expand All @@ -77,7 +79,7 @@ type (
JumpIf map[string]string `yaml:"jumpIf" jsonschema:"omitempty"`
}

// Status contains all status gernerated by runtime, for displaying to users.
// Status is the status of HTTPPipeline.
Status struct {
Health string `yaml:"health"`

Expand Down Expand Up @@ -213,7 +215,7 @@ func convertToFilterBuffs(obj interface{}) map[string][]byte {
return rst
}

func (meta *FilterMetaSpec) validate() error {
func (meta *FilterMetaSpec) Validate() error {
if len(meta.Name) == 0 {
return fmt.Errorf("filter name is required")
}
Expand Down Expand Up @@ -317,16 +319,22 @@ func (hp *HTTPPipeline) DefaultSpec() interface{} {
}

// Init initilizes HTTPPipeline.
func (hp *HTTPPipeline) Init(superSpec *supervisor.Spec, super *supervisor.Supervisor) {
func (hp *HTTPPipeline) Init(superSpec *supervisor.Spec,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

hp.superSpec, hp.spec, hp.super = superSpec, superSpec.ObjectSpec().(*Spec), super
hp.muxMapper = muxMapper

hp.reload(nil /*no previous generation*/)
}

// Inherit inherits previous generation of HTTPPipeline.
func (hp *HTTPPipeline) Inherit(superSpec *supervisor.Spec,
previousGeneration supervisor.Object, super *supervisor.Supervisor) {
func (hp *HTTPPipeline) Inherit(superSpec *supervisor.Spec, previousGeneration supervisor.Object,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

hp.superSpec, hp.spec, hp.super = superSpec, superSpec.ObjectSpec().(*Spec), super
hp.muxMapper = muxMapper

hp.reload(previousGeneration.(*HTTPPipeline))

// NOTE: It's filters' responsibility to inherit and clean their resources.
Expand Down
Loading

0 comments on commit d0d9c8b

Please sign in to comment.