Skip to content

Commit

Permalink
[remodeling]: Cut up supervisor to handle different categories of run…
Browse files Browse the repository at this point in the history
…ning objects
  • Loading branch information
xxx7xxxx authored and benja-wu committed Feb 9, 2021
1 parent 0e122a8 commit b854282
Show file tree
Hide file tree
Showing 27 changed files with 1,138 additions and 874 deletions.
8 changes: 4 additions & 4 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ func main() {
logger.Errorf("new cluster failed: %v", err)
os.Exit(1)
}
sdl := supervisor.MustNew(opt, cls)
supervisor.InitGlobalSupervisor(sdl)
super := supervisor.MustNew(opt, cls)
supervisor.InitGlobalSupervisor(super)
api := egapi.MustNewServer(opt, cls)

if graceupdate.CallOriProcessTerm(sdl.FirstDone()) {
if graceupdate.CallOriProcessTerm(super.FirstHandleDone()) {
pidfile.Write(opt)
}

Expand Down Expand Up @@ -100,7 +100,7 @@ func main() {
wg := &sync.WaitGroup{}
wg.Add(4)
api.Close(wg)
sdl.Close(wg)
super.Close(wg)
cls.Close(wg)
profile.Close(wg)
wg.Wait()
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ require (
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.3 // indirect
github.com/tcnksm/go-httpstat v0.2.1-0.20191008022543-e866bb274419
github.com/tidwall/gjson v1.6.3
github.com/tidwall/gjson v1.6.8
github.com/tidwall/sjson v1.1.5 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce
github.com/valyala/fasttemplate v1.2.1
Expand All @@ -83,6 +84,7 @@ require (
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)

go 1.13
10 changes: 8 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,16 @@ github.com/tcnksm/go-httpstat v0.2.1-0.20191008022543-e866bb274419 h1:elOIj31UL4
github.com/tcnksm/go-httpstat v0.2.1-0.20191008022543-e866bb274419/go.mod h1:s3JVJFtQxtBEBC9dwcdTTXS9xFnM3SXAZwPG41aurT8=
github.com/tidwall/gjson v1.6.3 h1:aHoiiem0dr7GHkW001T1SMTJ7X5PvyekH5WX0whWGnI=
github.com/tidwall/gjson v1.6.3/go.mod h1:BaHyNc5bjzYkPqgLq7mdVzeiRtULKULXLgZFKsxEHI0=
github.com/tidwall/gjson v1.6.8 h1:CTmXMClGYPAmln7652e69B7OLXfTi5ABcPPwjIWUv7w=
github.com/tidwall/gjson v1.6.8/go.mod h1:zeFuBCIqD4sN/gmqBzZ4j7Jd6UcA2Fc56x7QFsv+8fI=
github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/match v1.0.3 h1:FQUVvBImDutD8wJLN6c5eMzWtjgONK9MwIBCOrUJKeE=
github.com/tidwall/match v1.0.3/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.0.2 h1:Z7S3cePv9Jwm1KwS0513MRaoUe3S01WPbLNV40pwWZU=
github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/sjson v1.1.5 h1:wsUceI/XDyZk3J1FUvuuYlK62zJv2HO2Pzb8A5EWdUE=
github.com/tidwall/sjson v1.1.5/go.mod h1:VuJzsZnTowhSxWdOgsAnb886i4AjEyTkk7tNtsL7EYE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down Expand Up @@ -518,8 +524,6 @@ github.com/yudai/pp v2.0.1+incompatible h1:Q4//iY4pNF6yPLZIigmvcl7k/bPgrcTPIFIcm
github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZkTdatxwunjIkc=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20190830150955-898bd1351fcf h1:of2pArXp+6ARCcJmrbkmach6gLs0y6tDvEo/GK2q7Qk=
go.etcd.io/etcd v0.0.0-20190830150955-898bd1351fcf/go.mod h1:zkTjKtRNVCiLpZfDPZhuB3hSBK2PNzSSagFy7qRzsLU=
go.etcd.io/etcd v0.0.0-20201125193152-8a03d2e9614b h1:5makfKENOTVu2bNoHzSqwwz+g70ivWLSnExzd33/2bI=
go.etcd.io/etcd v0.0.0-20201125193152-8a03d2e9614b/go.mod h1:yVHk9ub3CSBatqGNg7GRmsnfLWtoW60w4eDYfh7vHDg=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
Expand Down Expand Up @@ -719,6 +723,8 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *Server) _plusOneVersion() int64 {
return version
}

func (s *Server) _getObject(name string) supervisor.Spec {
func (s *Server) _getObject(name string) supervisor.ObjectSpec {
value, err := s.cluster.Get(s.cluster.Layout().ConfigObjectKey(name))
if err != nil {
clusterPanic(err)
Expand All @@ -66,13 +66,13 @@ func (s *Server) _getObject(name string) supervisor.Spec {
return spec
}

func (s *Server) _listObjects() []supervisor.Spec {
func (s *Server) _listObjects() []supervisor.ObjectSpec {
kvs, err := s.cluster.GetPrefix(s.cluster.Layout().ConfigObjectPrefix())
if err != nil {
clusterPanic(err)
}

specs := make([]supervisor.Spec, 0, len(kvs))
specs := make([]supervisor.ObjectSpec, 0, len(kvs))
for _, v := range kvs {
spec, err := supervisor.SpecFromYAML(v)
if err != nil {
Expand All @@ -84,7 +84,7 @@ func (s *Server) _listObjects() []supervisor.Spec {
return specs
}

func (s *Server) _putObject(spec supervisor.Spec) {
func (s *Server) _putObject(spec supervisor.ObjectSpec) {
err := s.cluster.Put(s.cluster.Layout().ConfigObjectKey(spec.GetName()),
supervisor.YAMLFromSpec(spec))
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *Server) setupObjectAPIs() {
s.apis = append(s.apis, objAPIs...)
}

func (s *Server) readObjectSpec(ctx iris.Context) (supervisor.Spec, error) {
func (s *Server) readObjectSpec(ctx iris.Context) (supervisor.ObjectSpec, error) {
body, err := ioutil.ReadAll(ctx.Request().Body)
if err != nil {
return nil, fmt.Errorf("read body failed: %v", err)
Expand Down Expand Up @@ -241,7 +241,7 @@ func (s *Server) listStatusObjects(ctx iris.Context) {
ctx.Write(buff)
}

type specsToSort []supervisor.Spec
type specsToSort []supervisor.ObjectSpec

func (s specsToSort) Less(i, j int) bool { return s[i].GetName() < s[j].GetName() }
func (s specsToSort) Len() int { return len(s) }
Expand Down
13 changes: 10 additions & 3 deletions pkg/filter/apiaggregator/apiaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/megaease/easegateway/pkg/context"
"github.com/megaease/easegateway/pkg/logger"
"github.com/megaease/easegateway/pkg/object/httppipeline"
"github.com/megaease/easegateway/pkg/object/httpserver"
"github.com/megaease/easegateway/pkg/supervisor"
"github.com/megaease/easegateway/pkg/tracing"
"github.com/megaease/easegateway/pkg/util/httpheader"
Expand Down Expand Up @@ -146,11 +147,17 @@ func (aa *APIAggregator) Handle(ctx context.HTTPContext) (result string) {
httpResps[i] = nil
return
}
err = supervisor.Global.SendHTTPRequet(name, copyCtx)
if err != nil {
ro, exists := supervisor.Global.GetRunningObject(name, supervisor.CategoryPipeline)
if !exists {
httpResps[i] = nil
} else {
httpResps[i] = copyCtx.Response()
handler, ok := ro.Instance().(httpserver.HTTPHandler)
if !ok {
httpResps[i] = nil
} else {
handler.Handle(copyCtx)
httpResps[i] = copyCtx.Response()
}
}

}(i, proxy.HTTPProxyName, req)
Expand Down
44 changes: 27 additions & 17 deletions pkg/filter/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/megaease/easegateway/pkg/context"
"github.com/megaease/easegateway/pkg/logger"
"github.com/megaease/easegateway/pkg/object/httppipeline"
"github.com/megaease/easegateway/pkg/object/httpserver"
"github.com/megaease/easegateway/pkg/supervisor"
)

Expand Down Expand Up @@ -73,31 +74,40 @@ func (m *Bridge) Handle(ctx context.HTTPContext) (result string) {

r := ctx.Request()
dest := r.Header().Get(bridgeDestHeader)
found := false
if dest == "" {
logger.Warnf("dest not defined, will choose the first dest: %s", m.spec.Destinations[0])
err := supervisor.Global.SendHTTPRequet(m.spec.Destinations[0], ctx)
if err != nil {
logger.Errorf("failed to invoke %s", m.spec.Destinations[0])
return invokeDestFailed
dest = m.spec.Destinations[0]
found = true
} else {
for _, d := range m.spec.Destinations {
if d == dest {
r.Header().Del(bridgeDestHeader)
found = true
break
}
}
}

return ""
if !found {
logger.Errorf("dest not found: %s", dest)
return destNotFound
}

for _, d := range m.spec.Destinations {
if d == dest {
r.Header().Del(bridgeDestHeader)
err := supervisor.Global.SendHTTPRequet(d, ctx)
if err != nil {
logger.Errorf("failed to invoke %s", m.spec.Destinations[0])
return invokeDestFailed
}
return ""
}
ro, exists := supervisor.Global.GetRunningObject(dest, supervisor.CategoryPipeline)
if !exists {
logger.Errorf("failed invok %s", m.spec.Destinations[0])
return invokeDestFailed
}

handler, ok := ro.Instance().(httpserver.HTTPHandler)
if !ok {
logger.Errorf("%s is not a handler", m.spec.Destinations[0])
return invokeDestFailed
}

logger.Errorf("dest not found: %s", dest)
return destNotFound
handler.Handle(ctx)
return ""
}

// Status returns status.
Expand Down
78 changes: 48 additions & 30 deletions pkg/object/easemonitormetrics/easemonitormetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/megaease/easegateway/pkg/logger"
"github.com/megaease/easegateway/pkg/object/httppipeline"
"github.com/megaease/easegateway/pkg/object/httpserver"
"github.com/megaease/easegateway/pkg/object/statussynccontroller"
"github.com/megaease/easegateway/pkg/option"
"github.com/megaease/easegateway/pkg/supervisor"
"github.com/megaease/easegateway/pkg/util/httpstat"
Expand All @@ -29,12 +30,7 @@ var (
)

func init() {
supervisor.Register(&supervisor.ObjectRecord{
Kind: Kind,
DefaultSpecFunc: DefaultSpec,
NewFunc: New,
DependObjectKinds: nil,
})
supervisor.Register(&EaseMonitorMetrics{})

hostIPv4 = getHostIPv4()
if hostIPv4 == "" {
Expand All @@ -47,6 +43,8 @@ type (
EaseMonitorMetrics struct {
spec *Spec

ssc *statussynccontroller.StatusSyncController

// sarama.AsyncProducer
client atomic.Value
clientMutex sync.Mutex
Expand All @@ -58,7 +56,7 @@ type (

// Spec describes the EaseMonitorMetrics.
Spec struct {
supervisor.ObjectMeta `yaml:",inline"`
supervisor.ObjectMetaSpec `yaml:",inline"`

Kafka *KafkaSpec `yaml:"kafka" jsonschema:"required"`
}
Expand All @@ -71,8 +69,7 @@ type (

// Status is the status of EaseMonitorMetrics.
Status struct {
Timestamp int64 `json:"timestamp"`
Health string `json:"health"`
Health string `json:"health"`
}

// GlobalFields is the global fieilds of EaseMonitor metrics.
Expand Down Expand Up @@ -131,38 +128,49 @@ type (
}
)

// DefaultSpec returns EaseMonitorMetrics default spec.
func DefaultSpec() *Spec {
// Category returns the category of EaseMonitorMetrics.
func (emm *EaseMonitorMetrics) Category() supervisor.ObjectCategory {
return supervisor.CategoryBusinessController
}

// Kind returns the kind of EaseMonitorMetrics.
func (emm *EaseMonitorMetrics) Kind() string {
return "EaseMonitorMetrics"
}

// DefaultSpec returns the default spec of EaseMonitorMetrics.
func (emm *EaseMonitorMetrics) DefaultSpec() supervisor.ObjectSpec {
return &Spec{
Kafka: &KafkaSpec{
Brokers: []string{"localhost:9092"},
},
}
}

// Validate validates Spec.
func (spec Spec) Validate() error {
return nil
}
// Renew renews EaseMonitorMetrics.
func (emm *EaseMonitorMetrics) Renew(spec supervisor.ObjectSpec,
previousGeneration supervisor.Object, super *supervisor.Supervisor) {

// New creates an EaseMonitorMetrics.
func New(spec *Spec, prev *EaseMonitorMetrics, handlers *sync.Map) *EaseMonitorMetrics {
emm := &EaseMonitorMetrics{
spec: spec,
done: make(chan struct{}),
if previousGeneration != nil {
previousGeneration.Close()
}
if prev != nil {
prev.Close()

ssc, exists := super.GetRunningObject((&statussynccontroller.StatusSyncController{}).Kind(),
supervisor.CategorySystemController)
if !exists {
logger.Errorf("BUG: status sync controller not found")
}

emm.ssc = ssc.Instance().(*statussynccontroller.StatusSyncController)
emm.spec = spec.(*Spec)
emm.done = make(chan struct{})

_, err := emm.getClient()
if err != nil {
logger.Errorf("%s get kafka producer client failed: %v", spec.Name, err)
logger.Errorf("%s get kafka producer client failed: %v", emm.spec.Name, err)
}

go emm.run()

return emm
}

func (emm *EaseMonitorMetrics) getClient() (sarama.AsyncProducer, error) {
Expand Down Expand Up @@ -227,15 +235,15 @@ func (emm *EaseMonitorMetrics) run() {
select {
case <-emm.done:
return
case <-time.After(supervisor.SyncStatusPaceInUnixSeconds * time.Second):
case <-time.After(statussynccontroller.SyncStatusPaceInUnixSeconds * time.Second):
client, err := emm.getClient()
if err != nil {
logger.Errorf("%s get kafka producer failed: %v",
emm.spec.Name, err)
continue
}

records := supervisor.Global.GetStatusesRecords()
records := emm.ssc.GetStatusesRecords()
for _, record := range records {
if record.UnixTimestmp <= emm.latestTimestamp {
continue
Expand All @@ -254,7 +262,7 @@ func (emm *EaseMonitorMetrics) run() {
}
}

func (emm *EaseMonitorMetrics) record2Messages(record *supervisor.StatusesRecord) []*sarama.ProducerMessage {
func (emm *EaseMonitorMetrics) record2Messages(record *statussynccontroller.StatusesRecord) []*sarama.ProducerMessage {
reqMetrics := []*RequestMetrics{}
codeMetrics := []*StatusCodeMetrics{}

Expand All @@ -268,7 +276,13 @@ func (emm *EaseMonitorMetrics) record2Messages(record *supervisor.StatusesRecord
Service: objectName,
}

switch status := status.(type) {
globalStatus, ok := status.(*statussynccontroller.UniservalStatus)
if !ok {
logger.Errorf("BUG: %s want %T, got %T", emm.spec.Name,
&statussynccontroller.UniservalStatus{}, status)
}

switch status := globalStatus.ObjectStatus.(type) {
case *httppipeline.Status:
reqs, codes := emm.httpPipeline2Metrics(baseFields, status)
reqMetrics = append(reqMetrics, reqs...)
Expand Down Expand Up @@ -424,9 +438,13 @@ func (emm *EaseMonitorMetrics) httpStat2Metrics(baseFields *GlobalFields, s *htt
}

// Status returns status of EtcdServiceRegister.
func (emm *EaseMonitorMetrics) Status() *Status {
func (emm *EaseMonitorMetrics) Status() interface{} {
s := &Status{}

if emm.spec == nil {
return s
}

_, err := emm.getClient()
if err != nil {
s.Health = err.Error()
Expand Down

0 comments on commit b854282

Please sign in to comment.