Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do onTraceReleased asynchronously in groupbytraceprocessor #1808

Merged
merged 11 commits into from
Jan 22, 2021
7 changes: 3 additions & 4 deletions processor/groupbytraceprocessor/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,13 @@ func (em *eventMachine) handleEventWithObservability(event string, do func() err
ctx, _ := tag.New(context.Background(), tag.Upsert(tag.MustNewKey("event"), event))
stats.Record(ctx, mEventLatency.M(duration.Milliseconds()))

logger := em.logger.With(zap.String("event", event))
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logger.Error("failed to process event", zap.Error(err))
em.logger.Error("failed to process event", zap.Error(err), zap.String("event", event))
}
if succeeded {
logger.Debug("event finished")
em.logger.Debug("event finished", zap.String("event", event))
} else {
logger.Debug("event aborted")
em.logger.Debug("event aborted", zap.String("event", event))
}
}

Expand Down
9 changes: 8 additions & 1 deletion processor/groupbytraceprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,14 @@ func (sp *groupByTraceProcessor) onTraceReleased(rss []pdata.ResourceSpans) erro
}
stats.Record(context.Background(), mReleasedSpans.M(int64(trace.SpanCount())))
stats.Record(context.Background(), mReleasedTraces.M(1))
return sp.nextConsumer.ConsumeTraces(context.Background(), trace)

// Consume asynchronously so that this callback does not block the event machine worker.
go func() {
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
if err := sp.nextConsumer.ConsumeTraces(context.Background(), trace); err != nil {
sp.logger.Error("consume failed", zap.Error(err))
}
}()
return nil
}

func (sp *groupByTraceProcessor) onTraceRemoved(traceID pdata.TraceID) error {
Expand Down
29 changes: 29 additions & 0 deletions processor/groupbytraceprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -637,6 +638,20 @@ func TestErrorOnProcessResourceSpansContinuesProcessing(t *testing.T) {
assert.True(t, returnedError)
}

// TestAsyncOnRelease verifies that onTraceReleased returns without waiting
// on the downstream consumer: blockingConsumer would block indefinitely if
// ConsumeTraces were invoked synchronously from the event worker.
func TestAsyncOnRelease(t *testing.T) {
	release := make(chan struct{})
	consumer := &blockingConsumer{blockCh: release}

	p := &groupByTraceProcessor{
		logger:       zap.NewNop(),
		nextConsumer: consumer,
	}

	// Must return immediately even though the consumer blocks.
	assert.NoError(t, p.onTraceReleased(nil))
	close(release)
}

func BenchmarkConsumeTracesCompleteOnFirstBatch(b *testing.B) {
// prepare
config := Config{
Expand All @@ -662,13 +677,16 @@ func BenchmarkConsumeTracesCompleteOnFirstBatch(b *testing.B) {
}

// mockProcessor is a test double for component.TracesProcessor whose
// ConsumeTraces behavior is supplied via the onTraces callback.
type mockProcessor struct {
	mutex    sync.Mutex // serializes onTraces invocations (ConsumeTraces may be called from multiple goroutines)
	onTraces func(context.Context, pdata.Traces) error
}

var _ component.TracesProcessor = (*mockProcessor)(nil)

func (m *mockProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
if m.onTraces != nil {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.onTraces(ctx, td)
}
return nil
Expand Down Expand Up @@ -724,6 +742,17 @@ func (st *mockStorage) shutdown() error {
return nil
}

// blockingConsumer is a TracesConsumer stub whose ConsumeTraces blocks
// until blockCh is closed (or receives), letting tests detect whether a
// caller consumes synchronously or asynchronously.
type blockingConsumer struct {
	blockCh <-chan struct{}
}

var _ consumer.TracesConsumer = (*blockingConsumer)(nil)

// ConsumeTraces blocks until blockCh is readable (typically when the test
// closes it) and then reports success. Both arguments are ignored.
func (bc *blockingConsumer) ConsumeTraces(context.Context, pdata.Traces) error {
	<-bc.blockCh
	return nil
}

// simpleTraces returns a minimal trace fixture carrying a fixed trace ID.
func simpleTraces() pdata.Traces {
	id := pdata.NewTraceID([16]byte{1, 2, 3, 4})
	return simpleTracesWithID(id)
}
Expand Down
35 changes: 10 additions & 25 deletions processor/groupbytraceprocessor/storage_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

type memoryStorage struct {
sync.RWMutex
content map[string][]pdata.ResourceSpans
content map[pdata.TraceID][]pdata.ResourceSpans
stopped bool
stoppedLock sync.RWMutex
metricsCollectionInterval time.Duration
Expand All @@ -35,34 +35,29 @@ var _ storage = (*memoryStorage)(nil)

func newMemoryStorage() *memoryStorage {
return &memoryStorage{
content: make(map[string][]pdata.ResourceSpans),
content: make(map[pdata.TraceID][]pdata.ResourceSpans),
metricsCollectionInterval: time.Second,
}
}

func (st *memoryStorage) createOrAppend(traceID pdata.TraceID, rs pdata.ResourceSpans) error {
sTraceID := traceID.HexString()

st.Lock()
defer st.Unlock()

if _, ok := st.content[sTraceID]; !ok {
st.content[sTraceID] = []pdata.ResourceSpans{}
}
// A missing key yields the zero value (a nil slice), which append handles correctly.
content := st.content[traceID]

newRS := pdata.NewResourceSpans()
rs.CopyTo(newRS)
st.content[sTraceID] = append(st.content[sTraceID], newRS)
content = append(content, newRS)
st.content[traceID] = content

return nil
}
func (st *memoryStorage) get(traceID pdata.TraceID) ([]pdata.ResourceSpans, error) {
sTraceID := traceID.HexString()

st.RLock()
defer st.RUnlock()

rss, ok := st.content[sTraceID]
rss, ok := st.content[traceID]
st.RUnlock()
if !ok {
return nil, nil
}
Expand All @@ -80,21 +75,11 @@ func (st *memoryStorage) get(traceID pdata.TraceID) ([]pdata.ResourceSpans, erro
// delete will return a reference to a ResourceSpans. Changes to the returned object may not be applied
// to the version in the storage.
func (st *memoryStorage) delete(traceID pdata.TraceID) ([]pdata.ResourceSpans, error) {
sTraceID := traceID.HexString()

st.Lock()
defer st.Unlock()

rss := st.content[sTraceID]
var result []pdata.ResourceSpans
for _, rs := range rss {
newRS := pdata.NewResourceSpans()
rs.CopyTo(newRS)
result = append(result, newRS)
}
delete(st.content, sTraceID)

return result, nil
defer delete(st.content, traceID)
return st.content[traceID], nil
}

func (st *memoryStorage) start() error {
Expand Down