From ad20df84d0bda4ab297be5842808ef0988ac3174 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 3 Nov 2022 12:10:26 -0700 Subject: [PATCH] Improve spanmetricsprocessor performance, reuse buffer for key (#16033) Signed-off-by: Bogdan Drutu --- .chloggen/spanmetricsprocessorbuf.yaml | 11 +++ processor/spanmetricsprocessor/processor.go | 79 +++++++++---------- .../spanmetricsprocessor/processor_test.go | 18 +++-- 3 files changed, 58 insertions(+), 50 deletions(-) create mode 100755 .chloggen/spanmetricsprocessorbuf.yaml diff --git a/.chloggen/spanmetricsprocessorbuf.yaml b/.chloggen/spanmetricsprocessorbuf.yaml new file mode 100755 index 0000000000000..037ec55293870 --- /dev/null +++ b/.chloggen/spanmetricsprocessorbuf.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: spanmetricsprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Improve spanmetricsprocessor performance, reuse buffer to calculate key + +# One or more tracking issues related to the change +issues: [16033] diff --git a/processor/spanmetricsprocessor/processor.go b/processor/spanmetricsprocessor/processor.go index f5a0bcd2e6740..fee13a24fcce8 100644 --- a/processor/spanmetricsprocessor/processor.go +++ b/processor/spanmetricsprocessor/processor.go @@ -15,6 +15,7 @@ package spanmetricsprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor" import ( + "bytes" "context" "fmt" "sort" @@ -79,6 +80,8 @@ type processorImp struct { histograms map[metricKey]*histogramData latencyBounds []float64 + keyBuf *bytes.Buffer + // An LRU cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values: // e.g. { "foo/barOK": { "serviceName": "foo", "operation": "/bar", "status_code": "OK" }} metricKeyToDimensions *cache.Cache[metricKey, pcommon.Map] @@ -143,6 +146,7 @@ func newProcessor(logger *zap.Logger, config config.Processor, nextConsumer cons histograms: make(map[metricKey]*histogramData), nextConsumer: nextConsumer, dimensions: newDimensions(pConfig.Dimensions), + keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)), metricKeyToDimensions: metricKeyToDimensionsCache, }, nil } @@ -363,41 +367,36 @@ func (p *processorImp) getDimensionsByMetricKey(k metricKey) (pcommon.Map, error func (p *processorImp) aggregateMetrics(traces ptrace.Traces) { for i := 0; i < traces.ResourceSpans().Len(); i++ { rspans := traces.ResourceSpans().At(i) - attr, ok := rspans.Resource().Attributes().Get(conventions.AttributeServiceName) + resourceAttr := rspans.Resource().Attributes() + serviceAttr, ok := resourceAttr.Get(conventions.AttributeServiceName) if !ok { continue } - p.aggregateMetricsForServiceSpans(rspans, attr.Str()) - } -} - -func (p *processorImp) aggregateMetricsForServiceSpans(rspans ptrace.ResourceSpans, serviceName string) { - ilsSlice := rspans.ScopeSpans() - for j := 0; j < ilsSlice.Len(); j++ { - ils := ilsSlice.At(j) - spans := ils.Spans() - for k := 0; k < spans.Len(); k++ { - span := spans.At(k) - p.aggregateMetricsForSpan(serviceName, span, rspans.Resource().Attributes()) + serviceName := serviceAttr.Str() + ilsSlice := rspans.ScopeSpans() + for j := 0; j < ilsSlice.Len(); j++ { + ils := ilsSlice.At(j) + spans := ils.Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + // Protect against end timestamps before start timestamps. Assume 0 duration. + latencyInMilliseconds := float64(0) + startTime := span.StartTimestamp() + endTime := span.EndTimestamp() + if endTime > startTime { + latencyInMilliseconds = float64(endTime-startTime) / float64(time.Millisecond.Nanoseconds()) + } + // Always reset the buffer before re-using. + p.keyBuf.Reset() + buildKey(p.keyBuf, serviceName, span, p.dimensions, resourceAttr) + key := metricKey(p.keyBuf.String()) + p.cache(serviceName, span, key, resourceAttr) + p.updateHistogram(key, latencyInMilliseconds, span.TraceID(), span.SpanID()) + } } } } -func (p *processorImp) aggregateMetricsForSpan(serviceName string, span ptrace.Span, resourceAttr pcommon.Map) { - // Protect against end timestamps before start timestamps. Assume 0 duration. - latencyInMilliseconds := float64(0) - startTime := span.StartTimestamp() - endTime := span.EndTimestamp() - if endTime > startTime { - latencyInMilliseconds = float64(endTime-startTime) / float64(time.Millisecond.Nanoseconds()) - } - - key := buildKey(serviceName, span, p.dimensions, resourceAttr) - - p.cache(serviceName, span, key, resourceAttr) - p.updateHistogram(key, latencyInMilliseconds, span.TraceID(), span.SpanID()) -} - // resetAccumulatedMetrics resets the internal maps used to store created metric data. Also purge the cache for // metricKeyToDimensions. func (p *processorImp) resetAccumulatedMetrics() { @@ -447,13 +446,11 @@ func (p *processorImp) buildDimensionKVs(serviceName string, span ptrace.Span, r return dims } -func concatDimensionValue(metricKeyBuilder *strings.Builder, value string, prefixSep bool) { - // It's worth noting that from pprof benchmarks, WriteString is the most expensive operation of this processor. - // Specifically, the need to grow the underlying []byte slice to make room for the appended string. +func concatDimensionValue(dest *bytes.Buffer, value string, prefixSep bool) { if prefixSep { - metricKeyBuilder.WriteString(metricKeySeparator) + dest.WriteString(metricKeySeparator) } - metricKeyBuilder.WriteString(value) + dest.WriteString(value) } // buildKey builds the metric key from the service name and span metadata such as operation, kind, status_code and @@ -461,21 +458,17 @@ func concatDimensionValue(metricKeyBuilder *strings.Builder, value string, prefi // or resource attributes. If the dimension exists in both, the span's attributes, being the most specific, takes precedence. // // The metric key is a simple concatenation of dimension values, delimited by a null character. -func buildKey(serviceName string, span ptrace.Span, optionalDims []dimension, resourceAttrs pcommon.Map) metricKey { - var metricKeyBuilder strings.Builder - concatDimensionValue(&metricKeyBuilder, serviceName, false) - concatDimensionValue(&metricKeyBuilder, span.Name(), true) - concatDimensionValue(&metricKeyBuilder, traceutil.SpanKindStr(span.Kind()), true) - concatDimensionValue(&metricKeyBuilder, traceutil.StatusCodeStr(span.Status().Code()), true) +func buildKey(dest *bytes.Buffer, serviceName string, span ptrace.Span, optionalDims []dimension, resourceAttrs pcommon.Map) { + concatDimensionValue(dest, serviceName, false) + concatDimensionValue(dest, span.Name(), true) + concatDimensionValue(dest, traceutil.SpanKindStr(span.Kind()), true) + concatDimensionValue(dest, traceutil.StatusCodeStr(span.Status().Code()), true) for _, d := range optionalDims { if v, ok := getDimensionValue(d, span.Attributes(), resourceAttrs); ok { - concatDimensionValue(&metricKeyBuilder, v.AsString(), true) + concatDimensionValue(dest, v.AsString(), true) } } - - k := metricKey(metricKeyBuilder.String()) - return k } // getDimensionValue gets the dimension value for the given configured dimension. diff --git a/processor/spanmetricsprocessor/processor_test.go b/processor/spanmetricsprocessor/processor_test.go index cfcae57d6dd68..e4ae84c180228 100644 --- a/processor/spanmetricsprocessor/processor_test.go +++ b/processor/spanmetricsprocessor/processor_test.go @@ -15,6 +15,7 @@ package spanmetricsprocessor import ( + "bytes" "context" "fmt" "testing" @@ -398,6 +399,7 @@ func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, de // Add a resource attribute to test "process" attributes like IP, host, region, cluster, etc. {regionResourceAttrName, nil}, }, + keyBuf: new(bytes.Buffer), metricKeyToDimensions: metricKeyToDimensions, } } @@ -649,12 +651,14 @@ func newOTLPExporters(t *testing.T) (*otlpexporter.Config, component.MetricsExpo func TestBuildKeySameServiceOperationCharSequence(t *testing.T) { span0 := ptrace.NewSpan() span0.SetName("c") - k0 := buildKey("ab", span0, nil, pcommon.NewMap()) - + buf := &bytes.Buffer{} + buildKey(buf, "ab", span0, nil, pcommon.NewMap()) + k0 := metricKey(buf.String()) + buf.Reset() span1 := ptrace.NewSpan() span1.SetName("bc") - k1 := buildKey("a", span1, nil, pcommon.NewMap()) - + buildKey(buf, "a", span1, nil, pcommon.NewMap()) + k1 := metricKey(buf.String()) assert.NotEqual(t, k0, k1) assert.Equal(t, metricKey("ab\u0000c\u0000SPAN_KIND_UNSPECIFIED\u0000STATUS_CODE_UNSET"), k0) assert.Equal(t, metricKey("a\u0000bc\u0000SPAN_KIND_UNSPECIFIED\u0000STATUS_CODE_UNSET"), k1) @@ -727,9 +731,9 @@ func TestBuildKeyWithDimensions(t *testing.T) { span0 := ptrace.NewSpan() span0.Attributes().FromRaw(tc.spanAttrMap) span0.SetName("c") - k := buildKey("ab", span0, tc.optionalDims, resAttr) - - assert.Equal(t, metricKey(tc.wantKey), k) + buf := &bytes.Buffer{} + buildKey(buf, "ab", span0, tc.optionalDims, resAttr) + assert.Equal(t, tc.wantKey, buf.String()) }) } }