Skip to content

Commit

Permalink
[pipeline] Prevents goroutine creation in pipeline main execution path
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiyanliu authored and Jack47 committed Dec 15, 2017
1 parent 7cf6ca8 commit 1c96639
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 47 deletions.
7 changes: 5 additions & 2 deletions src/engine/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (
type pipelineInstance struct {
instance pipelines_gw.Pipeline
stop chan struct{}
stopped chan struct{}
done chan struct{}
}

func newPipelineInstance(instance pipelines_gw.Pipeline) *pipelineInstance {
return &pipelineInstance{
instance: instance,
stop: make(chan struct{}),
stopped: make(chan struct{}),
done: make(chan struct{}),
}
}
Expand All @@ -49,13 +51,15 @@ loop:
}
}

close(pi.stopped)
pi.instance.Close()
close(pi.done)
}

func (pi *pipelineInstance) terminate(scheduled bool) chan struct{} {
pi.instance.Stop(scheduled)
close(pi.stop)
pi.instance.Stop(scheduled)
<-pi.stopped
return pi.done
}

Expand Down Expand Up @@ -343,7 +347,6 @@ func (scheduler *dynamicPipelineScheduler) spawn() {
scheduler.launchChan <- &inputEvent{
queueLength: queueLength,
} // without getterName and getter

case <-scheduler.spawnStop:
return
}
Expand Down
188 changes: 145 additions & 43 deletions src/model/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ func linearPipelineConfigConstructor() pipelines_gw.Config {
}
}

////

type statisticsData struct {
startAt, finishAt time.Time
successful bool
}

type pluginStatisticsData struct {
statisticsData
pluginName string
}

//
// Linear pipeline implementation
//
Expand All @@ -40,6 +52,10 @@ type linearPipeline struct {
started, stopped uint32
runningPluginName string
runningPluginGeneration uint64
pipelineAndTaskStatChan chan *statisticsData
pluginStatChan chan *pluginStatisticsData
statUpdaterStop chan struct{}
statUpdaterDone chan *struct{}
}

func newLinearPipeline(ctx pipelines.PipelineContext, statistics *PipelineStatistics,
Expand All @@ -62,14 +78,23 @@ func newLinearPipeline(ctx pipelines.PipelineContext, statistics *PipelineStatis
return nil, fmt.Errorf("model object is nil")
}

return &linearPipeline{
ctx: ctx,
conf: c,
statistics: statistics,
mod: m,
rerunCancel: NoOpCancelFunc,
scheduleCancel: NoOpCancelFunc,
}, nil
pipeline := &linearPipeline{
ctx: ctx,
conf: c,
statistics: statistics,
mod: m,
rerunCancel: NoOpCancelFunc,
scheduleCancel: NoOpCancelFunc,
pipelineAndTaskStatChan: make(chan *statisticsData, 10240),
pluginStatChan: make(chan *pluginStatisticsData, 10240),
statUpdaterStop: make(chan struct{}),
statUpdaterDone: make(chan *struct{}),
}

go pipeline.pipelineAndTaskStatUpdater()
go pipeline.pluginStatUpdater()

return pipeline, nil
}

func (p *linearPipeline) Name() string {
Expand Down Expand Up @@ -167,23 +192,12 @@ func (p *linearPipeline) Run() error {
tsk.finish(t)
}

if !preempted {
go func() {
err1 := p.statistics.updatePipelineExecution(time.Now().Sub(startAt))
if err1 != nil {
logger.Errorf("[pipeline %s updates execution statistics failed: %v]", p.Name(), err1)
}

if t.Error() == nil {
err1 = p.statistics.updateTaskExecution(pipelines.SuccessStatistics)
} else {
err1 = p.statistics.updateTaskExecution(pipelines.FailureStatistics)
}

if err1 != nil {
logger.Errorf("[pipeline %s updates task execution statistics failed: %v]", p.Name(), err1)
}
}()
if !preempted && atomic.LoadUint32(&p.stopped) == 0 {
p.pipelineAndTaskStatChan <- &statisticsData{
startAt: startAt,
finishAt: time.Now(),
successful: t.Error() == nil,
}
}

return nil
Expand All @@ -203,16 +217,30 @@ func (p *linearPipeline) Stop(scheduled bool) {
}

if scheduled {
defer func() {
recover() // to prevent p.scheduleCancel() raises any issue in case of concurrent update/call
}()
func() {
defer func() {
recover() // to prevent p.scheduleCancel() raises any issue in case of concurrent update/call
}()

p.scheduleCancel()
p.scheduleCancel()
}()
} else {
if p.stopCancel != nil {
p.stopCancel()
}
}

// notify both updaters stop
close(p.statUpdaterStop)

// wait both updaters done
<-p.statUpdaterDone
<-p.statUpdaterDone

// close channels
close(p.statUpdaterDone)
close(p.pipelineAndTaskStatChan)
close(p.pluginStatChan)
}

func (p *linearPipeline) runPlugin(instance plugins.Plugin, pluginType plugins.PluginType, gen uint64,
Expand Down Expand Up @@ -254,20 +282,15 @@ func (p *linearPipeline) runPlugin(instance plugins.Plugin, pluginType plugins.P
p.rerunCancel = NoOpCancelFunc
p.scheduleCancel = NoOpCancelFunc

if !rerun && !preempted {
go func() {
var kind pipelines.StatisticsKind = pipelines.AllStatistics
if err != nil || t.Error() != nil {
kind = pipelines.FailureStatistics
} else {
kind = pipelines.SuccessStatistics
}

err1 := p.statistics.updatePluginExecution(instance.Name(), kind, finishAt.Sub(startAt))
if err1 != nil {
logger.Errorf("[plugin %s updates execution statistics failed: %v]", instance.Name(), err1)
}
}()
if !rerun && !preempted && atomic.LoadUint32(&p.stopped) == 0 {
p.pluginStatChan <- &pluginStatisticsData{
statisticsData: statisticsData{
startAt: startAt,
finishAt: finishAt,
successful: err != nil || t.Error() != nil,
},
pluginName: instance.Name(),
}
}

if err != nil {
Expand Down Expand Up @@ -306,3 +329,82 @@ func (p *linearPipeline) cancelAndRerunRunningPlugin(updatedPlugin *Plugin,

p.rerunCancel()
}

func (p *linearPipeline) updatePipelineAndTaskStat(data *statisticsData) {
err := p.statistics.updatePipelineExecution(data.finishAt.Sub(data.startAt))
if err != nil {
logger.Errorf("[pipeline %s updates execution statistics failed: %v]", p.Name(), err)
}

if data.successful {
err = p.statistics.updateTaskExecution(pipelines.SuccessStatistics)
} else {
err = p.statistics.updateTaskExecution(pipelines.FailureStatistics)
}

if err != nil {
logger.Errorf("[pipeline %s updates task execution statistics failed: %v]", p.Name(), err)
}
}

func (p *linearPipeline) drainPipelineAndTaskStatData() {
for {
select {
case data := <-p.pipelineAndTaskStatChan:
p.updatePipelineAndTaskStat(data)
default:
return
}
}
}

func (p *linearPipeline) pipelineAndTaskStatUpdater() {
for {
select {
case data := <-p.pipelineAndTaskStatChan:
p.updatePipelineAndTaskStat(data)
case <-p.statUpdaterStop:
p.drainPipelineAndTaskStatData()
p.statUpdaterDone <- nil // notify done
return
}
}
}

func (p *linearPipeline) updatePluginStat(data *pluginStatisticsData) {
kind := pipelines.AllStatistics
if data.successful {
kind = pipelines.SuccessStatistics
} else {
kind = pipelines.FailureStatistics
}

err := p.statistics.updatePluginExecution(data.pluginName, kind, data.finishAt.Sub(data.startAt))
if err != nil {
logger.Errorf("[plugin %s updates execution statistics failed: %v]", data.pluginName, err)
}
}

func (p *linearPipeline) drainPluginStatData() {
for {
select {
case data := <-p.pluginStatChan:
p.updatePluginStat(data)
default:
return
}
}
}

func (p *linearPipeline) pluginStatUpdater() {
for {
select {
case data := <-p.pluginStatChan:
p.updatePluginStat(data)
case <-p.statUpdaterStop:
p.drainPluginStatData()
p.statUpdaterDone <- nil // notify done
return
}
}
}
6 changes: 4 additions & 2 deletions src/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"os/signal"
"runtime"
"runtime/debug"
"runtime/pprof"
"syscall"

Expand Down Expand Up @@ -185,9 +186,10 @@ func setupHeapDumpSignalHandler() {
logger.Debugf("[memory profiling started, heap dumps to %s]",
option.MemProfileFile)

// get up-to-date statistics
logger.Infof("[full gc is executing for heap dump, this may block the entire program]")
runtime.GC()

runtime.GC() // get up-to-date statistics
debug.FreeOSMemory() // help developer when using outside monitor tool

pprof.WriteHeapProfile(f)

Expand Down

0 comments on commit 1c96639

Please sign in to comment.