Skip to content

Commit

Permalink
Do onTraceReleased asynchronously in groupbytraceprocessor (open-tele…
Browse files Browse the repository at this point in the history
…metry#1808)

* Do onTraceReleased asynchronously in groupbytraceprocessor

* Add test for async release

* Fix lint

* Fix mockprocessor to be asynchronous

* do async onTraceReleased

* remove test for unchanged logic

* small optimizations

* narrow lock scope in get

* Fix lint

* Add test for async onRelease

* Use pdata.TraceID as map key
  • Loading branch information
pkositsyn committed Jan 22, 2021
1 parent 1ba501a commit ce4189c
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 30 deletions.
7 changes: 3 additions & 4 deletions processor/groupbytraceprocessor/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,13 @@ func (em *eventMachine) handleEventWithObservability(event string, do func() err
ctx, _ := tag.New(context.Background(), tag.Upsert(tag.MustNewKey("event"), event))
stats.Record(ctx, mEventLatency.M(duration.Milliseconds()))

logger := em.logger.With(zap.String("event", event))
if err != nil {
logger.Error("failed to process event", zap.Error(err))
em.logger.Error("failed to process event", zap.Error(err), zap.String("event", event))
}
if succeeded {
logger.Debug("event finished")
em.logger.Debug("event finished", zap.String("event", event))
} else {
logger.Debug("event aborted")
em.logger.Debug("event aborted", zap.String("event", event))
}
}

Expand Down
9 changes: 8 additions & 1 deletion processor/groupbytraceprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,14 @@ func (sp *groupByTraceProcessor) onTraceReleased(rss []pdata.ResourceSpans) erro
}
stats.Record(context.Background(), mReleasedSpans.M(int64(trace.SpanCount())))
stats.Record(context.Background(), mReleasedTraces.M(1))
return sp.nextConsumer.ConsumeTraces(context.Background(), trace)

// Do async consuming not to block event worker
go func() {
if err := sp.nextConsumer.ConsumeTraces(context.Background(), trace); err != nil {
sp.logger.Error("consume failed", zap.Error(err))
}
}()
return nil
}

func (sp *groupByTraceProcessor) onTraceRemoved(traceID pdata.TraceID) error {
Expand Down
29 changes: 29 additions & 0 deletions processor/groupbytraceprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -614,6 +615,20 @@ func TestErrorOnProcessResourceSpansContinuesProcessing(t *testing.T) {
assert.True(t, returnedError)
}

func TestAsyncOnRelease(t *testing.T) {
blockCh := make(chan struct{})
blocker := &blockingConsumer{
blockCh: blockCh,
}

sp := &groupByTraceProcessor{
logger: zap.NewNop(),
nextConsumer: blocker,
}
assert.NoError(t, sp.onTraceReleased(nil))
close(blockCh)
}

func BenchmarkConsumeTracesCompleteOnFirstBatch(b *testing.B) {
// prepare
config := Config{
Expand All @@ -638,13 +653,16 @@ func BenchmarkConsumeTracesCompleteOnFirstBatch(b *testing.B) {
}

type mockProcessor struct {
mutex sync.Mutex
onTraces func(context.Context, pdata.Traces) error
}

var _ component.TracesProcessor = (*mockProcessor)(nil)

func (m *mockProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
if m.onTraces != nil {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.onTraces(ctx, td)
}
return nil
Expand Down Expand Up @@ -700,6 +718,17 @@ func (st *mockStorage) shutdown() error {
return nil
}

type blockingConsumer struct {
blockCh <-chan struct{}
}

var _ consumer.TracesConsumer = (*blockingConsumer)(nil)

func (b *blockingConsumer) ConsumeTraces(context.Context, pdata.Traces) error {
<-b.blockCh
return nil
}

func simpleTraces() pdata.Traces {
return simpleTracesWithID(pdata.NewTraceID([16]byte{1, 2, 3, 4}))
}
Expand Down
35 changes: 10 additions & 25 deletions processor/groupbytraceprocessor/storage_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

type memoryStorage struct {
sync.RWMutex
content map[string][]pdata.ResourceSpans
content map[pdata.TraceID][]pdata.ResourceSpans
stopped bool
stoppedLock sync.RWMutex
metricsCollectionInterval time.Duration
Expand All @@ -35,34 +35,29 @@ var _ storage = (*memoryStorage)(nil)

func newMemoryStorage() *memoryStorage {
return &memoryStorage{
content: make(map[string][]pdata.ResourceSpans),
content: make(map[pdata.TraceID][]pdata.ResourceSpans),
metricsCollectionInterval: time.Second,
}
}

func (st *memoryStorage) createOrAppend(traceID pdata.TraceID, rs pdata.ResourceSpans) error {
sTraceID := traceID.HexString()

st.Lock()
defer st.Unlock()

if _, ok := st.content[sTraceID]; !ok {
st.content[sTraceID] = []pdata.ResourceSpans{}
}
// getting zero value is fine
content := st.content[traceID]

newRS := pdata.NewResourceSpans()
rs.CopyTo(newRS)
st.content[sTraceID] = append(st.content[sTraceID], newRS)
content = append(content, newRS)
st.content[traceID] = content

return nil
}
func (st *memoryStorage) get(traceID pdata.TraceID) ([]pdata.ResourceSpans, error) {
sTraceID := traceID.HexString()

st.RLock()
defer st.RUnlock()

rss, ok := st.content[sTraceID]
rss, ok := st.content[traceID]
st.RUnlock()
if !ok {
return nil, nil
}
Expand All @@ -80,21 +75,11 @@ func (st *memoryStorage) get(traceID pdata.TraceID) ([]pdata.ResourceSpans, erro
// delete will return a reference to a ResourceSpans. Changes to the returned object may not be applied
// to the version in the storage.
func (st *memoryStorage) delete(traceID pdata.TraceID) ([]pdata.ResourceSpans, error) {
sTraceID := traceID.HexString()

st.Lock()
defer st.Unlock()

rss := st.content[sTraceID]
var result []pdata.ResourceSpans
for _, rs := range rss {
newRS := pdata.NewResourceSpans()
rs.CopyTo(newRS)
result = append(result, newRS)
}
delete(st.content, sTraceID)

return result, nil
defer delete(st.content, traceID)
return st.content[traceID], nil
}

func (st *memoryStorage) start() error {
Expand Down

0 comments on commit ce4189c

Please sign in to comment.