Skip to content

Commit

Permalink
Tunes the param of dynamic scheduler to reduce the amount of pipeline…
Browse files Browse the repository at this point in the history
… instance
  • Loading branch information
zhiyanliu authored and Jack47 committed Dec 19, 2017
1 parent 419ddd9 commit c08c9ff
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions src/engine/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ func (scheduler *commonPipelineScheduler) StopPipeline() {
////

const (
SCHEDULER_DYNAMIC_SPAWN_MIN_INTERVAL_MS = 200
SCHEDULER_DYNAMIC_SPAWN_MIN_INTERVAL_MS = 500
SCHEDULER_DYNAMIC_SPAWN_MAX_IN_EACH = 500
SCHEDULER_DYNAMIC_FAST_SCALE_INTERVAL_MS = 1000
SCHEDULER_DYNAMIC_FAST_SCALE_RATIO = 1.2
SCHEDULER_DYNAMIC_FAST_SCALE_MIN_COUNT = 5
Expand Down Expand Up @@ -201,7 +202,7 @@ func newDynamicPipelineScheduler(pipeline *model.Pipeline) *dynamicPipelineSched
return &dynamicPipelineScheduler{
commonPipelineScheduler: newCommonPipelineScheduler(pipeline),
getters: make(map[string]pipelines.SourceInputQueueLengthGetter, 1),
launchChan: make(chan *inputEvent, 1024), // buffer for trigger() calls before scheduler starts
launchChan: make(chan *inputEvent, 128), // buffer for trigger() calls before scheduler starts
spawnStop: make(chan struct{}),
shrinkStop: make(chan struct{}),
launchTimes: make(map[string]time.Time, 1),
Expand Down Expand Up @@ -246,11 +247,16 @@ func (scheduler *dynamicPipelineScheduler) trigger(getterName string, getter pip
return
}

scheduler.launchChan <- &inputEvent{
event := &inputEvent{
getterName: getterName,
getter: getter,
queueLength: queueLength,
}

select {
case scheduler.launchChan <- event:
default: // skip if busy, spawn() routine will redress
}
}

func (scheduler *dynamicPipelineScheduler) launch() {
Expand Down Expand Up @@ -297,6 +303,10 @@ func (scheduler *dynamicPipelineScheduler) launch() {
}
}

if info.queueLength > SCHEDULER_DYNAMIC_SPAWN_MAX_IN_EACH {
info.queueLength = SCHEDULER_DYNAMIC_SPAWN_MAX_IN_EACH
}

scheduler.shrinkTimeLock.RUnlock()

parallelism, delta := scheduler.startPipeline(
Expand Down

0 comments on commit c08c9ff

Please sign in to comment.