Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing TrafficController to manage TrafficGate and Pipeline #55

Merged
merged 4 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[easemonitormetrics]: Update EaseMonitorMetrics for adapting to new m…
…odel
  • Loading branch information
xxx7xxxx committed Jun 22, 2021
commit 89980a6e8fc6a179d524998dad9f16b4c66b9a53
24 changes: 12 additions & 12 deletions example/config/http-pipeline-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,18 @@ filters:
headers:
X-Filter:
exact: mirror
candidatePool:
servers:
- url: http:https://127.0.0.1:9093
- url: http:https://127.0.0.1:9094
loadBalance:
policy: random
headerHashKey: ""
memoryCache: null
filter:
headers:
X-Filter:
exact: candidate
candidatePools:
- servers:
- url: http:https://127.0.0.1:9093
- url: http:https://127.0.0.1:9094
loadBalance:
policy: random
headerHashKey: ""
memoryCache: null
filter:
headers:
X-Filter:
exact: candidate
mainPool:
serversTags: ["v2"]
servers:
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type (
// Status is the status of Proxy.
Status struct {
MainPool *PoolStatus `yaml:"mainPool"`
CandidatePools []*PoolStatus `yaml:"candidatePool,omitempty"`
CandidatePools []*PoolStatus `yaml:"candidatePools,omitempty"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filter reference needs to be updated accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has been updated already. This is a legacy typo.

MirrorPool *PoolStatus `yaml:"mirrorPool,omitempty"`
}
)
Expand Down
26 changes: 15 additions & 11 deletions pkg/object/easemonitormetrics/easemonitormetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/megaease/easegress/pkg/object/httppipeline"
"github.com/megaease/easegress/pkg/object/httpserver"
"github.com/megaease/easegress/pkg/object/statussynccontroller"
"github.com/megaease/easegress/pkg/object/trafficcontroller"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/httpstat"
)
Expand Down Expand Up @@ -295,16 +296,20 @@ func (emm *EaseMonitorMetrics) record2Messages(record *statussynccontroller.Stat
Service: objectName,
}

// TODO: Change it to adapt new model.
switch status := status.ObjectStatus.(type) {
case *httppipeline.Status:
reqs, codes := emm.httpPipeline2Metrics(baseFields, status)
reqMetrics = append(reqMetrics, reqs...)
codeMetrics = append(codeMetrics, codes...)
case *httpserver.Status:
reqs, codes := emm.httpServer2Metrics(baseFields, status)
reqMetrics = append(reqMetrics, reqs...)
codeMetrics = append(codeMetrics, codes...)
case *trafficcontroller.StatusOneSpace:
for name, server := range status.HTTPServers {
baseFields.Service = fmt.Sprintf("%s/%s", baseFields.Service, name)
reqs, codes := emm.httpServer2Metrics(baseFields, server)
reqMetrics = append(reqMetrics, reqs...)
codeMetrics = append(codeMetrics, codes...)
}
for name, pipeline := range status.HTTPPipelines {
baseFields.Service = fmt.Sprintf("%s/%s", baseFields.Service, name)
reqs, codes := emm.httpPipeline2Metrics(baseFields, pipeline)
reqMetrics = append(reqMetrics, reqs...)
codeMetrics = append(codeMetrics, codes...)
}
default:
continue
}
Expand Down Expand Up @@ -338,8 +343,7 @@ func (emm *EaseMonitorMetrics) record2Messages(record *statussynccontroller.Stat
return messages
}

func (emm *EaseMonitorMetrics) httpPipeline2Metrics(
baseFields *GlobalFields, pipelineStatus *httppipeline.Status) (
func (emm *EaseMonitorMetrics) httpPipeline2Metrics(baseFields *GlobalFields, pipelineStatus *httppipeline.Status) (
reqMetrics []*RequestMetrics, codeMetrics []*StatusCodeMetrics) {

for filterName, filterStatus := range pipelineStatus.Filters {
Expand Down
27 changes: 12 additions & 15 deletions pkg/object/rawconfigtrafficcontroller/rawconfigtrafficcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ type (
}

// Status is the status of RawConfigTrafficController.
Status struct {
HTTPServers map[string]interface{} `yaml:"httpServers"`
HTTPipelines map[string]interface{} `yaml:"httpPipelines"`
}
Status = trafficcontroller.StatusOneSpace
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to easemonitormetrics.go implementation, StatusOneSpace seems to only be used to let HTTPServer and HTTPPiepline's status records reporting to the same metric name. (SameSpace?) Proposal to add more comments about this status or change it to a more meaningful name.(StatusSameNamespace)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

)

func init() {
Expand Down Expand Up @@ -187,20 +184,20 @@ func (rctc *RawConfigTrafficController) handleEvent(event *supervisor.ObjectEnti
// Status returns the status of RawConfigTrafficController.
func (rctc *RawConfigTrafficController) Status() *supervisor.Status {
status := &Status{
HTTPServers: make(map[string]interface{}),
HTTPipelines: make(map[string]interface{}),
Namespace: rctc.namespace,
HTTPServers: make(map[string]*httpserver.Status),
HTTPPipelines: make(map[string]*httppipeline.Status),
}

servers := rctc.tc.ListHTTPServers(rctc.namespace)
pipelines := rctc.tc.ListHTTPPipelines(rctc.namespace)

for _, entity := range servers {
status.HTTPServers[entity.Spec().Name()] = entity.Instance().Status().ObjectStatus
}
rctc.tc.WalkHTTPServers(rctc.namespace, func(entity *supervisor.ObjectEntity) bool {
status.HTTPServers[entity.Spec().Name()] = entity.Instance().Status().ObjectStatus.(*httpserver.Status)
return true
})

for _, entity := range pipelines {
status.HTTPipelines[entity.Spec().Name()] = entity.Instance().Status().ObjectStatus
}
rctc.tc.WalkHTTPPipelines(rctc.namespace, func(entity *supervisor.ObjectEntity) bool {
status.HTTPPipelines[entity.Spec().Name()] = entity.Instance().Status().ObjectStatus.(*httppipeline.Status)
return true
})

return &supervisor.Status{
ObjectStatus: status,
Expand Down
82 changes: 80 additions & 2 deletions pkg/object/trafficcontroller/trafficcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package trafficcontroller

import (
"fmt"
"runtime/debug"
"sync"

"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/object/httppipeline"
"github.com/megaease/easegress/pkg/object/httpserver"
"github.com/megaease/easegress/pkg/protocol"
"github.com/megaease/easegress/pkg/supervisor"
)
Expand Down Expand Up @@ -56,9 +59,25 @@ type (
httppipelines sync.Map
}

// WalkHTTPServerFunc is the type of the function called for
// walking http server and http pipeline.
WalkFunc = supervisor.WalkFunc

// Spec describes TrafficController.
Spec struct {
}

Status struct {
Namespaces []string `yaml:"namespaces"`
}

// StatusOneSpace is the universal status in one space.
// TrafficController won't use it.
StatusOneSpace struct {
Namespace string `yaml:"namespace"`
HTTPServers map[string]*httpserver.Status `yaml:"httpServers"`
HTTPPipelines map[string]*httppipeline.Status `yaml:"httpPipelines"`
}
)

func init() {
Expand Down Expand Up @@ -291,6 +310,48 @@ func (tc *TrafficController) ListHTTPServers(namespace string) []*supervisor.Obj
return entities
}

func (tc *TrafficController) WalkHTTPServers(namespace string, walkFn WalkFunc) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("walkHTTPServers recover from err: %v, stack trace:\n%s\n",
err, debug.Stack())
}
}()

tc.mutex.Lock()
defer tc.mutex.Unlock()

space, exists := tc.spaces[namespace]
if !exists {
return
}

space.httpservers.Range(func(k, v interface{}) bool {
return walkFn(v.(*supervisor.ObjectEntity))
})
}

func (tc *TrafficController) WalkHTTPPipelines(namespace string, walkFn WalkFunc) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("walkHTTPPipelines recover from err: %v, stack trace:\n%s\n",
err, debug.Stack())
}
}()

tc.mutex.Lock()
defer tc.mutex.Unlock()

space, exists := tc.spaces[namespace]
if !exists {
return
}

space.httppipelines.Range(func(k, v interface{}) bool {
return walkFn(v.(*supervisor.ObjectEntity))
})
}

func (tc *TrafficController) CreateHTTPPipelineForSpec(namespace string, superSpec *supervisor.Spec) (
*supervisor.ObjectEntity, error) {

Expand Down Expand Up @@ -488,8 +549,22 @@ func (tc *TrafficController) _cleanSpace(namespace string) {

// Status returns the status of TrafficController.
func (tc *TrafficController) Status() *supervisor.Status {
// NOTE: TrafficController won't report any namespaced statuses.
// Higher controllers should report their own namespaced status.

tc.mutex.Lock()
defer tc.mutex.Unlock()

namespaces := []string{}

for namespace := range tc.spaces {
namespaces = append(namespaces, namespace)
}

return &supervisor.Status{
ObjectStatus: struct{}{},
ObjectStatus: &Status{
Namespaces: namespaces,
},
}
}

Expand All @@ -498,7 +573,7 @@ func (tc *TrafficController) Close() {
tc.mutex.Lock()
defer tc.mutex.Unlock()

for _, space := range tc.spaces {
for name, space := range tc.spaces {
space.httpservers.Range(func(k, v interface{}) bool {
entity := v.(*supervisor.ObjectEntity)
entity.CloseWithRecovery()
Expand All @@ -512,5 +587,8 @@ func (tc *TrafficController) Close() {
logger.Infof("delete http pipeline %s/%s", space.namespace, k)
return true
})

delete(tc.spaces, name)
logger.Infof("delete namespace %s", name)
}
}
1 change: 1 addition & 0 deletions pkg/supervisor/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type (

// Status is the universal status for all objects.
Status struct {
// ObjectStatus must be a map or struct (empty is allowed),
// If the ObjectStatus contains field `timestamp`,
// it will be covered by the top-level Timestamp here.
ObjectStatus interface{}
Expand Down
9 changes: 1 addition & 8 deletions pkg/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,12 @@ type (
}

// WalkFunc is the type of the function called for
// each running object visited by WalkObjectEntitys.
// walking object entity.
WalkFunc func(objectEntity *ObjectEntity) bool
)

// MustNew creates a Supervisor.
func MustNew(opt *option.Options, cls cluster.Cluster) *Supervisor {
statusToKeep := []string{}
for _, rootObject := range objectRegistry {
if rootObject.Category() == CategorySystemController {
statusToKeep = append(statusToKeep, rootObject.Kind())
}
}

s := &Supervisor{
options: opt,
cls: cls,
Expand Down