Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing TrafficController to manage TrafficGate and Pipeline #55

Merged
merged 4 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ build_server:
CGO_ENABLED=0 go build -v -trimpath -ldflags ${GO_LD_FLAGS} \
-o ${TARGET_SERVER} ${MKFILE_DIR}cmd/server

dev_build_client:
@echo "build dev client"
cd ${MKFILE_DIR} && \
go build -v -race -ldflags ${GO_LD_FLAGS} \
-o ${TARGET_CLIENT} ${MKFILE_DIR}cmd/client

dev_build_server:
@echo "build dev server"
cd ${MKFILE_DIR} && \
go build -v -race -ldflags ${GO_LD_FLAGS} \
-o ${TARGET_SERVER} ${MKFILE_DIR}cmd/server

build_docker:
docker build -t megaease/easegress:${RELEASE} -f ./build/package/Dockerfile .

Expand Down
1 change: 0 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func main() {
os.Exit(1)
}
super := supervisor.MustNew(opt, cls)
supervisor.InitGlobalSupervisor(super)
apiServer := api.MustNewServer(opt, cls)

if graceupdate.CallOriProcessTerm(super.FirstHandleDone()) {
Expand Down
Binary file modified doc/architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
34 changes: 17 additions & 17 deletions example/config/http-pipeline-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ flow:
# - filter: fallback
- filter: rateLimiter
- filter: requestAdaptor
- filter: proxy
- filter: proxy
jumpIf: { fallback: END }
- filter: responseAdaptor

Expand Down Expand Up @@ -43,11 +43,11 @@ filters:
- name: policy-one
timeoutDuration: 1000ms
limitRefreshPeriod: 5000ms
limitForPeriod: 2
limitForPeriod: 2
- name: policy-two
timeoutDuration: 100ms
limitRefreshPeriod: 10ms
limitForPeriod: 30
limitForPeriod: 30
defaultPolicyRef: policy-one
urls:
- methods: [GET, POST, PUT, DELETE]
Expand All @@ -68,8 +68,8 @@ filters:
set:
X-Adapt-Key: goodplan
add: {}
- name: proxy
kind: Proxy
- name: proxy
kind: Proxy
failureCodes: [500, 501]
fallback:
forCodes: true
Expand All @@ -88,18 +88,18 @@ filters:
headers:
X-Filter:
exact: mirror
candidatePool:
servers:
- url: http:https://127.0.0.1:9093
- url: http:https://127.0.0.1:9094
loadBalance:
policy: random
headerHashKey: ""
memoryCache: null
filter:
headers:
X-Filter:
exact: candidate
candidatePools:
- servers:
- url: http:https://127.0.0.1:9093
- url: http:https://127.0.0.1:9094
loadBalance:
policy: random
headerHashKey: ""
memoryCache: null
filter:
headers:
X-Filter:
exact: candidate
mainPool:
serversTags: ["v2"]
servers:
Expand Down
6 changes: 2 additions & 4 deletions pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"github.com/go-chi/chi/v5"
"go.uber.org/zap"

"github.com/megaease/easegress/pkg/cluster"
"github.com/megaease/easegress/pkg/logger"
Expand Down Expand Up @@ -91,15 +90,14 @@ func MustNewServer(opt *option.Options, cluster cluster.Cluster) *Server {
func (s *Server) Close(wg *sync.WaitGroup) {
defer wg.Done()

// Give the server a bit to close connections
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := s.srv.Shutdown(ctx); err != nil {
logger.Errorf("Could not gracefully shutdown the server", zap.Error(err))
logger.Errorf("gracefully shutdown the server failed: %v", err)
}

logger.Infof("Server stopped")
logger.Infof("server stopped")
}

func (s *Server) getMutex() (cluster.Mutex, error) {
Expand Down
25 changes: 16 additions & 9 deletions pkg/filter/apiaggregator/apiaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ const (
var results = []string{resultFailed}

func init() {
httppipeline.Register(&APIAggregator{})
// FIXME: Rewrite APIAggregator becasue the HTTPProxy is eliminated
// I(@xxx7xxxx) think we should not enpower filter to cross pipelines.

// httppipeline.Register(&APIAggregator{})
}

type (
Expand All @@ -58,6 +61,8 @@ type (
super *supervisor.Supervisor
pipeSpec *httppipeline.FilterSpec
spec *Spec

muxMapper protocol.MuxMapper
}

// Spec describes APIAggregator.
Expand Down Expand Up @@ -150,6 +155,11 @@ func (aa *APIAggregator) Handle(ctx context.HTTPContext) (result string) {
return ctx.CallNextHandler(result)
}

// InjectMuxMapper injects mux mapper into APIAggregator.
func (aa *APIAggregator) InjectMuxMapper(mapper protocol.MuxMapper) {
aa.muxMapper = mapper
}

func (aa *APIAggregator) handle(ctx context.HTTPContext) (result string) {
buff := bytes.NewBuffer(nil)
if aa.spec.MaxBodyBytes > 0 {
Expand Down Expand Up @@ -186,17 +196,14 @@ func (aa *APIAggregator) handle(ctx context.HTTPContext) (result string) {
httpResps[i] = nil
return
}
ro, exists := supervisor.Global.GetRunningObject(name, supervisor.CategoryPipeline)

handler, exists := aa.muxMapper.GetHandler(name)

if !exists {
httpResps[i] = nil
} else {
handler, ok := ro.Instance().(protocol.HTTPHandler)
if !ok {
httpResps[i] = nil
} else {
handler.Handle(copyCtx)
httpResps[i] = copyCtx.Response()
}
handler.Handle(copyCtx)
httpResps[i] = copyCtx.Response()
}
}(i, proxy.HTTPProxyName, req)
}
Expand Down
23 changes: 14 additions & 9 deletions pkg/filter/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ A Bridge Filter route requests to from one pipeline to other pipelines or http p
var results = []string{resultDestinationNotFound, resultInvokeDestinationFailed}

func init() {
httppipeline.Register(&Bridge{})
// FIXME: Bridge is a temporary product for some historical reason.
// I(@xxx7xxxx) think we should not enpower filter to cross pipelines.

// httppipeline.Register(&Bridge{})
}

type (
Expand All @@ -59,6 +62,8 @@ type (
super *supervisor.Supervisor
pipeSpec *httppipeline.FilterSpec
spec *Spec

muxMapper protocol.MuxMapper
}

// Spec describes the Mock.
Expand Down Expand Up @@ -113,6 +118,11 @@ func (b *Bridge) Handle(ctx context.HTTPContext) (result string) {
return ctx.CallNextHandler(result)
}

// InjectMuxMapper injects mux mapper into Bridge.
func (b *Bridge) InjectMuxMapper(mapper protocol.MuxMapper) {
b.muxMapper = mapper
}

func (b *Bridge) handle(ctx context.HTTPContext) (result string) {
if len(b.spec.Destinations) <= 0 {
panic("not any destination defined")
Expand Down Expand Up @@ -141,21 +151,16 @@ func (b *Bridge) handle(ctx context.HTTPContext) (result string) {
return resultDestinationNotFound
}

ro, exists := supervisor.Global.GetRunningObject(dest, supervisor.CategoryPipeline)
handler, exists := b.muxMapper.GetHandler(dest)

if !exists {
logger.Errorf("failed to get running object %s", b.spec.Destinations[0])
ctx.Response().SetStatusCode(http.StatusServiceUnavailable)
return resultDestinationNotFound
}

handler, ok := ro.Instance().(protocol.HTTPHandler)
if !ok {
logger.Errorf("%s is not a handler", b.spec.Destinations[0])
ctx.Response().SetStatusCode(http.StatusServiceUnavailable)
return resultInvokeDestinationFailed
}

handler.Handle(ctx)

return ""
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/corsadaptor/corsadaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (a *CORSAdaptor) Init(pipeSpec *httppipeline.FilterSpec, super *supervisor.
a.reload()
}

// Inherit inherits previous generation of APIAggregator.
// Inherit inherits previous generation of CORSAdaptor.
func (a *CORSAdaptor) Inherit(pipeSpec *httppipeline.FilterSpec,
previousGeneration httppipeline.Filter, super *supervisor.Supervisor) {

Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type (
// Status is the status of Proxy.
Status struct {
MainPool *PoolStatus `yaml:"mainPool"`
CandidatePools []*PoolStatus `yaml:"candidatePool,omitempty"`
CandidatePools []*PoolStatus `yaml:"candidatePools,omitempty"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filter reference needs to be updated accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has been updated already. This is a legacy typo.

MirrorPool *PoolStatus `yaml:"mirrorPool,omitempty"`
}
)
Expand Down
28 changes: 16 additions & 12 deletions pkg/object/easemonitormetrics/easemonitormetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/megaease/easegress/pkg/object/httppipeline"
"github.com/megaease/easegress/pkg/object/httpserver"
"github.com/megaease/easegress/pkg/object/statussynccontroller"
"github.com/megaease/easegress/pkg/object/trafficcontroller"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/httpstat"
)
Expand Down Expand Up @@ -176,8 +177,7 @@ func (emm *EaseMonitorMetrics) Inherit(superSpec *supervisor.Spec,
}

func (emm *EaseMonitorMetrics) reload() {
ssc, exists := emm.super.GetRunningObject((&statussynccontroller.StatusSyncController{}).Kind(),
supervisor.CategorySystemController)
ssc, exists := emm.super.GetSystemController(statussynccontroller.Kind)
if !exists {
logger.Errorf("BUG: status sync controller not found")
}
Expand Down Expand Up @@ -297,14 +297,19 @@ func (emm *EaseMonitorMetrics) record2Messages(record *statussynccontroller.Stat
}

switch status := status.ObjectStatus.(type) {
case *httppipeline.Status:
reqs, codes := emm.httpPipeline2Metrics(baseFields, status)
reqMetrics = append(reqMetrics, reqs...)
codeMetrics = append(codeMetrics, codes...)
case *httpserver.Status:
reqs, codes := emm.httpServer2Metrics(baseFields, status)
reqMetrics = append(reqMetrics, reqs...)
codeMetrics = append(codeMetrics, codes...)
case *trafficcontroller.StatusInSameNamespace:
for name, server := range status.HTTPServers {
baseFields.Service = fmt.Sprintf("%s/%s", baseFields.Service, name)
reqs, codes := emm.httpServer2Metrics(baseFields, server)
reqMetrics = append(reqMetrics, reqs...)
codeMetrics = append(codeMetrics, codes...)
}
for name, pipeline := range status.HTTPPipelines {
baseFields.Service = fmt.Sprintf("%s/%s", baseFields.Service, name)
reqs, codes := emm.httpPipeline2Metrics(baseFields, pipeline)
reqMetrics = append(reqMetrics, reqs...)
codeMetrics = append(codeMetrics, codes...)
}
default:
continue
}
Expand Down Expand Up @@ -338,8 +343,7 @@ func (emm *EaseMonitorMetrics) record2Messages(record *statussynccontroller.Stat
return messages
}

func (emm *EaseMonitorMetrics) httpPipeline2Metrics(
baseFields *GlobalFields, pipelineStatus *httppipeline.Status) (
func (emm *EaseMonitorMetrics) httpPipeline2Metrics(baseFields *GlobalFields, pipelineStatus *httppipeline.Status) (
reqMetrics []*RequestMetrics, codeMetrics []*StatusCodeMetrics) {

for filterName, filterStatus := range pipelineStatus.Filters {
Expand Down
11 changes: 7 additions & 4 deletions pkg/object/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/megaease/easegress/pkg/filter/requestadaptor"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/object/httppipeline"
"github.com/megaease/easegress/pkg/protocol"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/httpheader"
"github.com/megaease/easegress/pkg/util/httpstat"
Expand Down Expand Up @@ -166,17 +167,19 @@ func (f *Function) DefaultSpec() interface{} {
}

// Init initializes Function.
func (f *Function) Init(superSpec *supervisor.Spec, super *supervisor.Supervisor) {
func (f *Function) Init(superSpec *supervisor.Spec,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

f.superSpec, f.spec, f.super = superSpec, superSpec.ObjectSpec().(*Spec), super
f.reload()
}

// Inherit inherits previous generation of Function.
func (f *Function) Inherit(superSpec *supervisor.Spec,
previousGeneration supervisor.Object, super *supervisor.Supervisor) {
func (f *Function) Inherit(superSpec *supervisor.Spec, previousGeneration supervisor.Object,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

previousGeneration.Close()
f.Init(superSpec, super)
f.Init(superSpec, super, muxMapper)
}

func (f *Function) reload() {
Expand Down
18 changes: 13 additions & 5 deletions pkg/object/httppipeline/httppipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocol"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/stringtool"
)
Expand All @@ -54,6 +55,7 @@ type (
superSpec *supervisor.Spec
spec *Spec

muxMapper protocol.MuxMapper
runningFilters []*runningFilter
ht *context.HTTPTemplate
}
Expand All @@ -77,7 +79,7 @@ type (
JumpIf map[string]string `yaml:"jumpIf" jsonschema:"omitempty"`
}

// Status contains all status gernerated by runtime, for displaying to users.
// Status is the status of HTTPPipeline.
Status struct {
Health string `yaml:"health"`

Expand Down Expand Up @@ -213,7 +215,7 @@ func convertToFilterBuffs(obj interface{}) map[string][]byte {
return rst
}

func (meta *FilterMetaSpec) validate() error {
func (meta *FilterMetaSpec) Validate() error {
if len(meta.Name) == 0 {
return fmt.Errorf("filter name is required")
}
Expand Down Expand Up @@ -317,16 +319,22 @@ func (hp *HTTPPipeline) DefaultSpec() interface{} {
}

// Init initilizes HTTPPipeline.
func (hp *HTTPPipeline) Init(superSpec *supervisor.Spec, super *supervisor.Supervisor) {
func (hp *HTTPPipeline) Init(superSpec *supervisor.Spec,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

hp.superSpec, hp.spec, hp.super = superSpec, superSpec.ObjectSpec().(*Spec), super
hp.muxMapper = muxMapper

hp.reload(nil /*no previous generation*/)
}

// Inherit inherits previous generation of HTTPPipeline.
func (hp *HTTPPipeline) Inherit(superSpec *supervisor.Spec,
previousGeneration supervisor.Object, super *supervisor.Supervisor) {
func (hp *HTTPPipeline) Inherit(superSpec *supervisor.Spec, previousGeneration supervisor.Object,
super *supervisor.Supervisor, muxMapper protocol.MuxMapper) {

hp.superSpec, hp.spec, hp.super = superSpec, superSpec.ObjectSpec().(*Spec), super
hp.muxMapper = muxMapper

hp.reload(previousGeneration.(*HTTPPipeline))

// NOTE: It's filters' responsibility to inherit and clean their resources.
Expand Down
Loading