diff --git a/processor/groupbytraceprocessor/event.go b/processor/groupbytraceprocessor/event.go index ea5b9d91fc8d3..6a8dc9cf442cb 100644 --- a/processor/groupbytraceprocessor/event.go +++ b/processor/groupbytraceprocessor/event.go @@ -250,14 +250,13 @@ func (em *eventMachine) handleEventWithObservability(event string, do func() err ctx, _ := tag.New(context.Background(), tag.Upsert(tag.MustNewKey("event"), event)) stats.Record(ctx, mEventLatency.M(duration.Milliseconds())) - logger := em.logger.With(zap.String("event", event)) if err != nil { - logger.Error("failed to process event", zap.Error(err)) + em.logger.Error("failed to process event", zap.Error(err), zap.String("event", event)) } if succeeded { - logger.Debug("event finished") + em.logger.Debug("event finished", zap.String("event", event)) } else { - logger.Debug("event aborted") + em.logger.Debug("event aborted", zap.String("event", event)) } } diff --git a/processor/groupbytraceprocessor/processor.go b/processor/groupbytraceprocessor/processor.go index 93da0e1427e0c..9547991cd399f 100644 --- a/processor/groupbytraceprocessor/processor.go +++ b/processor/groupbytraceprocessor/processor.go @@ -233,7 +233,14 @@ func (sp *groupByTraceProcessor) onTraceReleased(rss []pdata.ResourceSpans) erro } stats.Record(context.Background(), mReleasedSpans.M(int64(trace.SpanCount()))) stats.Record(context.Background(), mReleasedTraces.M(1)) - return sp.nextConsumer.ConsumeTraces(context.Background(), trace) + + // Do async consuming not to block event worker + go func() { + if err := sp.nextConsumer.ConsumeTraces(context.Background(), trace); err != nil { + sp.logger.Error("consume failed", zap.Error(err)) + } + }() + return nil } func (sp *groupByTraceProcessor) onTraceRemoved(traceID pdata.TraceID) error { diff --git a/processor/groupbytraceprocessor/processor_test.go b/processor/groupbytraceprocessor/processor_test.go index 822982b9d255f..6c7b7024b96f0 100644 --- a/processor/groupbytraceprocessor/processor_test.go +++ b/processor/groupbytraceprocessor/processor_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" "go.uber.org/zap" ) @@ -637,6 +638,20 @@ func TestErrorOnProcessResourceSpansContinuesProcessing(t *testing.T) { assert.True(t, returnedError) } +func TestAsyncOnRelease(t *testing.T) { + blockCh := make(chan struct{}) + blocker := &blockingConsumer{ + blockCh: blockCh, + } + + sp := &groupByTraceProcessor{ + logger: zap.NewNop(), + nextConsumer: blocker, + } + assert.NoError(t, sp.onTraceReleased(nil)) + close(blockCh) +} + func BenchmarkConsumeTracesCompleteOnFirstBatch(b *testing.B) { // prepare config := Config{ @@ -662,6 +677,7 @@ func BenchmarkConsumeTracesCompleteOnFirstBatch(b *testing.B) { } type mockProcessor struct { + mutex sync.Mutex onTraces func(context.Context, pdata.Traces) error } @@ -669,6 +685,8 @@ var _ component.TracesProcessor = (*mockProcessor)(nil) func (m *mockProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error { if m.onTraces != nil { + m.mutex.Lock() + defer m.mutex.Unlock() return m.onTraces(ctx, td) } return nil @@ -724,6 +742,17 @@ func (st *mockStorage) shutdown() error { return nil } +type blockingConsumer struct { + blockCh <-chan struct{} +} + +var _ consumer.TracesConsumer = (*blockingConsumer)(nil) + +func (b *blockingConsumer) ConsumeTraces(context.Context, pdata.Traces) error { + <-b.blockCh + return nil +} + func simpleTraces() pdata.Traces { return simpleTracesWithID(pdata.NewTraceID([16]byte{1, 2, 3, 4})) } diff --git a/processor/groupbytraceprocessor/storage_memory.go b/processor/groupbytraceprocessor/storage_memory.go index 35645b0692455..8bcc1f3f6a0ab 100644 --- a/processor/groupbytraceprocessor/storage_memory.go +++ b/processor/groupbytraceprocessor/storage_memory.go @@ -25,7 +25,7 @@ import ( type memoryStorage struct { sync.RWMutex - content map[string][]pdata.ResourceSpans + content map[pdata.TraceID][]pdata.ResourceSpans stopped bool stoppedLock sync.RWMutex metricsCollectionInterval time.Duration @@ -35,34 +35,29 @@ var _ storage = (*memoryStorage)(nil) func newMemoryStorage() *memoryStorage { return &memoryStorage{ - content: make(map[string][]pdata.ResourceSpans), + content: make(map[pdata.TraceID][]pdata.ResourceSpans), metricsCollectionInterval: time.Second, } } func (st *memoryStorage) createOrAppend(traceID pdata.TraceID, rs pdata.ResourceSpans) error { - sTraceID := traceID.HexString() - st.Lock() defer st.Unlock() - if _, ok := st.content[sTraceID]; !ok { - st.content[sTraceID] = []pdata.ResourceSpans{} - } + // getting zero value is fine + content := st.content[traceID] newRS := pdata.NewResourceSpans() rs.CopyTo(newRS) - st.content[sTraceID] = append(st.content[sTraceID], newRS) + content = append(content, newRS) + st.content[traceID] = content return nil } func (st *memoryStorage) get(traceID pdata.TraceID) ([]pdata.ResourceSpans, error) { - sTraceID := traceID.HexString() - st.RLock() - defer st.RUnlock() - - rss, ok := st.content[sTraceID] + rss, ok := st.content[traceID] + st.RUnlock() if !ok { return nil, nil } @@ -80,21 +75,11 @@ func (st *memoryStorage) get(traceID pdata.TraceID) ([]pdata.ResourceSpans, erro // delete will return a reference to a ResourceSpans. Changes to the returned object may not be applied // to the version in the storage. func (st *memoryStorage) delete(traceID pdata.TraceID) ([]pdata.ResourceSpans, error) { - sTraceID := traceID.HexString() - st.Lock() defer st.Unlock() - rss := st.content[sTraceID] - var result []pdata.ResourceSpans - for _, rs := range rss { - newRS := pdata.NewResourceSpans() - rs.CopyTo(newRS) - result = append(result, newRS) - } - delete(st.content, sTraceID) - - return result, nil + defer delete(st.content, traceID) + return st.content[traceID], nil } func (st *memoryStorage) start() error {