Skip to content

Commit

Permalink
[mesh]: Keep registering spec & fix some style (#311)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxx7xxxx committed Oct 19, 2021
1 parent 514ba09 commit 96dd37d
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ doc/
example/
helm-charts/
pkg/
vendor/
vendor/
11 changes: 9 additions & 2 deletions example/config/mesh-controller-example.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
name: mesh-controller-example
kind: MeshController
name: easemesh-controller
apiPort: 13009
ingressPort: 19527
externalServiceRegistry:
heartbeatInterval: 5s
registryType: consul
externalServiceRegistry: nacos-service-registry-example
#security:
# mtlsMode: strict
# certProvider: selfSign
# rootCertTTL: 20h
# appCertTTL: 10h
2 changes: 1 addition & 1 deletion pkg/object/meshcontroller/layout/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
ingress = "/mesh/ingress/%s" // + ingressName
ingressPrefix = "/mesh/ingress/"

serviceInstanceCert = "/mesh/cert/service-cert/%s/%s" // + ServiceName + InstanceID
serviceInstanceCert = "/mesh/cert/service-cert/%s/%s" // +serviceName +instanceID
allServiceCertPrefix = "/mesh/cert/service-cert/"
rootCert = "/mesh/cert/root-cert"
ingressControllerInstanceCertKey = "/mesh/cert/ingress-controller-cert/%s"
Expand Down
5 changes: 3 additions & 2 deletions pkg/object/meshcontroller/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,14 @@ func (m *Master) cleanDeadInstances() {
if err != nil {
api.ClusterPanic(err)
} else {
logger.Infof("deleted speckey: %s", specKey)
logger.Infof("clean instance spec: %s", specKey)
}

statusKey := layout.ServiceInstanceStatusKey(_spec.ServiceName, _spec.InstanceID)
if err = m.store.Delete(statusKey); err != nil {
api.ClusterPanic(err)
} else {
logger.Infof("deleted statuskey: %s", statusKey)
logger.Infof("clean instance status: %s", statusKey)
}
}
}
Expand Down
81 changes: 44 additions & 37 deletions pkg/object/meshcontroller/registrycenter/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,51 +151,58 @@ func needUpdateRecord(originIns, ins *spec.ServiceInstanceSpec) bool {
}

func (rcs *Server) register(ins *spec.ServiceInstanceSpec, ingressReady ReadyFunc, egressReady ReadyFunc) {
var tryTimes int
routine := func() (err error) {
defer func() {
if err1 := recover(); err1 != nil {
logger.Errorf("registry center recover from: %v, stack trace:\n%s\n",
err, debug.Stack())
err = fmt.Errorf("%v", err1)
}
}()

if !ingressReady() || !egressReady() {
return fmt.Errorf("ingress ready: %v egress ready: %v", ingressReady(), egressReady())
}

if originIns := rcs.service.GetServiceInstanceSpec(rcs.serviceName, rcs.instanceID); originIns != nil {
if !needUpdateRecord(originIns, ins) {
rcs.mutex.Lock()
rcs.registered = true
rcs.mutex.Unlock()
return nil
}
}

ins.Status = spec.ServiceStatusUp
ins.RegistryTime = time.Now().Format(time.RFC3339)
rcs.service.PutServiceInstanceSpec(ins)

rcs.mutex.Lock()
rcs.registered = true
rcs.mutex.Unlock()

return nil
}

var tryTimes int
var firstSucceed bool
for {
select {
case <-rcs.done:
return
default:
rcs.mutex.Lock()
if rcs.registered {
rcs.mutex.Unlock()
return
}
// wrapper for the recover
routine := func() {
defer func() {
if err := recover(); err != nil {
logger.Errorf("registry center recover from: %v, stack trace:\n%s\n",
err, debug.Stack())
}
}()
// level triggered, loop until it success
tryTimes++
if !ingressReady() || !egressReady() {
logger.Infof("ingress ready: %v egress ready: %v", ingressReady(), egressReady())
return
tryTimes++

err := routine()
if err != nil {
logger.Errorf("register failed: %v", err)
time.Sleep(5 * time.Second)
} else {
if !firstSucceed {
logger.Infof("register instance spec succeed")
firstSucceed = true
}

if originIns := rcs.service.GetServiceInstanceSpec(rcs.serviceName, rcs.instanceID); originIns != nil {
logger.Infof("register in original ins: %#v, current ins: %#v", originIns, ins)
if !needUpdateRecord(originIns, ins) {
rcs.registered = true
return
}
}

ins.Status = spec.ServiceStatusUp
ins.RegistryTime = time.Now().Format(time.RFC3339)
rcs.registered = true
rcs.service.PutServiceInstanceSpec(ins)
logger.Infof("registry SUCC service: %s instanceID: %s registry try times: %d", ins.ServiceName, ins.InstanceID, tryTimes)
}

routine()
time.Sleep(1 * time.Second)
rcs.mutex.Unlock()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/object/meshcontroller/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ kind: HTTPServer
name: %s
port: %d
keepAlive: false
https: %s
https: %s
certBase64: %s
keyBase64: %s
caCertBase64: %s
Expand Down
7 changes: 5 additions & 2 deletions pkg/object/meshcontroller/worker/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ func NewEgressServer(superSpec *supervisor.Spec, super *supervisor.Supervisor,
panic(fmt.Errorf("BUG: want *TrafficController, got %T", entity.Instance()))
}

inf := informer.NewInformer(storage.New(superSpec.Name(), super.Cluster()), serviceName)

return &EgressServer{
super: super,
superSpec: superSpec,

inf: informer.NewInformer(storage.New(superSpec.Name(), super.Cluster()), serviceName),
inf: inf,
tc: tc,
namespace: fmt.Sprintf("%s/%s", superSpec.Name(), "egress"),
pipelines: make(map[string]*supervisor.ObjectEntity),
Expand Down Expand Up @@ -211,7 +213,7 @@ func (egs *EgressServer) reloadHTTPServer(specs map[string]*spec.Service) bool {
if admSpec.EnablemTLS() {
cert = egs.service.GetServiceInstanceCert(egs.serviceName, egs.instanceID)
rootCert = egs.service.GetRootCert()
logger.Infof("egress enable TLS, init pipeline with cert: %#v", cert)
logger.Infof("egress enable TLS")
}

pipelines := make(map[string]*supervisor.ObjectEntity)
Expand Down Expand Up @@ -300,6 +302,7 @@ func (egs *EgressServer) Close() {
defer egs.mutex.Unlock()

egs.inf.Close()

if egs._ready() {
egs.tc.DeleteHTTPServer(egs.namespace, egs.httpServer.Spec().Name())
for _, entity := range egs.pipelines {
Expand Down
5 changes: 4 additions & 1 deletion pkg/object/meshcontroller/worker/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func NewIngressServer(superSpec *supervisor.Spec, super *supervisor.Supervisor,
panic(fmt.Errorf("BUG: want *TrafficController, got %T", entity.Instance()))
}

inf := informer.NewInformer(storage.New(superSpec.Name(), super.Cluster()), serviceName)

return &IngressServer{
super: super,
superSpec: superSpec,
Expand All @@ -78,7 +80,7 @@ func NewIngressServer(superSpec *supervisor.Spec, super *supervisor.Supervisor,
httpServer: nil,
serviceName: serviceName,
instanceID: instaceID,
inf: informer.NewInformer(storage.New(superSpec.Name(), super.Cluster()), serviceName),
inf: inf,
mutex: sync.RWMutex{},
service: service,
}
Expand Down Expand Up @@ -229,6 +231,7 @@ func (ings *IngressServer) Close() {
defer ings.mutex.Unlock()

ings.inf.Close()

if ings._ready() {
ings.tc.DeleteHTTPServer(ings.namespace, ings.httpServer.Spec().Name())
for _, entity := range ings.pipelines {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/jmxtool/agent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (agent *AgentClient) UpdateService(newService *spec.Service, version int64)
if err != nil {
return fmt.Errorf("handleRequest error: %v", err)
}
logger.Infof("Update Service, URL: %s,request: %s, result: %v", url, string(bytes), string(bodyString))
logger.Debugf("update service: URL: %s request: %s result: %v", url, string(bytes), string(bodyString))
return err
}

Expand Down

0 comments on commit 96dd37d

Please sign in to comment.