Skip to content

Commit

Permalink
[engine] Exits gateway normally if stop it during startup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiyanliu authored and Jack47 committed Dec 15, 2017
1 parent 961ada2 commit 7cf6ca8
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
15 changes: 14 additions & 1 deletion src/engine/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

type Gateway struct {
sync.Mutex
runLock sync.Mutex
repo config.Store
mod *model.Model
gc *cluster.GatewayCluster
Expand Down Expand Up @@ -69,16 +70,25 @@ func NewGateway() (*Gateway, error) {
repo: repo,
mod: mod,
gc: gc,
schedulers: make(map[string]pipelineScheduler, 100),
schedulers: make(map[string]pipelineScheduler),
done: make(chan error, 1),
}, nil
}

func (gw *Gateway) Close() {
gw.runLock.Lock()
defer gw.runLock.Unlock()

gw.Lock()
defer gw.Unlock()

close(gw.done)
}

func (gw *Gateway) Run() (<-chan error, error) {
gw.runLock.Lock()
defer gw.runLock.Unlock()

if !gw.startAt.IsZero() {
return nil, fmt.Errorf("gateway already started")
}
Expand All @@ -105,6 +115,9 @@ func (gw *Gateway) Run() (<-chan error, error) {
}

func (gw *Gateway) Stop() {
gw.runLock.Lock()
defer gw.runLock.Unlock()

gw.Lock()
defer gw.Unlock()

Expand Down
14 changes: 14 additions & 0 deletions src/rest/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"
"net/http"
"sync"
"time"

"github.com/ant0ine/go-json-rest/rest"
Expand Down Expand Up @@ -48,6 +49,7 @@ func (cam *clusterAvailabilityMiddleware) MiddlewareFunc(h rest.HandlerFunc) res
////

type Rest struct {
sync.Mutex
gateway *engine.Gateway
gc *gateway.GatewayCluster
server *http.Server
Expand All @@ -68,6 +70,9 @@ func NewRest(gateway *engine.Gateway) (*Rest, error) {
}

func (s *Rest) Start() (<-chan error, string, error) {
s.Lock()
defer s.Unlock()

listenAddr := fmt.Sprintf("%s:9090", option.RestHost)

adminServer, err := newAdminServer(s.gateway)
Expand Down Expand Up @@ -165,6 +170,7 @@ func (s *Rest) Start() (<-chan error, string, error) {
// server exits after closing channel, ignore safely
recover()
}()

err := s.server.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
if err != nil && !s.stopped {
s.done <- err
Expand All @@ -176,6 +182,9 @@ func (s *Rest) Start() (<-chan error, string, error) {
}

func (s *Rest) Stop() {
s.Lock()
defer s.Unlock()

s.stopped = true

if s.server != nil {
Expand All @@ -185,10 +194,15 @@ func (s *Rest) Stop() {
} else {
logger.Debugf("[rest interface is shut down gracefully]")
}
} else {
s.done <- nil
}
}

func (s *Rest) Close() {
s.Lock()
defer s.Unlock()

close(s.done)
}

Expand Down
5 changes: 3 additions & 2 deletions src/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,16 @@ func setupHeapDumpSignalHandler() {
}
defer f.Close()

logger.Debugf("[memory profiling started, heap dump to %s]",
logger.Debugf("[memory profiling started, heap dumps to %s]",
option.MemProfileFile)

// get up-to-date statistics
logger.Infof("[full gc is executing for heap dump, this may block the entire program]")
runtime.GC()

pprof.WriteHeapProfile(f)

logger.Infof("[memory profiling finished, heap dump to %s]",
logger.Infof("[memory profiling finished, heap dumps to %s]",
option.MemProfileFile)
}()
}
Expand Down

0 comments on commit 7cf6ca8

Please sign in to comment.