Skip to content

Commit

Permalink
[serviceregistry]: Complete first edition of service registry
Browse files Browse the repository at this point in the history
  • Loading branch information
xxx7xxxx committed Nov 8, 2020
1 parent c008783 commit 57e358b
Show file tree
Hide file tree
Showing 26 changed files with 1,464 additions and 82 deletions.
2 changes: 2 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func main() {
common.Exit(0, msg)
}

option.Global = opt

err = env.InitServerDir(opt)
if err != nil {
log.Printf("failed to init env: %v", err)
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module github.com/megaease/easegateway

require (
github.com/ArthurHlt/go-eureka-client v1.1.0
github.com/Joker/jade v1.0.0 // indirect
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398 // indirect
github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f // indirect
Expand All @@ -14,7 +15,7 @@ require (
github.com/facebookgo/freeport v0.0.0-20150612182905-d4adf43b75b9 // indirect
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870 // indirect
github.com/fatih/color v1.7.0
github.com/fatih/color v1.9.0
github.com/fatih/structs v1.1.0 // indirect
github.com/flosch/pongo2 v0.0.0-20180809100617-24195e6d38b0 // indirect
github.com/gavv/monotime v0.0.0-20171021193802-6f8212e8d10d // indirect
Expand All @@ -24,6 +25,7 @@ require (
github.com/google/uuid v1.1.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e // indirect
github.com/gorilla/websocket v1.4.0 // indirect
github.com/hashicorp/consul/api v1.7.0
github.com/hashicorp/golang-lru v0.5.1
github.com/imkira/go-interpol v1.1.0 // indirect
github.com/iris-contrib/blackfriday v2.0.0+incompatible // indirect
Expand Down Expand Up @@ -58,7 +60,6 @@ require (
github.com/ryanuber/columnize v2.1.0+incompatible // indirect
github.com/sergi/go-diff v1.0.0 // indirect
github.com/shurcooL/sanitized_anchor_name v0.0.0-20170918181015-86672fcb3f95 // indirect
github.com/sirupsen/logrus v1.3.0 // indirect
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d // indirect
github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c // indirect
github.com/sony/gobreaker v0.0.0-20181109014844-d928aaea92e1
Expand All @@ -80,7 +81,7 @@ require (
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c
google.golang.org/genproto v0.0.0-20190111180523-db91494dd46c // indirect
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
gopkg.in/yaml.v2 v2.2.2
gopkg.in/yaml.v2 v2.2.8
)

go 1.13
90 changes: 90 additions & 0 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (c *cluster) getClient() (*clientv3.Client, error) {
DialTimeout: dialTimeout,
DialKeepAliveTime: dialKeepAliveTime,
DialKeepAliveTimeout: dialKeepAliveTimeout,
LogConfig: logger.EtcdClientLoggerConfig(c.opt),
LogConfig: logger.EtcdClientLoggerConfig(c.opt, logger.EtcdClientFilename),
})

if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/context/httprequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ func (r *httpRequest) finish() {
// NOTE: We don't use this line in case of large flow attack.
// io.Copy(ioutil.Discard, r.std.Body)

r.std.Body.Close()
// NOTE: The server will do it for us.
// r.std.Body.Close()

r.body.Close()
}

func (r *httpRequest) Std() *http.Request {
Expand Down
6 changes: 3 additions & 3 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ var (
restAPILogger *zap.Logger
)

// EtcdClientLoggerConfig generates the config ofetcd client logger.
func EtcdClientLoggerConfig(opt *option.Options) *zap.Config {
// EtcdClientLoggerConfig generates the config of etcd client logger.
func EtcdClientLoggerConfig(opt *option.Options, filename string) *zap.Config {
encoderConfig := defaultEncoderConfig()

level := zap.NewAtomicLevel()
level.SetLevel(zapcore.DebugLevel)

outputPaths := []string{filepath.Join(opt.AbsLogDir, EtcdClientFilename)}
outputPaths := []string{filepath.Join(opt.AbsLogDir, filename)}

return &zap.Config{
Level: level,
Expand Down
3 changes: 2 additions & 1 deletion pkg/object/httppipeline/httppipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/megaease/easegateway/pkg/context"
"github.com/megaease/easegateway/pkg/logger"
"github.com/megaease/easegateway/pkg/object/httpserver"
"github.com/megaease/easegateway/pkg/object/serviceregistry/etcdserviceregistry"
"github.com/megaease/easegateway/pkg/scheduler"
"github.com/megaease/easegateway/pkg/util/stringtool"
"github.com/megaease/easegateway/pkg/v"
Expand All @@ -30,7 +31,7 @@ func init() {
Kind: Kind,
DefaultSpecFunc: DefaultSpec,
NewFunc: New,
DependObjectKinds: []string{httpserver.Kind},
DependObjectKinds: []string{httpserver.Kind, etcdserviceregistry.Kind},
})
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
package consulserviceregistry

import (
"sync"
"time"

"github.com/megaease/easegateway/pkg/logger"
"github.com/megaease/easegateway/pkg/object/serviceregistry"
"github.com/megaease/easegateway/pkg/scheduler"

"github.com/hashicorp/consul/api"
)

const (
// Kind is ConsulServiceRegistry kind.
Kind = "ConsulServiceRegistry"
)

func init() {
scheduler.Register(&scheduler.ObjectRecord{
Kind: Kind,
DefaultSpecFunc: DefaultSpec,
NewFunc: New,
DependObjectKinds: nil,
})
}

type (
// ConsulServiceRegistry is Object ConsulServiceRegistry.
ConsulServiceRegistry struct {
spec *Spec

clientMutex sync.RWMutex
client *api.Client

statusMutex sync.Mutex
serversNum map[string]int

done chan struct{}
}

// Spec describes the ConsulServiceRegistry.
Spec struct {
scheduler.ObjectMeta `yaml:",inline"`

Address string `yaml:"address" jsonschema:"required"`
Scheme string `yaml:"scheme" jsonschema:"omitempty,enum=http,enum=https"`
Datacenter string `yaml:"datacenter" jsonschema:"omitempty"`
Token string `yaml:"token" jsonschema:"omitempty"`
Namespace string `yaml:"namespace" jsonschema:"omitempty"`
SyncInterval string `yaml:"syncInterval" jsonschema:"required,format=duration"`
ServiceTags []string `yaml:"serviceTags" jsonschema:"omitempty"`
}

// Status is the status of ConsulServiceRegistry.
Status struct {
Timestamp int64 `yaml:"timestamp"`
Health string `yaml:"health"`
ServersNum map[string]int `yaml:"serversNum"`
}
)

// DefaultSpec returns ConsulServiceRegistry default spec.
func DefaultSpec() *Spec {
return &Spec{
Address: "127.0.0.1:8500",
Scheme: "http",
SyncInterval: "10s",
}
}

// Validate validates Spec.
func (spec Spec) Validate() error {
return nil
}

// New creates an ConsulServiceRegistry.
func New(spec *Spec, prev *ConsulServiceRegistry, handlers *sync.Map) *ConsulServiceRegistry {
csr := &ConsulServiceRegistry{
spec: spec,
serversNum: map[string]int{},
done: make(chan struct{}),
}
if prev != nil {
prev.Close()
}

_, err := csr.getClient()
if err != nil {
logger.Errorf("%s get consul client failed: %v", spec.Name, err)
}

go csr.run()

return csr
}

func (csr *ConsulServiceRegistry) getClient() (*api.Client, error) {
csr.clientMutex.RLock()
if csr.client != nil {
client := csr.client
csr.clientMutex.RUnlock()
return client, nil
}
csr.clientMutex.RUnlock()

return csr.buildClient()
}

func (csr *ConsulServiceRegistry) buildClient() (*api.Client, error) {
csr.clientMutex.Lock()
defer csr.clientMutex.Unlock()

// DCL
if csr.client != nil {
return csr.client, nil
}

config := api.DefaultConfig()
config.Address = csr.spec.Address
if config.Scheme != "" {
config.Scheme = csr.spec.Scheme
}
if config.Datacenter != "" {
config.Datacenter = csr.spec.Datacenter
}
if config.Token != "" {
config.Token = csr.spec.Token
}
if config.Namespace != "" {
config.Namespace = csr.spec.Namespace
}

client, err := api.NewClient(config)

if err != nil {
return nil, err
}

csr.client = client

return client, nil
}

func (csr *ConsulServiceRegistry) closeClient() {
csr.clientMutex.Lock()
defer csr.clientMutex.Unlock()

if csr.client == nil {
return
}

csr.client = nil
}

func (csr *ConsulServiceRegistry) run() {
syncInterval, err := time.ParseDuration(csr.spec.SyncInterval)
if err != nil {
logger.Errorf("BUG: parse duration %s failed: %v",
csr.spec.SyncInterval, err)
return
}

csr.update()

for {
select {
case <-csr.done:
return
case <-time.After(syncInterval):
csr.update()
}
}
}

func (csr *ConsulServiceRegistry) update() {
client, err := csr.getClient()
if err != nil {
logger.Errorf("%s get consul client failed: %v",
csr.spec.Name, err)
return
}

q := &api.QueryOptions{
Namespace: csr.spec.Namespace,
Datacenter: csr.spec.Datacenter,
}
catalog := client.Catalog()

resp, _, err := catalog.Services(q)
if err != nil {
logger.Errorf("%s pull catalog services failed: %v",
csr.spec.Name, err)
return
}

servers := []*serviceregistry.Server{}
serversNum := map[string]int{}
for serviceName := range resp {
services, _, err := catalog.ServiceMultipleTags(serviceName,
csr.spec.ServiceTags, q)
if err != nil {
logger.Errorf("%s pull catalog service %s failed: %v",
csr.spec.Name, serviceName, err)
continue
}
for _, service := range services {
server := &serviceregistry.Server{
ServiceName: serviceName,
}
server.HostIP = service.ServiceAddress
if server.HostIP == "" {
server.HostIP = service.Address
}
server.Port = int16(service.ServicePort)
server.Tags = service.ServiceTags

if err := server.Validate(); err != nil {
logger.Errorf("invalid server: %v", err)
continue
}

servers = append(servers, server)
serversNum[serviceName]++
}
}

serviceregistry.Global.ReplaceServers(csr.spec.Name, servers)

csr.statusMutex.Lock()
csr.serversNum = serversNum
csr.statusMutex.Unlock()
}

// Status returns status of ConsulServiceRegister.
func (csr *ConsulServiceRegistry) Status() *Status {
s := &Status{}

_, err := csr.getClient()
if err != nil {
s.Health = err.Error()
} else {
s.Health = "ready"
}

csr.statusMutex.Lock()
serversNum := csr.serversNum
csr.statusMutex.Unlock()

s.ServersNum = serversNum

return s
}

// Close closes ConsulServiceRegistry.
func (csr *ConsulServiceRegistry) Close() {
csr.closeClient()
close(csr.done)

serviceregistry.Global.CloseRegistry(csr.spec.Name)
}
Loading

0 comments on commit 57e358b

Please sign in to comment.