Skip to content

Commit

Permalink
Improve spanmetricsprocessor performance, reuse buffer for key (#16033)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Nov 3, 2022
1 parent c979d5c commit ad20df8
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 50 deletions.
11 changes: 11 additions & 0 deletions .chloggen/spanmetricsprocessorbuf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetricsprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Improve spanmetricsprocessor performance, reuse buffer to calculate key

# One or more tracking issues related to the change
issues: [16033]
79 changes: 36 additions & 43 deletions processor/spanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package spanmetricsprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor"

import (
"bytes"
"context"
"fmt"
"sort"
Expand Down Expand Up @@ -79,6 +80,8 @@ type processorImp struct {
histograms map[metricKey]*histogramData
latencyBounds []float64

keyBuf *bytes.Buffer

// An LRU cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values,
// delimited by a null character:
// e.g. { "foo\u0000/bar\u0000OK": { "serviceName": "foo", "operation": "/bar", "status_code": "OK" }}
metricKeyToDimensions *cache.Cache[metricKey, pcommon.Map]
Expand Down Expand Up @@ -143,6 +146,7 @@ func newProcessor(logger *zap.Logger, config config.Processor, nextConsumer cons
histograms: make(map[metricKey]*histogramData),
nextConsumer: nextConsumer,
dimensions: newDimensions(pConfig.Dimensions),
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
metricKeyToDimensions: metricKeyToDimensionsCache,
}, nil
}
Expand Down Expand Up @@ -363,41 +367,36 @@ func (p *processorImp) getDimensionsByMetricKey(k metricKey) (pcommon.Map, error
func (p *processorImp) aggregateMetrics(traces ptrace.Traces) {
for i := 0; i < traces.ResourceSpans().Len(); i++ {
rspans := traces.ResourceSpans().At(i)
attr, ok := rspans.Resource().Attributes().Get(conventions.AttributeServiceName)
resourceAttr := rspans.Resource().Attributes()
serviceAttr, ok := resourceAttr.Get(conventions.AttributeServiceName)
if !ok {
continue
}
p.aggregateMetricsForServiceSpans(rspans, attr.Str())
}
}

func (p *processorImp) aggregateMetricsForServiceSpans(rspans ptrace.ResourceSpans, serviceName string) {
ilsSlice := rspans.ScopeSpans()
for j := 0; j < ilsSlice.Len(); j++ {
ils := ilsSlice.At(j)
spans := ils.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
p.aggregateMetricsForSpan(serviceName, span, rspans.Resource().Attributes())
serviceName := serviceAttr.Str()
ilsSlice := rspans.ScopeSpans()
for j := 0; j < ilsSlice.Len(); j++ {
ils := ilsSlice.At(j)
spans := ils.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
// Protect against end timestamps before start timestamps. Assume 0 duration.
latencyInMilliseconds := float64(0)
startTime := span.StartTimestamp()
endTime := span.EndTimestamp()
if endTime > startTime {
latencyInMilliseconds = float64(endTime-startTime) / float64(time.Millisecond.Nanoseconds())
}
// Always reset the buffer before re-using.
p.keyBuf.Reset()
buildKey(p.keyBuf, serviceName, span, p.dimensions, resourceAttr)
key := metricKey(p.keyBuf.String())
p.cache(serviceName, span, key, resourceAttr)
p.updateHistogram(key, latencyInMilliseconds, span.TraceID(), span.SpanID())
}
}
}
}

func (p *processorImp) aggregateMetricsForSpan(serviceName string, span ptrace.Span, resourceAttr pcommon.Map) {
// Protect against end timestamps before start timestamps. Assume 0 duration.
latencyInMilliseconds := float64(0)
startTime := span.StartTimestamp()
endTime := span.EndTimestamp()
if endTime > startTime {
latencyInMilliseconds = float64(endTime-startTime) / float64(time.Millisecond.Nanoseconds())
}

key := buildKey(serviceName, span, p.dimensions, resourceAttr)

p.cache(serviceName, span, key, resourceAttr)
p.updateHistogram(key, latencyInMilliseconds, span.TraceID(), span.SpanID())
}

// resetAccumulatedMetrics resets the internal maps used to store created metric data. Also purge the cache for
// metricKeyToDimensions.
func (p *processorImp) resetAccumulatedMetrics() {
Expand Down Expand Up @@ -447,35 +446,29 @@ func (p *processorImp) buildDimensionKVs(serviceName string, span ptrace.Span, r
return dims
}

func concatDimensionValue(metricKeyBuilder *strings.Builder, value string, prefixSep bool) {
// It's worth noting that from pprof benchmarks, WriteString is the most expensive operation of this processor.
// Specifically, the need to grow the underlying []byte slice to make room for the appended string.
func concatDimensionValue(dest *bytes.Buffer, value string, prefixSep bool) {
if prefixSep {
metricKeyBuilder.WriteString(metricKeySeparator)
dest.WriteString(metricKeySeparator)
}
metricKeyBuilder.WriteString(value)
dest.WriteString(value)
}

// buildKey builds the metric key from the service name and span metadata such as operation, kind, status_code and
// will attempt to add any additional dimensions the user has configured that match the span's attributes
// or resource attributes. If the dimension exists in both, the span's attributes, being the most specific, takes precedence.
//
// The metric key is a simple concatenation of dimension values, delimited by a null character.
func buildKey(serviceName string, span ptrace.Span, optionalDims []dimension, resourceAttrs pcommon.Map) metricKey {
var metricKeyBuilder strings.Builder
concatDimensionValue(&metricKeyBuilder, serviceName, false)
concatDimensionValue(&metricKeyBuilder, span.Name(), true)
concatDimensionValue(&metricKeyBuilder, traceutil.SpanKindStr(span.Kind()), true)
concatDimensionValue(&metricKeyBuilder, traceutil.StatusCodeStr(span.Status().Code()), true)
func buildKey(dest *bytes.Buffer, serviceName string, span ptrace.Span, optionalDims []dimension, resourceAttrs pcommon.Map) {
concatDimensionValue(dest, serviceName, false)
concatDimensionValue(dest, span.Name(), true)
concatDimensionValue(dest, traceutil.SpanKindStr(span.Kind()), true)
concatDimensionValue(dest, traceutil.StatusCodeStr(span.Status().Code()), true)

for _, d := range optionalDims {
if v, ok := getDimensionValue(d, span.Attributes(), resourceAttrs); ok {
concatDimensionValue(&metricKeyBuilder, v.AsString(), true)
concatDimensionValue(dest, v.AsString(), true)
}
}

k := metricKey(metricKeyBuilder.String())
return k
}

// getDimensionValue gets the dimension value for the given configured dimension.
Expand Down
18 changes: 11 additions & 7 deletions processor/spanmetricsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package spanmetricsprocessor

import (
"bytes"
"context"
"fmt"
"testing"
Expand Down Expand Up @@ -398,6 +399,7 @@ func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, de
// Add a resource attribute to test "process" attributes like IP, host, region, cluster, etc.
{regionResourceAttrName, nil},
},
keyBuf: new(bytes.Buffer),
metricKeyToDimensions: metricKeyToDimensions,
}
}
Expand Down Expand Up @@ -649,12 +651,14 @@ func newOTLPExporters(t *testing.T) (*otlpexporter.Config, component.MetricsExpo
func TestBuildKeySameServiceOperationCharSequence(t *testing.T) {
span0 := ptrace.NewSpan()
span0.SetName("c")
k0 := buildKey("ab", span0, nil, pcommon.NewMap())

buf := &bytes.Buffer{}
buildKey(buf, "ab", span0, nil, pcommon.NewMap())
k0 := metricKey(buf.String())
buf.Reset()
span1 := ptrace.NewSpan()
span1.SetName("bc")
k1 := buildKey("a", span1, nil, pcommon.NewMap())

buildKey(buf, "a", span1, nil, pcommon.NewMap())
k1 := metricKey(buf.String())
assert.NotEqual(t, k0, k1)
assert.Equal(t, metricKey("ab\u0000c\u0000SPAN_KIND_UNSPECIFIED\u0000STATUS_CODE_UNSET"), k0)
assert.Equal(t, metricKey("a\u0000bc\u0000SPAN_KIND_UNSPECIFIED\u0000STATUS_CODE_UNSET"), k1)
Expand Down Expand Up @@ -727,9 +731,9 @@ func TestBuildKeyWithDimensions(t *testing.T) {
span0 := ptrace.NewSpan()
span0.Attributes().FromRaw(tc.spanAttrMap)
span0.SetName("c")
k := buildKey("ab", span0, tc.optionalDims, resAttr)

assert.Equal(t, metricKey(tc.wantKey), k)
buf := &bytes.Buffer{}
buildKey(buf, "ab", span0, tc.optionalDims, resAttr)
assert.Equal(t, tc.wantKey, buf.String())
})
}
}
Expand Down

0 comments on commit ad20df8

Please sign in to comment.