Skip to content

Commit

Permalink
init object pool (#940)
Browse files Browse the repository at this point in the history
* init object pool

* fix by bot adv

* fid and clean code

* cas

* fix: 1.remove pool base on sync.Mutex;2.expose fields of Spec;3.rename new pool func name

* use sync.Cond instead of spinlock

* remove unnecessary cond.Lock

* reactor code

* fix compile bug
  • Loading branch information
sodaRyCN committed Mar 16, 2023
1 parent 2ad418a commit f870f90
Show file tree
Hide file tree
Showing 2 changed files with 389 additions and 0 deletions.
215 changes: 215 additions & 0 deletions pkg/util/objectpool/objectpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Copyright (c) 2017, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package objectpool provides Pool of interface PoolObject
package objectpool

import (
"context"
"fmt"
"github.com/megaease/easegress/pkg/logger"
"sync"
)

// PoolObject is an interface that about definition of object that managed by pool
type PoolObject interface {
Destroy() // destroy the object
HealthCheck() bool // check the object is health or not
}

type (
// Pool manage the PoolObject
Pool struct {
initSize int // initial size
maxSize int // max size
size int // current size
new func() (PoolObject, error) // create a new object, it must return a health object or err
store chan PoolObject // store the object
cond *sync.Cond // when conditions are met, it wakes all goroutines waiting on sync.Cond
checkWhenGet bool // whether to health check when get PoolObject
checkWhenPut bool // whether to health check when put PoolObject
}

// Spec Pool's spec
Spec struct {
InitSize int // initial size
MaxSize int // max size
New func() (PoolObject, error) // create a new object
CheckWhenGet bool // whether to health check when get PoolObject
CheckWhenPut bool // whether to health check when put PoolObject
}
)

// New returns a new pool
func New(initSize, maxSize int, new func() (PoolObject, error)) *Pool {
return NewWithSpec(Spec{
InitSize: initSize,
MaxSize: maxSize,
New: new,
CheckWhenGet: true,
CheckWhenPut: true,
})
}

// NewWithSpec returns a new pool
func NewWithSpec(spec Spec) *Pool {
p := &Pool{
initSize: spec.InitSize,
maxSize: spec.MaxSize,
new: spec.New,
checkWhenPut: spec.CheckWhenPut,
checkWhenGet: spec.CheckWhenGet,
store: make(chan PoolObject, spec.MaxSize),
cond: sync.NewCond(&sync.Mutex{}),
}

for i := 0; i < p.initSize; i++ {
obj, err := p.new()
if err != nil {
logger.Errorf("create pool object failed: %v", err)
continue
}
p.size++
p.store <- obj
}

return p
}

// Validate validate
func (s *Spec) Validate() error {
if s.InitSize > s.MaxSize {
s.MaxSize = s.InitSize
}
if s.MaxSize <= 0 {
return fmt.Errorf("pool max size must be positive")
}
if s.InitSize < 0 {
return fmt.Errorf("pool init size must greate than or equals 0")
}

return nil
}

// The fast path, try get an object from the pool directly
func (p *Pool) fastGet() PoolObject {
select {
case obj := <-p.store:
return obj
default:
return nil
}
}

// The slow path, we need to wait for an object or create a new one.
func (p *Pool) slowGet(ctx context.Context) (PoolObject, error) {
// we need to watch ctx.Done in another goroutine, so that we can stop
// the slow path when the context is done.
// we also need to stop the watch when the slow path is done.
stop := make(chan struct{})
defer close(stop)

go func() {
select {
case <-ctx.Done():
p.cond.Broadcast()
case <-stop:
}
}()

p.cond.L.Lock()
defer p.cond.L.Unlock()

for {
select {
case <-ctx.Done():
return nil, ctx.Err()

case obj := <-p.store:
return obj, nil

default:
}

// try creating a new object
if p.size < p.maxSize {
if obj, err := p.new(); err == nil {
p.size++
return obj, nil
}
}

// the pool reaches its max size and there is no object available
p.cond.Wait()
}
}

// Get returns an object from the pool,
//
// if there's an available object, it will return it directly;
// if there's no free object, it will create a one if the pool is not full;
// if the pool is full, it will block until an object is returned to the pool.
func (p *Pool) Get(ctx context.Context) (PoolObject, error) {
for {
obj := p.fastGet()
if obj == nil {
var err error
obj, err = p.slowGet(ctx)
if err != nil {
return nil, err
}
}

if !p.checkWhenGet || obj.HealthCheck() {
return obj, nil
}

p.putUnhealthyObject(obj)
}
}

func (p *Pool) putUnhealthyObject(obj PoolObject) {
p.cond.L.Lock()
p.size--
p.cond.L.Unlock()

p.cond.Signal()
obj.Destroy()
}

// Put return the object to the pool
func (p *Pool) Put(obj PoolObject) {
if obj == nil {
panic("pool: put nil object")
}

if p.checkWhenPut && !obj.HealthCheck() {
p.putUnhealthyObject(obj)
return
}

p.store <- obj
p.cond.Signal()
}

// Close closes the pool and clean all the objects
func (p *Pool) Close() {
close(p.store)
for obj := range p.store {
obj.Destroy()
}
}
174 changes: 174 additions & 0 deletions pkg/util/objectpool/objectpool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright (c) 2017, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package objectpool

import (
"context"
"github.com/megaease/easegress/pkg/logger"
"github.com/stretchr/testify/assert"
"math/rand"
"os"
"sync"
"testing"
)

func TestMain(m *testing.M) {
logger.InitNop()
code := m.Run()
os.Exit(code)
}

func TestValidate(t *testing.T) {
assertions := assert.New(t)

spec := &Spec{
InitSize: 3,
MaxSize: 2,
}

err := spec.Validate()
assertions.Nil(err)
assertions.True(spec.MaxSize == spec.InitSize)

spec.MaxSize = 0
assertions.NoError(spec.Validate())

spec.InitSize, spec.MaxSize = 0, 0
assertions.Error(spec.Validate())
}

type fakeNormalPoolObject struct {
random bool
health bool
}

func (f *fakeNormalPoolObject) Destroy() {

}

func (f *fakeNormalPoolObject) HealthCheck() bool {
if !f.random {
return f.health
}
random := rand.Intn(10)
return random != 8
}

// fakeAlmostUnHealthPoolObject 80% unhealthy poolObject
type fakeAlmostUnHealthPoolObject struct {
}

func (f *fakeAlmostUnHealthPoolObject) Destroy() {

}

func (f *fakeAlmostUnHealthPoolObject) HealthCheck() bool {
random := rand.Intn(10)
return random >= 8
}

func TestNewSimplePool(t *testing.T) {
init, max := 2, 4
pool := New(init, max, func() (PoolObject, error) {
return &fakeNormalPoolObject{random: false, health: true}, nil
})

as := assert.New(t)
as.Equal(len(pool.store), init)
as.Equal(cap(pool.store), max)
}

func getAndPut(pool *Pool) {
iPoolObject, _ := pool.Get(context.Background())
if iPoolObject != nil {
pool.Put(iPoolObject)
}
}

func benchmarkWithIPoolObjectNumAndGoroutineNum(iPoolObjNum, goRoutineNum int, fake PoolObject, b *testing.B) {
pool := New(iPoolObjNum/2, iPoolObjNum, func() (PoolObject, error) {
return fake, nil
})
ch := make(chan struct{})
startedWait := sync.WaitGroup{}
startedWait.Add(goRoutineNum - 1)
for i := 0; i < goRoutineNum-1; i++ {
go func() {
done := false
for {
select {
case <-ch:
return
default:
if !done {
startedWait.Done()
done = true
}
getAndPut(pool)
}
}
}()
}
startedWait.Wait()
b.ResetTimer()
for i := 0; i < b.N; i++ {
getAndPut(pool)
}
b.StopTimer()
close(ch)
}

func BenchmarkWithoutRace(b *testing.B) {
benchmarkWithIPoolObjectNumAndGoroutineNum(1, 1, &fakeNormalPoolObject{random: true}, b)
}

func BenchmarkIPoolObjectEqualsGoroutine(b *testing.B) {
benchmarkWithIPoolObjectNumAndGoroutineNum(4, 4, &fakeNormalPoolObject{random: true}, b)
}

func BenchmarkGoroutine2TimesIPoolObject(b *testing.B) {
benchmarkWithIPoolObjectNumAndGoroutineNum(2, 4, &fakeNormalPoolObject{random: true}, b)
}

func BenchmarkGoroutine4TimesIPoolObject(b *testing.B) {
benchmarkWithIPoolObjectNumAndGoroutineNum(2, 8, &fakeNormalPoolObject{random: true}, b)
}

func BenchmarkGoroutine2TimesIPoolObjectWithAlmostUnHealthIPoolObject(b *testing.B) {
benchmarkWithIPoolObjectNumAndGoroutineNum(2, 4, &fakeAlmostUnHealthPoolObject{}, b)
}

func BenchmarkGoroutine4TimesIPoolObjectWithAlmostUnHealthIPoolObject(b *testing.B) {
benchmarkWithIPoolObjectNumAndGoroutineNum(2, 8, &fakeAlmostUnHealthPoolObject{}, b)
}

// fakeUnHealthPoolObject 100% unhealthy poolObject
type fakeUnHealthPoolObject struct {
}

func (f *fakeUnHealthPoolObject) Destroy() {

}

func (f *fakeUnHealthPoolObject) HealthCheck() bool {
return false
}

func BenchmarkGoroutine2TimesIPoolObjectWithUnhHealthyPool(b *testing.B) {
benchmarkWithIPoolObjectNumAndGoroutineNum(2, 4, &fakeUnHealthPoolObject{}, b)
}

0 comments on commit f870f90

Please sign in to comment.