Skip to content

Commit

Permalink
[pipeline] Don't spawn too much pipeline instance in the instant of a…
Browse files Browse the repository at this point in the history
… heavy load impact
  • Loading branch information
zhiyanliu authored and Jack47 committed Dec 13, 2017
1 parent 580d05a commit 961ada2
Showing 1 changed file with 69 additions and 13 deletions.
82 changes: 69 additions & 13 deletions src/engine/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ func (scheduler *commonPipelineScheduler) StopPipeline() {
////

const (
SCHEDULER_DYNAMIC_SPAWN_MIN_INTERVAL_MS = 100
SCHEDULER_DYNAMIC_SPAWN_MIN_INTERVAL_MS = 200
SCHEDULER_DYNAMIC_FAST_SCALE_INTERVAL_MS = 1000
SCHEDULER_DYNAMIC_FAST_SCALE_RATIO = 1.5
SCHEDULER_DYNAMIC_FAST_SCALE_RATIO = 1.2
SCHEDULER_DYNAMIC_FAST_SCALE_MIN_COUNT = 5
)

Expand All @@ -185,6 +185,7 @@ type dynamicPipelineScheduler struct {
gettersLock sync.RWMutex
getters map[string]pipelines.SourceInputQueueLengthGetter
launchChan chan *inputEvent
spawnStop chan struct{}
shrinkStop chan struct{}
launchTimes map[string]time.Time
shrinkTimeLock sync.RWMutex
Expand All @@ -196,6 +197,7 @@ func newDynamicPipelineScheduler(pipeline *model.Pipeline) *dynamicPipelineSched
commonPipelineScheduler: newCommonPipelineScheduler(pipeline),
getters: make(map[string]pipelines.SourceInputQueueLengthGetter, 1),
launchChan: make(chan *inputEvent, 1024), // buffer for trigger() calls before scheduler starts
spawnStop: make(chan struct{}),
shrinkStop: make(chan struct{}),
launchTimes: make(map[string]time.Time, 1),
}
Expand All @@ -222,6 +224,7 @@ func (scheduler *dynamicPipelineScheduler) Start(ctx pipelines.PipelineContext,
logger.Debugf("[initialized pipeline instance(s) for pipeline %s (total=%d)]",
scheduler.PipelineName(), parallelism)

go scheduler.launch()
go scheduler.spawn()
go scheduler.shrink()
}
Expand All @@ -245,7 +248,7 @@ func (scheduler *dynamicPipelineScheduler) trigger(getterName string, getter pip
}
}

func (scheduler *dynamicPipelineScheduler) spawn() {
func (scheduler *dynamicPipelineScheduler) launch() {
for {
select {
case info := <-scheduler.launchChan:
Expand All @@ -254,19 +257,26 @@ func (scheduler *dynamicPipelineScheduler) spawn() {
}

now := time.Now()
lastScheduleAt := scheduler.launchTimes[info.getterName]

if now.Sub(lastScheduleAt).Seconds()*1000 < SCHEDULER_DYNAMIC_SPAWN_MIN_INTERVAL_MS {
// pipeline instance schedule needs time
continue
}
if info.getterName != "" && info.getter != nil { // calls from trigger()
lastScheduleAt := scheduler.launchTimes[info.getterName]

if now.Sub(lastScheduleAt).Seconds()*1000 < SCHEDULER_DYNAMIC_SPAWN_MIN_INTERVAL_MS {
// pipeline instance schedule needs time
continue
}

scheduler.launchTimes[info.getterName] = now
scheduler.launchTimes[info.getterName] = now

// book for shrink
scheduler.gettersLock.Lock()
scheduler.getters[info.getterName] = info.getter
scheduler.gettersLock.Unlock()
// book for spawn and shrink
scheduler.gettersLock.Lock()
scheduler.getters[info.getterName] = info.getter
scheduler.gettersLock.Unlock()
} else { // calls from spawn()
for getterName := range scheduler.launchTimes {
scheduler.launchTimes[getterName] = now
}
}

scheduler.shrinkTimeLock.RLock()

Expand Down Expand Up @@ -295,6 +305,51 @@ func (scheduler *dynamicPipelineScheduler) spawn() {
}
}

func (scheduler *dynamicPipelineScheduler) spawn() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
scheduler.instancesLock.RLock()

currentParallelism := uint32(len(scheduler.instances))

if currentParallelism == option.PipelineMaxParallelism {
scheduler.instancesLock.RUnlock()
continue // less than the cap of pipeline parallelism
}

scheduler.instancesLock.RUnlock()

scheduler.gettersLock.RLock()

var queueLength uint32
for _, getter := range scheduler.getters {
l := getter()
if l+queueLength > queueLength { // defense overflow
queueLength = l + queueLength
}
}

scheduler.gettersLock.RUnlock()

if queueLength == 0 {
// current parallelism is enough
continue // spawn only
}

scheduler.launchChan <- &inputEvent{
queueLength: queueLength,
} // without getterName and getter

case <-scheduler.spawnStop:
return
}
}
}

func (scheduler *dynamicPipelineScheduler) shrink() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -371,6 +426,7 @@ func (scheduler *dynamicPipelineScheduler) Stop() {
}

close(scheduler.launchChan)
close(scheduler.spawnStop)
close(scheduler.shrinkStop)

atomic.StoreUint32(&scheduler.started, 0)
Expand Down

0 comments on commit 961ada2

Please sign in to comment.