Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dynamic admin API && correct syncer && make interface cleaner #96

Merged
merged 1 commit into from
Jul 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ build_server:
CGO_ENABLED=0 go build -v -trimpath -ldflags ${GO_LD_FLAGS} \
-o ${TARGET_SERVER} ${MKFILE_DIR}cmd/server

dev_build: dev_build_client dev_build_server

dev_build_client:
@echo "build dev client"
cd ${MKFILE_DIR} && \
Expand Down
8 changes: 3 additions & 5 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,10 @@ func main() {
os.Exit(1)
}

apiServer := api.MustNewServer(opt, cls)

// NOTE: Supervisor needs to be created after API server.
// Because the objects created by supervisor could use global API server.
super := supervisor.MustNew(opt, cls)

apiServer := api.MustNewServer(opt, cls, super)

if graceupdate.CallOriProcessTerm(super.FirstHandleDone()) {
pidfile.Write(opt)
}
Expand All @@ -99,7 +97,7 @@ func main() {
}
restartCls := func() {
cls.StartServer()
apiServer = api.MustNewServer(opt, cls)
apiServer = api.MustNewServer(opt, cls, super)
}
graceupdate.NotifySigUsr2(closeCls, restartCls)

Expand Down
20 changes: 10 additions & 10 deletions example/sbin/status.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ cfgfile=$APPDIR/conf/config.yaml
option=$1
if [ "$option" = "-l" -o "$option" = "--long" ]
then
./egctl.sh member list
./egctl.sh member list
exit $?
fi

Expand All @@ -19,13 +19,13 @@ NC='\033[0m' # No Color


{
echo "Cluster Member Role ETCD Status LocalPeer Client API"
echo "Cluster Member Role Etcd Status LocalPeer Client API"
{
./egctl.sh member list \
| egrep 'cluster-name:|\bname\b:|cluster-role:|lastHeartbeatTime:|peer|client|api-addr|\bstate\b:' \
| while read line
do
if `echo $line | grep -q "cluster-name:"`
if `echo $line | grep -q "cluster-name:"`
then
cluster=`echo $line | awk '{print $2}'`
elif `echo $line | grep -q '^[ \t]*name:'`
Expand Down Expand Up @@ -62,14 +62,14 @@ NC='\033[0m' # No Color
if [ -n "$cluster" ] && [ -n "$name" ] && [ -n "$role" ] && [ -n "$status" ] \
&& [ -n "$peer" ] && [ -n "$client" ] && [ -n $api ]
then
if [ $role = "writer" ]
if [ $role = "writer" ]
then
if [ ! -n "$etcd" ]
then
continue
else
else
if [ $status = "offline" ]
then
then
etcd='Down'
fi
fi
Expand All @@ -86,12 +86,12 @@ NC='\033[0m' # No Color
unset etcd
fi
done
} | sort
} | column -t
} | sort
} | column -t

pid=`ps -eo pid,args | grep "$server" | grep -v grep | awk '{print $1}'`
if [ -n "$pid" ]
then
if [ -n "$pid" ]
then
alive="yes"
else
alive="no"
Expand Down
132 changes: 76 additions & 56 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ package api
import (
"fmt"
"net/http"
"sort"
"sync"
"time"

"github.com/megaease/easegress/pkg/logger"
yaml "gopkg.in/yaml.v2"
)

Expand All @@ -41,74 +44,86 @@ const (
ConfigVersionKey = "X-Config-Version"
)

func (s *Server) setupAPIs() {
s.setupListAPIs()
s.setupMemberAPIs()
s.setupObjectAPIs()
s.setupMetadaAPIs()
s.setupHealthAPIs()
s.setupAboutAPIs()
var (
apisMutex = sync.Mutex{}
apis = make(map[string]*APIGroup)
apisChangeChan = make(chan struct{}, 10)
)

type apisbyOrder []*APIGroup

func (a apisbyOrder) Less(i, j int) bool { return a[i].Group < a[j].Group }
func (a apisbyOrder) Len() int { return len(a) }
func (a apisbyOrder) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// RegisterAPIs registers global admin APIs.
func RegisterAPIs(apiGroup *APIGroup) {
apisMutex.Lock()
defer apisMutex.Unlock()

_, exists := apis[apiGroup.Group]
if exists {
logger.Errorf("group %s existed", apiGroup.Group)
}
apis[apiGroup.Group] = apiGroup

logger.Infof("register api group %s", apiGroup.Group)
apisChangeChan <- struct{}{}
}

func UnregisterAPIs(group string) {
apisMutex.Lock()
defer apisMutex.Unlock()

_, exists := apis[group]
if !exists {
logger.Errorf("group %s not found", group)
return
}

delete(apis, group)

logger.Infof("unregister api group %s", group)
apisChangeChan <- struct{}{}
}

func (s *Server) setupListAPIs() {
listAPIs := []*APIEntry{
func (s *Server) registerAPIs() {
group := &APIGroup{
Group: "admin",
}
group.Entries = append(group.Entries, s.listAPIEntries()...)
group.Entries = append(group.Entries, s.memberAPIEntries()...)
group.Entries = append(group.Entries, s.objectAPIEntries()...)
group.Entries = append(group.Entries, s.metadataAPIEntries()...)
group.Entries = append(group.Entries, s.healthAPIEntries()...)
group.Entries = append(group.Entries, s.aboutAPIEntries()...)

RegisterAPIs(group)
}

func (s *Server) listAPIEntries() []*APIEntry {
return []*APIEntry{
{
Path: "",
Method: "GET",
Handler: s.listAPIs,
},
}

s.RegisterAPIs(listAPIs)
}

// RegisterAPIs registers APIs.
func (s *Server) RegisterAPIs(apis []*APIEntry) {
s.apisMutex.Lock()
defer s.apisMutex.Unlock()

s.apis = append(s.apis, apis...)

for _, api := range apis {
api.Path = APIPrefix + api.Path
switch api.Method {
case "GET":
s.router.Get(api.Path, api.Handler)
case "HEAD":
s.router.Head(api.Path, api.Handler)
case "PUT":
s.router.Put(api.Path, api.Handler)
case "POST":
s.router.Post(api.Path, api.Handler)
case "PATCH":
s.router.Patch(api.Path, api.Handler)
case "DELETE":
s.router.Delete(api.Path, api.Handler)
case "CONNECT":
s.router.Connect(api.Path, api.Handler)
case "OPTIONS":
s.router.Options(api.Path, api.Handler)
case "TRACE":
s.router.Trace(api.Path, api.Handler)
}
}
}

func (s *Server) setupHealthAPIs() {
healthAPIs := []*APIEntry{
func (s *Server) healthAPIEntries() []*APIEntry {
return []*APIEntry{
{
// https://stackoverflow.com/a/43381061/1705845
Path: "/healthz",
Method: "GET",
Handler: func(w http.ResponseWriter, r *http.Request) { /* 200 by default */ },
},
}

s.RegisterAPIs(healthAPIs)
}

func (s *Server) setupAboutAPIs() {
aboutAPIs := []*APIEntry{
func (s *Server) aboutAPIEntries() []*APIEntry {
return []*APIEntry{
{
Path: "/about",
Method: "GET",
Expand All @@ -118,19 +133,24 @@ func (s *Server) setupAboutAPIs() {
},
},
}

s.RegisterAPIs(aboutAPIs)
}

func (s *Server) listAPIs(w http.ResponseWriter, r *http.Request) {
s.apisMutex.RLock()
defer s.apisMutex.RUnlock()
apisMutex.Lock()
defer apisMutex.Unlock()

apiGroups := []*APIGroup{}

for _, group := range apis {
apiGroups = append(apiGroups, group)
}

sort.Sort(apisbyOrder(apiGroups))

buff, err := yaml.Marshal(s.apis)
buff, err := yaml.Marshal(apiGroups)
if err != nil {
panic(fmt.Errorf("marshal %#v to yaml failed: %v", s.apis, err))
panic(fmt.Errorf("marshal %#v to yaml failed: %v", apiGroups, err))
}
w.Header().Set("Content-Type", "text/vnd.yaml")
w.Write(buff)
return
}
4 changes: 2 additions & 2 deletions pkg/api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *Server) _getObject(name string) *supervisor.Spec {
return nil
}

spec, err := supervisor.NewSpec(*value)
spec, err := s.super.NewSpec(*value)
if err != nil {
panic(fmt.Errorf("bad spec(err: %v) from yaml: %s", err, *value))
}
Expand All @@ -91,7 +91,7 @@ func (s *Server) _listObjects() []*supervisor.Spec {

specs := make([]*supervisor.Spec, 0, len(kvs))
for _, v := range kvs {
spec, err := supervisor.NewSpec(v)
spec, err := s.super.NewSpec(v)
if err != nil {
panic(fmt.Errorf("bad spec(err: %v) from yaml: %s", err, v))
}
Expand Down
102 changes: 102 additions & 0 deletions pkg/api/dynamicmux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package api

import (
"net/http"
"sort"
"sync/atomic"

"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/megaease/easegress/pkg/logger"
)

type (
dynamicMux struct {
server *Server
done chan struct{}
router atomic.Value
}
)

func newDynamicMux(server *Server) *dynamicMux {
m := &dynamicMux{
server: server,
done: make(chan struct{}),
}

m.router.Store(chi.NewRouter())

go m.run()

return m
}

func (m *dynamicMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.router.Load().(*chi.Mux).ServeHTTP(w, r)
}

func (m *dynamicMux) run() {
for {
select {
case <-m.done:
return
case <-apisChangeChan:
m.reloadAPIs()
}
}
}

func (m *dynamicMux) reloadAPIs() {
apisMutex.Lock()
defer apisMutex.Unlock()

apiGroups := []*APIGroup{}

for _, group := range apis {
apiGroups = append(apiGroups, group)
}

sort.Sort(apisbyOrder(apiGroups))

router := chi.NewMux()
router.Use(middleware.StripSlashes)
router.Use(m.newAPILogger)
router.Use(m.newConfigVersionAttacher)
router.Use(m.newRecoverer)

for _, apiGroup := range apiGroups {
for _, api := range apiGroup.Entries {
path := APIPrefix + api.Path

switch api.Method {
case "GET":
router.Get(path, api.Handler)
case "HEAD":
router.Head(path, api.Handler)
case "PUT":
router.Put(path, api.Handler)
case "POST":
router.Post(path, api.Handler)
case "PATCH":
router.Patch(path, api.Handler)
case "DELETE":
router.Delete(path, api.Handler)
case "CONNECT":
router.Connect(path, api.Handler)
case "OPTIONS":
router.Options(path, api.Handler)
case "TRACE":
router.Trace(path, api.Handler)
default:
logger.Errorf("BUG: group %s unsupported method: %s",
apiGroup.Group, api.Method)
}
}
}

m.router.Store(router)
}

func (m *dynamicMux) close() {
close(m.done)
}
Loading