Skip to content

Commit

Permalink
Make buffered channel to avoid goroutines leak in groupbytraceprocess…
Browse files Browse the repository at this point in the history
…or (#1505)

* Add license

* remove goleak

* Restore go.sum to the initial state
  • Loading branch information
pkositsyn committed Nov 9, 2020
1 parent de1b271 commit 648e110
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
12 changes: 11 additions & 1 deletion processor/groupbytraceprocessor/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,25 @@ func (em *eventMachine) shutdown() {
em.closed = true
em.shutdownLock.Unlock()

done := make(chan struct{})

// we never return an error here
ok, _ := doWithTimeout(em.shutdownTimeout, func() error {
for {
if len(em.events) == 0 {
return nil
}
time.Sleep(100 * time.Millisecond)

// Do not leak goroutine
select {
case <-done:
return nil
default:
}
}
})
close(done)

if !ok {
em.logger.Info("forcing the shutdown of the event manager", zap.Int("pending-events", len(em.events)))
Expand Down Expand Up @@ -254,7 +264,7 @@ func (em *eventMachine) handleEventWithObservability(event string, do func() err
// doWithTimeout wraps a function in a timeout, returning whether it succeeded before timing out.
// If the function returns an error within the timeout, it's considered as succeeded and the error will be returned back to the caller.
func doWithTimeout(timeout time.Duration, do func() error) (bool, error) {
done := make(chan error)
done := make(chan error, 1)
go func() {
done <- do()
}()
Expand Down
10 changes: 8 additions & 2 deletions processor/groupbytraceprocessor/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,17 +389,23 @@ func TestForceShutdown(t *testing.T) {

// verify
assert.True(t, duration > 20*time.Millisecond)

// wait for shutdown goroutine to end
time.Sleep(100 * time.Millisecond)
}

func TestDoWithTimeout(t *testing.T) {
// prepare
start := time.Now()

done := make(chan struct{})

// test
doWithTimeout(10*time.Millisecond, func() error {
time.Sleep(20 * time.Second)
doWithTimeout(5*time.Millisecond, func() error {
<-done
return nil
})
close(done)

// verify
assert.WithinDuration(t, start, time.Now(), 20*time.Millisecond)
Expand Down

0 comments on commit 648e110

Please sign in to comment.