Skip to content

Commit

Permalink
Refactor processor pipeline dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Aug 15, 2018
1 parent 9c549ce commit 040ec79
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 67 deletions.
1 change: 1 addition & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ dockers:
- "v{{ .Major }}.{{ .Minor }}"
- latest
extra_files:
- .git
- cmd
- config
- Gopkg.lock
Expand Down
2 changes: 1 addition & 1 deletion config/test/smoke_in.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ output:
topic: benthos_topic
client_id: benthos_output
logger:
log_level: INFO
level: INFO
2 changes: 1 addition & 1 deletion config/test/smoke_out.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,4 @@ input:
output:
type: http_server
logger:
log_level: INFO
level: INFO
10 changes: 8 additions & 2 deletions lib/pipeline/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,14 @@ func (p *Pool) loop() {
}
}()
for {
t, open := <-w.TransactionChan()
if !open {
var t types.Transaction
var open bool
select {
case t, open = <-w.TransactionChan():
if !open {
return
}
case <-p.closeChan:
return
}
select {
Expand Down
191 changes: 132 additions & 59 deletions lib/pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ import (
// The processor will read from a source, perform some processing, and then
// either propagate a new message or drop it.
type Processor struct {
running int32
running int32
dispatchers int32

log log.Modular
stats metrics.Type
Expand All @@ -50,6 +51,9 @@ type Processor struct {

messagesIn <-chan types.Transaction

mSndSucc metrics.StatCounter
mSndErr metrics.StatCounter

closeChan chan struct{}
closed chan struct{}
}
Expand All @@ -62,9 +66,12 @@ func NewProcessor(
) *Processor {
return &Processor{
running: 1,
dispatchers: 1,
msgProcessors: msgProcessors,
log: log.NewModule(".pipeline.processor"),
stats: stats,
mSndSucc: stats.GetCounter("pipeline.processor.send.success"),
mSndErr: stats.GetCounter("pipeline.processor.send.error"),
messagesOut: make(chan types.Transaction),
responsesIn: make(chan types.Response),
closeChan: make(chan struct{}),
Expand All @@ -77,21 +84,17 @@ func NewProcessor(
// loop is the processing loop of this pipeline.
func (p *Processor) loop() {
defer func() {
atomic.StoreInt32(&p.running, 0)

close(p.messagesOut)
close(p.closed)
if atomic.AddInt32(&p.dispatchers, -1) == 0 {
close(p.messagesOut)
close(p.closed)
}
}()

var (
mProcCount = p.stats.GetCounter("pipeline.processor.count")
mProcDropped = p.stats.GetCounter("pipeline.processor.dropped")
mSndSucc = p.stats.GetCounter("pipeline.processor.send.success")
mSndErr = p.stats.GetCounter("pipeline.processor.send.error")
)

throt := throttle.New(throttle.OptCloseChan(p.closeChan))

var open bool
for atomic.LoadInt32(&p.running) == 1 {
var tran types.Transaction
Expand Down Expand Up @@ -127,72 +130,142 @@ func (p *Processor) loop() {
continue
}

var skipAcks int64
sendMsg := func(m types.Message) {
resChan := make(chan types.Response)
transac := types.NewTransaction(m, resChan)
if len(resultMsgs) > 1 {
p.dispatchMessages(resultMsgs, tran.ResponseChan)
} else {
p.dispatchMessage(resultMsgs[0], tran.ResponseChan)
}
}
}

for {
select {
case p.messagesOut <- transac:
case <-p.closeChan:
return
}
// dispatchMessage attempts to send a single message result of processors over
// the shared messages channel. This send is retried until success.
func (p *Processor) dispatchMessage(m types.Message, ogResChan chan<- types.Response) {
resChan := make(chan types.Response)
transac := types.NewTransaction(m, resChan)

var res types.Response
var open bool
select {
case res, open = <-resChan:
if !open {
return
}
case <-p.closeChan:
return
}
var res types.Response

if skipAck := res.SkipAck(); res.Error() == nil || skipAck {
if skipAck {
atomic.AddInt64(&skipAcks, 1)
}
mSndSucc.Incr(1)
return
}
mSndErr.Incr(1)
if !throt.Retry() {
select {
case p.messagesOut <- transac:
case <-p.closeChan:
return
}

atomic.AddInt32(&p.dispatchers, 1)
go func() {
defer func() {
if atomic.AddInt32(&p.dispatchers, -1) == 0 {
close(p.messagesOut)
close(p.closed)
}
}()

throt := throttle.New(throttle.OptCloseChan(p.closeChan))

sendLoop:
for {
var open bool
select {
case res, open = <-resChan:
if !open {
return
}
case <-p.closeChan:
return
}
}

if len(resultMsgs) > 1 {
wg := sync.WaitGroup{}
wg.Add(len(resultMsgs))

for _, msg := range resultMsgs {
go func(m types.Message) {
defer wg.Done()
sendMsg(m)
}(msg)
if res.Error() == nil {
p.mSndSucc.Incr(1)
break sendLoop
}

wg.Wait()
} else {
sendMsg(resultMsgs[0])
}
throt.Reset()
p.mSndErr.Incr(1)
if !throt.Retry() {
return
}

var res types.Response
if skipAcks == int64(len(resultMsgs)) {
res = response.NewUnack()
} else {
res = response.NewAck()
select {
case p.messagesOut <- transac:
case <-p.closeChan:
return
}
}

select {
case tran.ResponseChan <- res:
case ogResChan <- res:
case <-p.closeChan:
return
}
}()
}

// dispatchMessages attempts to send a multiple messages results of processors
// over the shared messages channel. This send is retried until success.
func (p *Processor) dispatchMessages(msgs []types.Message, ogResChan chan<- types.Response) {
throt := throttle.New(throttle.OptCloseChan(p.closeChan))

var skipAcks int64
sendMsg := func(m types.Message) {
resChan := make(chan types.Response)
transac := types.NewTransaction(m, resChan)

for {
select {
case p.messagesOut <- transac:
case <-p.closeChan:
return
}

var res types.Response
var open bool
select {
case res, open = <-resChan:
if !open {
return
}
case <-p.closeChan:
return
}

if skipAck := res.SkipAck(); res.Error() == nil || skipAck {
if skipAck {
atomic.AddInt64(&skipAcks, 1)
}
p.mSndSucc.Incr(1)
return
}
p.mSndErr.Incr(1)
if !throt.Retry() {
return
}
}
}

wg := sync.WaitGroup{}
wg.Add(len(msgs))

for _, msg := range msgs {
go func(m types.Message) {
defer wg.Done()
sendMsg(m)
}(msg)
}

wg.Wait()
throt.Reset()

var res types.Response
if skipAcks == int64(len(msgs)) {
res = response.NewUnack()
} else {
res = response.NewAck()
}

select {
case ogResChan <- res:
case <-p.closeChan:
return
}
}

Expand Down
2 changes: 1 addition & 1 deletion resources/docker/benchmark/benthos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ output:

logger:
prefix: service
log_level: INFO
level: INFO

metrics:
type: prometheus
Expand Down
2 changes: 1 addition & 1 deletion resources/docker/benchmark/extract_data.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ output:

logger:
prefix: service
log_level: INFO
level: INFO
2 changes: 1 addition & 1 deletion resources/docker/benchmark/inject_data.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ output:

logger:
prefix: service
log_level: INFO
level: INFO
2 changes: 1 addition & 1 deletion resources/docker/profiling/benthos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ output:

logger:
prefix: service
log_level: INFO
level: INFO

metrics:
type: prometheus
Expand Down

0 comments on commit 040ec79

Please sign in to comment.