Skip to content

Commit

Permalink
make httpserver not restart when maxConnections changed (#480)
Browse files Browse the repository at this point in the history
* make httpserver not restart when maxConnections changed

* update according to code review

* fix init httpserver
  • Loading branch information
qdongxu committed Mar 22, 2020
1 parent 05ac825 commit 12ea643
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 5 deletions.
74 changes: 74 additions & 0 deletions pkg/object/httpserver/listen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package httpserver

import (
"net"
"sync"

sem2 "github.com/megaease/easegateway/pkg/util/sem"
)

// LimitListener returns a Listener that accepts at most n simultaneous
// connections from the provided Listener.
func NewLimitListener(l net.Listener, n uint32) *LimitListener {
return &LimitListener{
Listener: l,
sem: sem2.NewSem(n),
done: make(chan struct{}),
}
}

type LimitListener struct {
net.Listener
sem *sem2.Semaphore
closeOnce sync.Once // ensures the done chan is only closed once
done chan struct{} // no values sent; closed when Close is called
}

// acquire acquires the limiting semaphore. Returns true if successfully
// accquired, false if the listener is closed and the semaphore is not
// acquired.
func (l *LimitListener) acquire() bool {
select {
case <-l.done:
return false
case <-l.sem.AcquireRaw():
return true
}
}
func (l *LimitListener) release() { l.sem.Release() }

func (l *LimitListener) Accept() (net.Conn, error) {
acquired := l.acquire()
// If the semaphore isn't acquired because the listener was closed, expect
// that this call to accept won't block, but immediately return an error.
c, err := l.Listener.Accept()
if err != nil {
if acquired {
l.release()
}
return nil, err
}
return &limitListenerConn{Conn: c, release: l.release}, nil
}

func (l *LimitListener) SetMaxConnection(n uint32) {
l.sem.SetMaxCount(n)
}

func (l *LimitListener) Close() error {
err := l.Listener.Close()
l.closeOnce.Do(func() { close(l.done) })
return err
}

type limitListenerConn struct {
net.Conn
releaseOnce sync.Once
release func()
}

func (l *limitListenerConn) Close() error {
err := l.Conn.Close()
l.releaseOnce.Do(l.release)
return err
}
17 changes: 12 additions & 5 deletions pkg/object/httpserver/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/megaease/easegateway/pkg/logger"
"github.com/megaease/easegateway/pkg/util/httpstat"
"github.com/megaease/easegateway/pkg/util/topn"

"golang.org/x/net/netutil"
)

const (
Expand Down Expand Up @@ -58,8 +56,9 @@ type (
state atomic.Value // stateType
err atomic.Value // error

httpStat *httpstat.HTTPStat
topN *topn.TopN
httpStat *httpstat.HTTPStat
topN *topn.TopN
limitListener *LimitListener
}

// Status contains all status gernerated by runtime, for displaying to users.
Expand Down Expand Up @@ -148,6 +147,11 @@ func (r *runtime) reload(nextSpec *Spec) {
r.mux.reloadRules(nextSpec)
}

// r.limitListener does not created just after the process started and the config load for the first time.
if nextSpec != nil && r.limitListener != nil {
r.limitListener.SetMaxConnection(nextSpec.MaxConnections)
}

// NOTE: Due to the mechanism of scheduler,
// nextSpec must not be nil, just defensive programming here.
switch {
Expand Down Expand Up @@ -202,6 +206,8 @@ func (r *runtime) needRestartServer(nextSpec *Spec) bool {
y := *nextSpec
x.Rules, y.Rules = nil, nil

x.MaxConnections, y.MaxConnections = 0, 0

// The update of rules need not to shutdown server.
return !reflect.DeepEqual(x, y)
}
Expand All @@ -226,7 +232,8 @@ func (r *runtime) startServer() {
return
}

limitListener := netutil.LimitListener(listener, int(r.spec.MaxConnections))
limitListener := NewLimitListener(listener, r.spec.MaxConnections)
r.limitListener = limitListener

srv := &http.Server{
Addr: fmt.Sprintf(":%d", r.spec.Port),
Expand Down
68 changes: 68 additions & 0 deletions pkg/util/sem/semaphore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package sem

import "sync"

const capacity = 2000000

type Semaphore struct {
sem uint32
lock *sync.Mutex
guardChan chan *struct{}
}

func NewSem(n uint32) *Semaphore {
s := &Semaphore{
sem: n,
lock: &sync.Mutex{},
guardChan: make(chan *struct{}, capacity),
}

go func() {
for i := uint32(0); i < n; i++ {
s.guardChan <- &struct{}{}
}
}()

return s
}

func (s *Semaphore) Acquire() {
<-s.guardChan
}

func (s *Semaphore) AcquireRaw() chan *struct{} {
return s.guardChan
}

func (s *Semaphore) Release() {
s.guardChan <- &struct{}{}
}

func (s *Semaphore) SetMaxCount(n uint32) {
s.lock.Lock()
defer s.lock.Unlock()

if n > capacity {
n = capacity
}

if n == s.sem {
return
}

old := s.sem
s.sem = n

go func() {
if n > old {
for i := uint32(0); i < n-old; i++ {
s.guardChan <- &struct{}{}
}
return
}

for i := uint32(0); i < old-n; i++ {
<-s.guardChan
}
}()
}
137 changes: 137 additions & 0 deletions pkg/util/sem/semaphore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package sem

import (
"math/rand"
"sync"
"testing"
"time"
)

type Case struct {
maxSem int
}

func TestSemaphore0(t *testing.T) {
s := NewSem(0)

c := make(chan struct{})
go func(s *Semaphore, c chan struct{}) {
s.Acquire()
c <- struct{}{}
}(s, c)

select {
case <-c:
t.Errorf("trans: 1, maxSem: 0")
case <-time.After(100 * time.Millisecond):
}
}
func TestSemaphoreRobust(t *testing.T) {
s := NewSem(10)
w := &sync.WaitGroup{}
w.Add(101)
// change maxSem randomly
go func() {
for i := 0; i < 500; i++ {
time.Sleep(1 * time.Millisecond)
s.SetMaxCount(uint32(rand.Intn(100000)))
}
w.Done()
}()

for x := 0; x < 100; x++ {
go func() {
for j := 0; j < 100; j++ {
s.Acquire()
time.Sleep(5 * time.Millisecond)
s.Release()
}
w.Done()
}()
}
w.Wait()

// confirm it still works
s.SetMaxCount(20)
time.Sleep(10 * time.Millisecond)

runCase(s, &Case{23}, t)

}

func TestSemaphoreN(t *testing.T) {
var s = NewSem(20)
Cases := []*Case{
{maxSem: 37},
{maxSem: 45},
{maxSem: 3},
{maxSem: 1000},
{maxSem: 235},
{maxSem: 800},
{maxSem: 587},
}

for _, c := range Cases {
runCase(s, c, t)
}
}

func BenchmarkSemaphore(b *testing.B) {
s := NewSem(uint32(b.N/2 + 1))
for i := 0; i < b.N; i++ {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
s.Acquire()
s.Release()
}
})
}
}
func runCase(s *Semaphore, c *Case, t *testing.T) {
s.SetMaxCount(uint32(c.maxSem))
time.Sleep(10 * time.Millisecond)

w := &sync.WaitGroup{}
w.Add(c.maxSem)
for i := 0; i < c.maxSem; i++ {
go func(w *sync.WaitGroup) {
s.Acquire()
defer s.Release()
time.Sleep(100 * time.Millisecond)
w.Done()
}(w)
}
begin := time.Now()
w.Wait()
d := time.Since(begin)
if d > 100*time.Millisecond+10*time.Millisecond {
t.Errorf("time too long: %v, sem: %d, trans: %d", d, c.maxSem, c.maxSem)
}
if d < 100*time.Millisecond-10*time.Millisecond {
t.Errorf("time too short: %v, sem: %d, trans: %d", d, c.maxSem, c.maxSem)
}

s.SetMaxCount(uint32(c.maxSem - 1))
time.Sleep(10 * time.Millisecond)

w = &sync.WaitGroup{}
w.Add(c.maxSem)
for i := 0; i < c.maxSem; i++ {
go func(w *sync.WaitGroup) {
s.Acquire()
defer s.Release()
time.Sleep(100 * time.Millisecond)
w.Done()
}(w)
}
begin = time.Now()
w.Wait()
d = time.Since(begin)
if d < 200*time.Millisecond-10*time.Millisecond {
t.Errorf("time too short: %v, sem: %d, trans: %d", d, c.maxSem-1, c.maxSem)
}

if d > 200*time.Millisecond+10*time.Millisecond {
t.Errorf("time too long: %v, sem: %d, trans: %d", d, c.maxSem-1, c.maxSem)
}
}

0 comments on commit 12ea643

Please sign in to comment.