Skip to content

Commit

Permalink
setting TraceID and SpanID fields in Exemplar objects in spanmetricsp…
Browse files Browse the repository at this point in the history
…rocessor and removing the use of FilteredAttributes to pass these values around.
  • Loading branch information
arun-shopify committed Oct 6, 2022
1 parent e80df4f commit 5c38543
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 71 deletions.
16 changes: 16 additions & 0 deletions .chloggen/spanmetrics_exemplars_type_fields.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetricsprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Sets TraceID and SpanID fields in Exemplar type (as per the spec) and removes the use of FilteredAttributes to pass these values around.

# One or more tracking issues related to the change
issues: [13401]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
16 changes: 10 additions & 6 deletions exporter/prometheusexporter/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,22 @@ func (c *collector) convertDoubleHistogram(metric pmetric.Metric, resourceAttrs

arrLen := ip.Exemplars().Len()
exemplars := make([]prometheus.Exemplar, arrLen)

for i := 0; i < arrLen; i++ {
e := ip.Exemplars().At(i)
exemplarLabels := make(prometheus.Labels, 0)

labels := make(prometheus.Labels, e.FilteredAttributes().Len())
e.FilteredAttributes().Range(func(k string, v pcommon.Value) bool {
labels[k] = v.AsString()
return true
})
if !e.TraceID().IsEmpty() {
exemplarLabels["trace_id"] = e.TraceID().HexString()
}

if !e.SpanID().IsEmpty() {
exemplarLabels["span_id"] = e.SpanID().HexString()
}

exemplars[i] = prometheus.Exemplar{
Value: e.DoubleValue(),
Labels: labels,
Labels: exemplarLabels,
Timestamp: e.Timestamp().AsTime(),
}
}
Expand Down
84 changes: 29 additions & 55 deletions exporter/prometheusexporter/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package prometheusexporter

import (
"encoding/hex"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -103,48 +104,33 @@ func TestConvertDoubleHistogramExemplar(t *testing.T) {
metric.SetUnit("T")

// initialize empty datapoint
hd := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
histogramDataPoint := metric.SetEmptyHistogram().DataPoints().AppendEmpty()

hd.ExplicitBounds().FromRaw([]float64{5, 25, 90})
hd.BucketCounts().FromRaw([]uint64{2, 35, 70})
histogramDataPoint.ExplicitBounds().FromRaw([]float64{5, 25, 90})
histogramDataPoint.BucketCounts().FromRaw([]uint64{2, 35, 70})

exemplarTs, _ := time.Parse("unix", "Mon Jan _2 15:04:05 MST 2006")
exemplars := []prometheus.Exemplar{
{
Timestamp: exemplarTs,
Value: 3,
Labels: prometheus.Labels{"test_label_0": "label_value_0"},
},
{
Timestamp: exemplarTs,
Value: 50,
Labels: prometheus.Labels{"test_label_1": "label_value_1"},
},
{
Timestamp: exemplarTs,
Value: 78,
Labels: prometheus.Labels{"test_label_2": "label_value_2"},
},
{
Timestamp: exemplarTs,
Value: 100,
Labels: prometheus.Labels{"test_label_3": "label_value_3"},
},
}
// add test exemplar values to the metric
promExporterExemplars := histogramDataPoint.Exemplars().AppendEmpty()
var tBytes [16]byte
testTraceID, _ := hex.DecodeString("641d68e314a58152cc2581e7663435d1")
copy(tBytes[:], testTraceID)
traceID := pcommon.TraceID(tBytes)
promExporterExemplars.SetTraceID(traceID)

// add each exemplar value to the metric
for _, e := range exemplars {
pde := hd.Exemplars().AppendEmpty()
pde.SetDoubleValue(e.Value)
for k, v := range e.Labels {
pde.FilteredAttributes().PutString(k, v)
}
pde.SetTimestamp(pcommon.NewTimestampFromTime(e.Timestamp))
}
var sBytes [8]byte
testSpanID, _ := hex.DecodeString("7436d6ac76178623")
copy(sBytes[:], testSpanID)
spanID := pcommon.SpanID(sBytes)
promExporterExemplars.SetSpanID(spanID)

exemplarTs, _ := time.Parse("unix", "Mon Jan _2 15:04:05 MST 2006")
promExporterExemplars.SetTimestamp(pcommon.NewTimestampFromTime(exemplarTs))
promExporterExemplars.SetDoubleValue(3.0)

pMap := pcommon.NewMap()

c := collector{

accumulator: &mockAccumulator{
metrics: []pmetric.Metric{metric},
resourceAttributes: pMap,
Expand All @@ -161,29 +147,17 @@ func TestConvertDoubleHistogramExemplar(t *testing.T) {

buckets := m.GetHistogram().GetBucket()

require.Equal(t, 4, len(buckets))
require.Equal(t, 3, len(buckets))

require.Equal(t, 3.0, buckets[0].GetExemplar().GetValue())
require.Equal(t, int32(128654848), buckets[0].GetExemplar().GetTimestamp().GetNanos())
require.Equal(t, 1, len(buckets[0].GetExemplar().GetLabel()))
require.Equal(t, "test_label_0", buckets[0].GetExemplar().GetLabel()[0].GetName())
require.Equal(t, "label_value_0", buckets[0].GetExemplar().GetLabel()[0].GetValue())

require.Equal(t, 0.0, buckets[1].GetExemplar().GetValue())
require.Equal(t, int32(0), buckets[1].GetExemplar().GetTimestamp().GetNanos())
require.Equal(t, 0, len(buckets[1].GetExemplar().GetLabel()))

require.Equal(t, 78.0, buckets[2].GetExemplar().GetValue())
require.Equal(t, int32(128654848), buckets[2].GetExemplar().GetTimestamp().GetNanos())
require.Equal(t, 1, len(buckets[2].GetExemplar().GetLabel()))
require.Equal(t, "test_label_2", buckets[2].GetExemplar().GetLabel()[0].GetName())
require.Equal(t, "label_value_2", buckets[2].GetExemplar().GetLabel()[0].GetValue())

require.Equal(t, 100.0, buckets[3].GetExemplar().GetValue())
require.Equal(t, int32(128654848), buckets[3].GetExemplar().GetTimestamp().GetNanos())
require.Equal(t, 1, len(buckets[3].GetExemplar().GetLabel()))
require.Equal(t, "test_label_3", buckets[3].GetExemplar().GetLabel()[0].GetName())
require.Equal(t, "label_value_3", buckets[3].GetExemplar().GetLabel()[0].GetValue())
require.Equal(t, 2, len(buckets[0].GetExemplar().GetLabel()))
ml := make(map[string]string)
for _, l := range buckets[0].GetExemplar().GetLabel() {
ml[l.GetName()] = l.GetValue()
}
require.Equal(t, "641d68e314a58152cc2581e7663435d1", ml["trace_id"])
require.Equal(t, "7436d6ac76178623", ml["span_id"])
}

// errorCheckCore keeps track of logged errors
Expand Down
12 changes: 8 additions & 4 deletions processor/spanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ const (
spanKindKey = "span.kind" // OpenTelemetry non-standard constant.
statusCodeKey = "status.code" // OpenTelemetry non-standard constant.
metricKeySeparator = string(byte(0))
traceIDKey = "trace_id"

defaultDimensionsCacheSize = 1000
)
Expand All @@ -55,6 +54,7 @@ var (

type exemplarData struct {
traceID pcommon.TraceID
spanID pcommon.SpanID
value float64
}

Expand Down Expand Up @@ -390,7 +390,7 @@ func (p *processorImp) aggregateMetricsForSpan(serviceName string, span ptrace.S
p.cache(serviceName, span, key, resourceAttr)
p.updateCallMetrics(key)
p.updateLatencyMetrics(key, latencyInMilliseconds, index)
p.updateLatencyExemplars(key, latencyInMilliseconds, span.TraceID())
p.updateLatencyExemplars(key, latencyInMilliseconds, span.TraceID(), span.SpanID())
}

// updateCallMetrics increments the call count for the given metric key.
Expand All @@ -409,13 +409,14 @@ func (p *processorImp) resetAccumulatedMetrics() {
}

// updateLatencyExemplars sets the histogram exemplars for the given metric key and append the exemplar data.
func (p *processorImp) updateLatencyExemplars(key metricKey, value float64, traceID pcommon.TraceID) {
func (p *processorImp) updateLatencyExemplars(key metricKey, value float64, traceID pcommon.TraceID, spanID pcommon.SpanID) {
if _, ok := p.latencyExemplarsData[key]; !ok {
p.latencyExemplarsData[key] = []exemplarData{}
}

e := exemplarData{
traceID: traceID,
spanID: spanID,
value: value,
}
p.latencyExemplarsData[key] = append(p.latencyExemplarsData[key], e)
Expand Down Expand Up @@ -559,6 +560,7 @@ func setLatencyExemplars(exemplarsData []exemplarData, timestamp pcommon.Timesta
for _, ed := range exemplarsData {
value := ed.value
traceID := ed.traceID
spanID := ed.spanID

exemplar := es.AppendEmpty()

Expand All @@ -567,8 +569,10 @@ func setLatencyExemplars(exemplarsData []exemplarData, timestamp pcommon.Timesta
}

exemplar.SetDoubleValue(value)

exemplar.SetTimestamp(timestamp)
exemplar.FilteredAttributes().PutString(traceIDKey, traceID.HexString())
exemplar.SetTraceID(traceID)
exemplar.SetSpanID(spanID)
}

es.CopyTo(exemplars)
Expand Down
17 changes: 11 additions & 6 deletions processor/spanmetricsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ func initSpan(span span, s ptrace.Span) {
now := time.Now()
s.SetStartTimestamp(pcommon.NewTimestampFromTime(now))
s.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(sampleLatencyDuration)))

s.Attributes().PutString(stringAttrName, "stringAttrValue")
s.Attributes().PutInt(intAttrName, 99)
s.Attributes().PutDouble(doubleAttrName, 99.99)
Expand All @@ -630,6 +631,7 @@ func initSpan(span span, s ptrace.Span) {
s.Attributes().PutEmptyMap(mapAttrName)
s.Attributes().PutEmptySlice(arrayAttrName)
s.SetTraceID(pcommon.TraceID([16]byte{byte(42)}))
s.SetSpanID(pcommon.SpanID([8]byte{byte(42)}))
}

func newOTLPExporters(t *testing.T) (*otlpexporter.Config, component.MetricsExporter, component.TracesExporter) {
Expand Down Expand Up @@ -841,21 +843,23 @@ func TestSetLatencyExemplars(t *testing.T) {
// ----- conditions -------------------------------------------------------
traces := buildSampleTrace()
traceID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
spanID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SpanID()
exemplarSlice := pmetric.NewExemplarSlice()
timestamp := pcommon.NewTimestampFromTime(time.Now())
value := float64(42)

ed := []exemplarData{{traceID: traceID, value: value}}
ed := []exemplarData{{traceID: traceID, spanID: spanID, value: value}}

// ----- call -------------------------------------------------------------
setLatencyExemplars(ed, timestamp, exemplarSlice)

// ----- verify -----------------------------------------------------------
traceIDValue, exist := exemplarSlice.At(0).FilteredAttributes().Get(traceIDKey)
traceIDValue := exemplarSlice.At(0).TraceID()
spanIDValue := exemplarSlice.At(0).SpanID()

assert.NotEmpty(t, exemplarSlice)
assert.True(t, exist)
assert.Equal(t, traceIDValue.AsString(), traceID.HexString())
assert.Equal(t, traceIDValue, traceID)
assert.Equal(t, spanIDValue, spanID)
assert.Equal(t, exemplarSlice.At(0).Timestamp(), timestamp)
assert.Equal(t, exemplarSlice.At(0).DoubleValue(), value)
}
Expand All @@ -866,18 +870,19 @@ func TestProcessorUpdateLatencyExemplars(t *testing.T) {
cfg := factory.CreateDefaultConfig().(*Config)
traces := buildSampleTrace()
traceID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
spanID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SpanID()
key := metricKey("metricKey")
next := new(consumertest.TracesSink)
p, err := newProcessor(zaptest.NewLogger(t), cfg, next)
value := float64(42)

// ----- call -------------------------------------------------------------
p.updateLatencyExemplars(key, value, traceID)
p.updateLatencyExemplars(key, value, traceID, spanID)

// ----- verify -----------------------------------------------------------
assert.NoError(t, err)
assert.NotEmpty(t, p.latencyExemplarsData[key])
assert.Equal(t, p.latencyExemplarsData[key][0], exemplarData{traceID: traceID, value: value})
assert.Equal(t, p.latencyExemplarsData[key][0], exemplarData{traceID: traceID, spanID: spanID, value: value})
}

func TestProcessorResetExemplarData(t *testing.T) {
Expand Down

0 comments on commit 5c38543

Please sign in to comment.