Skip to content

Commit

Permalink
[mesh]: Complete canary and loadbalance apis
Browse files Browse the repository at this point in the history
  • Loading branch information
xxx7xxxx committed Mar 6, 2021
1 parent 0287f77 commit 77ce590
Show file tree
Hide file tree
Showing 16 changed files with 554 additions and 144 deletions.
10 changes: 5 additions & 5 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"syscall"

egapi "github.com/megaease/easegateway/pkg/api"
"github.com/megaease/easegateway/pkg/api"
"github.com/megaease/easegateway/pkg/cluster"
"github.com/megaease/easegateway/pkg/common"
"github.com/megaease/easegateway/pkg/env"
Expand Down Expand Up @@ -68,7 +68,7 @@ func main() {
}
super := supervisor.MustNew(opt, cls)
supervisor.InitGlobalSupervisor(super)
api := egapi.MustNewServer(opt, cls)
apiServer := api.MustNewServer(opt, cls)

if graceupdate.CallOriProcessTerm(super.FirstHandleDone()) {
pidfile.Write(opt)
Expand All @@ -77,13 +77,13 @@ func main() {
closeCls := func() {
wg := &sync.WaitGroup{}
wg.Add(2)
api.Close(wg)
apiServer.Close(wg)
cls.CloseServer(wg)
wg.Wait()
}
restartCls := func() {
cls.StartServer()
api = egapi.MustNewServer(opt, cls)
apiServer = api.MustNewServer(opt, cls)
}
graceupdate.NotifySigUsr2(closeCls, restartCls)

Expand All @@ -99,7 +99,7 @@ func main() {

wg := &sync.WaitGroup{}
wg.Add(4)
api.Close(wg)
apiServer.Close(wg)
super.Close(wg)
cls.Close(wg)
profile.Close(wg)
Expand Down
15 changes: 11 additions & 4 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ type (
}
)

var (
// GlobalServer is the global api server.
GlobalServer *Server
)

// MustNewServer creates an api server.
func MustNewServer(opt *option.Options, cluster cluster.Cluster) *Server {
app := iris.New()
Expand Down Expand Up @@ -87,6 +92,8 @@ func MustNewServer(opt *option.Options, cluster cluster.Cluster) *Server {
}
}()

GlobalServer = s

return s
}

Expand Down Expand Up @@ -215,24 +222,24 @@ func (s *Server) getMutex() (cluster.Mutex, error) {
func (s *Server) Lock() {
mutex, err := s.getMutex()
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}

err = mutex.Lock()
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}
}

// Unlock unlocks cluster operations.
func (s *Server) Unlock() {
mutex, err := s.getMutex()
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}

err = mutex.Unlock()
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}
}
22 changes: 11 additions & 11 deletions pkg/api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
func (s *Server) _purgeMember(memberName string) {
err := s.cluster.PurgeMember(memberName)
if err != nil {
clusterPanic(fmt.Errorf("purge member %s failed: %s", memberName, err))
ClusterPanic(fmt.Errorf("purge member %s failed: %s", memberName, err))
}
}

func (s *Server) _getVersion() int64 {
value, err := s.cluster.Get(s.cluster.Layout().ConfigVersion())
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}

if value == nil {
Expand All @@ -42,7 +42,7 @@ func (s *Server) _plusOneVersion() int64 {

err := s.cluster.Put(s.cluster.Layout().ConfigVersion(), value)
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}

return version
Expand All @@ -51,7 +51,7 @@ func (s *Server) _plusOneVersion() int64 {
func (s *Server) _getObject(name string) *supervisor.Spec {
value, err := s.cluster.Get(s.cluster.Layout().ConfigObjectKey(name))
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}

if value == nil {
Expand All @@ -69,7 +69,7 @@ func (s *Server) _getObject(name string) *supervisor.Spec {
func (s *Server) _listObjects() []*supervisor.Spec {
kvs, err := s.cluster.GetPrefix(s.cluster.Layout().ConfigObjectPrefix())
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}

specs := make([]*supervisor.Spec, 0, len(kvs))
Expand All @@ -88,22 +88,22 @@ func (s *Server) _putObject(spec *supervisor.Spec) {
err := s.cluster.Put(s.cluster.Layout().ConfigObjectKey(spec.Name()),
spec.YAMLConfig())
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}
}

func (s *Server) _deleteObject(name string) {
err := s.cluster.Delete(s.cluster.Layout().ConfigObjectKey(name))
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}
}

func (s *Server) _getStatusObject(name string) map[string]string {
prefix := s.cluster.Layout().StatusObjectPrefix(name)
kvs, err := s.cluster.GetPrefix(prefix)
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}

status := make(map[string]string)
Expand All @@ -119,7 +119,7 @@ func (s *Server) _listStatusObjects() map[string]map[string]interface{} {
prefix := s.cluster.Layout().StatusObjectsPrefix()
kvs, err := s.cluster.GetPrefix(prefix)
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}

status := make(map[string]map[string]interface{})
Expand All @@ -128,7 +128,7 @@ func (s *Server) _listStatusObjects() map[string]map[string]interface{} {

om := strings.Split(k, "/")
if len(om) != 2 {
clusterPanic(fmt.Errorf("the key %s can't be split into two fields by /", k))
ClusterPanic(fmt.Errorf("the key %s can't be split into two fields by /", k))
}
objectName, memberName := om[0], om[1]
_, exists := status[objectName]
Expand All @@ -140,7 +140,7 @@ func (s *Server) _listStatusObjects() map[string]map[string]interface{} {
i := map[string]interface{}{}
err = yaml.Unmarshal([]byte(v), &i)
if err != nil {
clusterPanic(fmt.Errorf("unmarshal %s to yaml failed: %v", v, err))
ClusterPanic(fmt.Errorf("unmarshal %s to yaml failed: %v", v, err))
}
status[objectName][memberName] = i
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/api/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ func (ce clusterErr) Error() string {
return string(ce)
}

func clusterPanic(err error) {
// ClusterPanic panics because of the cluster-level fault.
func ClusterPanic(err error) {
panic(clusterErr(err.Error()))
}

func handleAPIError(ctx iris.Context, code int, err error) {
// HandleAPIError handles api error.
func HandleAPIError(ctx iris.Context, code int, err error) {
ctx.StatusCode(code)
buff, err := yaml.Marshal(APIErr{
Code: code,
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (r ListMembersResp) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
func (s *Server) listMembers(ctx iris.Context) {
kv, err := s.cluster.GetPrefix(s.cluster.Layout().StatusMemberPrefix())
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}

resp := make(ListMembersResp, 0)
Expand Down Expand Up @@ -72,10 +72,10 @@ func (s *Server) purgeMember(ctx iris.Context) {

leaseStr, err := s.cluster.Get(s.cluster.Layout().OtherLease(memberName))
if err != nil {
clusterPanic(err)
ClusterPanic(err)
}
if leaseStr == nil {
handleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
HandleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
return
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/api/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *Server) getFilterDescription(ctx iris.Context) {

fm, exits := filterMetaBook[kind]
if !exits {
handleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
HandleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
return
}

Expand All @@ -106,7 +106,7 @@ func (s *Server) getFilterSchema(ctx iris.Context) {

fm, exits := filterMetaBook[kind]
if !exits {
handleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
HandleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
return
}

Expand All @@ -124,7 +124,7 @@ func (s *Server) getFilterResults(ctx iris.Context) {

fm, exits := filterMetaBook[kind]
if !exits {
handleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
HandleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
return
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/api/middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func newRecoverer() func(context.Context) {
logger.Errorf("recover from %s, err: %v, stack trace:\n%s\n",
ctx.HandlerName(), err, debug.Stack())
if ce, ok := err.(clusterErr); ok {
handleAPIError(ctx, http.StatusServiceUnavailable, ce)
HandleAPIError(ctx, http.StatusServiceUnavailable, ce)
} else {
handleAPIError(ctx, http.StatusInternalServerError, fmt.Errorf("%v", err))
HandleAPIError(ctx, http.StatusInternalServerError, fmt.Errorf("%v", err))
}
}
}()
Expand Down
16 changes: 8 additions & 8 deletions pkg/api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s *Server) upgradeConfigVersion(ctx iris.Context) {
func (s *Server) createObject(ctx iris.Context) {
spec, err := s.readObjectSpec(ctx)
if err != nil {
handleAPIError(ctx, iris.StatusBadRequest, err)
HandleAPIError(ctx, iris.StatusBadRequest, err)
return
}

Expand All @@ -112,7 +112,7 @@ func (s *Server) createObject(ctx iris.Context) {

existedSpec := s._getObject(name)
if existedSpec != nil {
handleAPIError(ctx, iris.StatusConflict, fmt.Errorf("conflict name: %s", name))
HandleAPIError(ctx, iris.StatusConflict, fmt.Errorf("conflict name: %s", name))
return
}

Expand All @@ -132,7 +132,7 @@ func (s *Server) deleteObject(ctx iris.Context) {

spec := s._getObject(name)
if spec == nil {
handleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
HandleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
return
}

Expand All @@ -147,7 +147,7 @@ func (s *Server) getObject(ctx iris.Context) {

spec := s._getObject(name)
if spec == nil {
handleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
HandleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
return
}

Expand All @@ -159,7 +159,7 @@ func (s *Server) getObject(ctx iris.Context) {
func (s *Server) updateObject(ctx iris.Context) {
spec, err := s.readObjectSpec(ctx)
if err != nil {
handleAPIError(ctx, iris.StatusBadRequest, err)
HandleAPIError(ctx, iris.StatusBadRequest, err)
return
}

Expand All @@ -170,12 +170,12 @@ func (s *Server) updateObject(ctx iris.Context) {

existedSpec := s._getObject(name)
if existedSpec == nil {
handleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
HandleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
return
}

if existedSpec.Kind() != spec.Kind() {
handleAPIError(ctx, iris.StatusBadRequest,
HandleAPIError(ctx, iris.StatusBadRequest,
fmt.Errorf("different kinds: %s, %s",
existedSpec.Kind(), spec.Kind()))
return
Expand Down Expand Up @@ -207,7 +207,7 @@ func (s *Server) getStatusObject(ctx iris.Context) {
spec := s._getObject(name)

if spec == nil {
handleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
HandleAPIError(ctx, iris.StatusNotFound, fmt.Errorf("not found"))
return
}

Expand Down
24 changes: 12 additions & 12 deletions pkg/object/meshcontroller/layout/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,32 @@ const (
tenantPrefix = "/mesh/tenants"
)

// GenServerKey generates storage serivce key
func GenServerKey(serviceName string) string {
// ServiceKey returns serivce key.
func ServiceKey(serviceName string) string {
return fmt.Sprintf(serviceSpecFormat, serviceName)
}

// GenServiceInstanceKey generates storage instance key
func GenServiceInstanceKey(serviceName, instanceID string) string {
// ServiceInstanceKey returns service instance key.
func ServiceInstanceKey(serviceName, instanceID string) string {
return fmt.Sprintf(serviceInstanceFormat, serviceName, instanceID)
}

// GenServiceInstancePrefix generates one serivce's storage instance prefix
func GenServiceInstancePrefix(serviceName string) string {
// ServiceInstancePrefix returns prefix of the serivce instances.
func ServiceInstancePrefix(serviceName string) string {
return fmt.Sprintf(serviceInstancePrefixFormat, serviceName)
}

// GenServiceHeartbeatKey generates storage instance hearbeat key
func GenServiceHeartbeatKey(serviceName, instanceID string) string {
// ServiceHeartbeatKey returns service instance hearbeat key.
func ServiceHeartbeatKey(serviceName, instanceID string) string {
return fmt.Sprintf(serviceInstanceFormat, serviceName, instanceID)
}

// GenTenantKey generates storage tenant key
func GenTenantKey(tenant string) string {
// TenantKey returns tenant key.
func TenantKey(tenant string) string {
return fmt.Sprintf(tenantFormat, tenant)
}

// GetTenantPrefix gets the tenant storage prefix
func GetTenantPrefix() string {
// TenantPrefix returns tenant prefix.
func TenantPrefix() string {
return tenantPrefix
}
Loading

0 comments on commit 77ce590

Please sign in to comment.