Skip to content

Commit

Permalink
setting TraceID and SpanID fields in Exemplar objects in spanmetricsp… (
Browse files Browse the repository at this point in the history
open-telemetry#13401)

setting TraceID and SpanID fields in Exemplar objects in spanmetricsprocessor and removing the use of FilteredAttributes to pass these values around.
  • Loading branch information
arun-shopify committed Oct 13, 2022
1 parent ca0f8f3 commit 8f8a634
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 71 deletions.
16 changes: 16 additions & 0 deletions .chloggen/spanmetrics_exemplars_type_fields.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetricsprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Sets TraceID and SpanID fields in Exemplar type (as per the spec) and removes the use of FilteredAttributes to pass these values around.

# One or more tracking issues related to the change
issues: [13401]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
16 changes: 10 additions & 6 deletions exporter/prometheusexporter/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,22 @@ func (c *collector) convertDoubleHistogram(metric pmetric.Metric, resourceAttrs

arrLen := ip.Exemplars().Len()
exemplars := make([]prometheus.Exemplar, arrLen)

for i := 0; i < arrLen; i++ {
e := ip.Exemplars().At(i)
exemplarLabels := make(prometheus.Labels, 0)

labels := make(prometheus.Labels, e.FilteredAttributes().Len())
e.FilteredAttributes().Range(func(k string, v pcommon.Value) bool {
labels[k] = v.AsString()
return true
})
if !e.TraceID().IsEmpty() {
exemplarLabels["trace_id"] = e.TraceID().HexString()
}

if !e.SpanID().IsEmpty() {
exemplarLabels["span_id"] = e.SpanID().HexString()
}

exemplars[i] = prometheus.Exemplar{
Value: e.DoubleValue(),
Labels: labels,
Labels: exemplarLabels,
Timestamp: e.Timestamp().AsTime(),
}
}
Expand Down
84 changes: 29 additions & 55 deletions exporter/prometheusexporter/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package prometheusexporter

import (
"encoding/hex"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -103,48 +104,33 @@ func TestConvertDoubleHistogramExemplar(t *testing.T) {
metric.SetUnit("T")

// initialize empty datapoint
hd := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
histogramDataPoint := metric.SetEmptyHistogram().DataPoints().AppendEmpty()

hd.ExplicitBounds().FromRaw([]float64{5, 25, 90})
hd.BucketCounts().FromRaw([]uint64{2, 35, 70})
histogramDataPoint.ExplicitBounds().FromRaw([]float64{5, 25, 90})
histogramDataPoint.BucketCounts().FromRaw([]uint64{2, 35, 70})

exemplarTs, _ := time.Parse("unix", "Mon Jan _2 15:04:05 MST 2006")
exemplars := []prometheus.Exemplar{
{
Timestamp: exemplarTs,
Value: 3,
Labels: prometheus.Labels{"test_label_0": "label_value_0"},
},
{
Timestamp: exemplarTs,
Value: 50,
Labels: prometheus.Labels{"test_label_1": "label_value_1"},
},
{
Timestamp: exemplarTs,
Value: 78,
Labels: prometheus.Labels{"test_label_2": "label_value_2"},
},
{
Timestamp: exemplarTs,
Value: 100,
Labels: prometheus.Labels{"test_label_3": "label_value_3"},
},
}
// add test exemplar values to the metric
promExporterExemplars := histogramDataPoint.Exemplars().AppendEmpty()
var tBytes [16]byte
testTraceID, _ := hex.DecodeString("641d68e314a58152cc2581e7663435d1")
copy(tBytes[:], testTraceID)
traceID := pcommon.TraceID(tBytes)
promExporterExemplars.SetTraceID(traceID)

// add each exemplar value to the metric
for _, e := range exemplars {
pde := hd.Exemplars().AppendEmpty()
pde.SetDoubleValue(e.Value)
for k, v := range e.Labels {
pde.FilteredAttributes().PutStr(k, v)
}
pde.SetTimestamp(pcommon.NewTimestampFromTime(e.Timestamp))
}
var sBytes [8]byte
testSpanID, _ := hex.DecodeString("7436d6ac76178623")
copy(sBytes[:], testSpanID)
spanID := pcommon.SpanID(sBytes)
promExporterExemplars.SetSpanID(spanID)

exemplarTs, _ := time.Parse("unix", "Mon Jan _2 15:04:05 MST 2006")
promExporterExemplars.SetTimestamp(pcommon.NewTimestampFromTime(exemplarTs))
promExporterExemplars.SetDoubleValue(3.0)

pMap := pcommon.NewMap()

c := collector{

accumulator: &mockAccumulator{
metrics: []pmetric.Metric{metric},
resourceAttributes: pMap,
Expand All @@ -161,29 +147,17 @@ func TestConvertDoubleHistogramExemplar(t *testing.T) {

buckets := m.GetHistogram().GetBucket()

require.Equal(t, 4, len(buckets))
require.Equal(t, 3, len(buckets))

require.Equal(t, 3.0, buckets[0].GetExemplar().GetValue())
require.Equal(t, int32(128654848), buckets[0].GetExemplar().GetTimestamp().GetNanos())
require.Equal(t, 1, len(buckets[0].GetExemplar().GetLabel()))
require.Equal(t, "test_label_0", buckets[0].GetExemplar().GetLabel()[0].GetName())
require.Equal(t, "label_value_0", buckets[0].GetExemplar().GetLabel()[0].GetValue())

require.Equal(t, 0.0, buckets[1].GetExemplar().GetValue())
require.Equal(t, int32(0), buckets[1].GetExemplar().GetTimestamp().GetNanos())
require.Equal(t, 0, len(buckets[1].GetExemplar().GetLabel()))

require.Equal(t, 78.0, buckets[2].GetExemplar().GetValue())
require.Equal(t, int32(128654848), buckets[2].GetExemplar().GetTimestamp().GetNanos())
require.Equal(t, 1, len(buckets[2].GetExemplar().GetLabel()))
require.Equal(t, "test_label_2", buckets[2].GetExemplar().GetLabel()[0].GetName())
require.Equal(t, "label_value_2", buckets[2].GetExemplar().GetLabel()[0].GetValue())

require.Equal(t, 100.0, buckets[3].GetExemplar().GetValue())
require.Equal(t, int32(128654848), buckets[3].GetExemplar().GetTimestamp().GetNanos())
require.Equal(t, 1, len(buckets[3].GetExemplar().GetLabel()))
require.Equal(t, "test_label_3", buckets[3].GetExemplar().GetLabel()[0].GetName())
require.Equal(t, "label_value_3", buckets[3].GetExemplar().GetLabel()[0].GetValue())
require.Equal(t, 2, len(buckets[0].GetExemplar().GetLabel()))
ml := make(map[string]string)
for _, l := range buckets[0].GetExemplar().GetLabel() {
ml[l.GetName()] = l.GetValue()
}
require.Equal(t, "641d68e314a58152cc2581e7663435d1", ml["trace_id"])
require.Equal(t, "7436d6ac76178623", ml["span_id"])
}

// errorCheckCore keeps track of logged errors
Expand Down
11 changes: 7 additions & 4 deletions processor/spanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ const (
spanKindKey = "span.kind" // OpenTelemetry non-standard constant.
statusCodeKey = "status.code" // OpenTelemetry non-standard constant.
metricKeySeparator = string(byte(0))
traceIDKey = "trace_id"

defaultDimensionsCacheSize = 1000
)
Expand All @@ -55,6 +54,7 @@ var (

type exemplarData struct {
traceID pcommon.TraceID
spanID pcommon.SpanID
value float64
}

Expand Down Expand Up @@ -390,7 +390,7 @@ func (p *processorImp) aggregateMetricsForSpan(serviceName string, span ptrace.S
p.cache(serviceName, span, key, resourceAttr)
p.updateCallMetrics(key)
p.updateLatencyMetrics(key, latencyInMilliseconds, index)
p.updateLatencyExemplars(key, latencyInMilliseconds, span.TraceID())
p.updateLatencyExemplars(key, latencyInMilliseconds, span.TraceID(), span.SpanID())
}

// updateCallMetrics increments the call count for the given metric key.
Expand All @@ -409,13 +409,14 @@ func (p *processorImp) resetAccumulatedMetrics() {
}

// updateLatencyExemplars sets the histogram exemplars for the given metric key and append the exemplar data.
func (p *processorImp) updateLatencyExemplars(key metricKey, value float64, traceID pcommon.TraceID) {
func (p *processorImp) updateLatencyExemplars(key metricKey, value float64, traceID pcommon.TraceID, spanID pcommon.SpanID) {
if _, ok := p.latencyExemplarsData[key]; !ok {
p.latencyExemplarsData[key] = []exemplarData{}
}

e := exemplarData{
traceID: traceID,
spanID: spanID,
value: value,
}
p.latencyExemplarsData[key] = append(p.latencyExemplarsData[key], e)
Expand Down Expand Up @@ -559,6 +560,7 @@ func setLatencyExemplars(exemplarsData []exemplarData, timestamp pcommon.Timesta
for _, ed := range exemplarsData {
value := ed.value
traceID := ed.traceID
spanID := ed.spanID

exemplar := es.AppendEmpty()

Expand All @@ -568,7 +570,8 @@ func setLatencyExemplars(exemplarsData []exemplarData, timestamp pcommon.Timesta

exemplar.SetDoubleValue(value)
exemplar.SetTimestamp(timestamp)
exemplar.FilteredAttributes().PutStr(traceIDKey, traceID.HexString())
exemplar.SetTraceID(traceID)
exemplar.SetSpanID(spanID)
}

es.CopyTo(exemplars)
Expand Down
17 changes: 11 additions & 6 deletions processor/spanmetricsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ func initSpan(span span, s ptrace.Span) {
now := time.Now()
s.SetStartTimestamp(pcommon.NewTimestampFromTime(now))
s.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(sampleLatencyDuration)))

s.Attributes().PutStr(stringAttrName, "stringAttrValue")
s.Attributes().PutInt(intAttrName, 99)
s.Attributes().PutDouble(doubleAttrName, 99.99)
Expand All @@ -630,6 +631,7 @@ func initSpan(span span, s ptrace.Span) {
s.Attributes().PutEmptyMap(mapAttrName)
s.Attributes().PutEmptySlice(arrayAttrName)
s.SetTraceID(pcommon.TraceID([16]byte{byte(42)}))
s.SetSpanID(pcommon.SpanID([8]byte{byte(42)}))
}

func newOTLPExporters(t *testing.T) (*otlpexporter.Config, component.MetricsExporter, component.TracesExporter) {
Expand Down Expand Up @@ -841,21 +843,23 @@ func TestSetLatencyExemplars(t *testing.T) {
// ----- conditions -------------------------------------------------------
traces := buildSampleTrace()
traceID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
spanID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SpanID()
exemplarSlice := pmetric.NewExemplarSlice()
timestamp := pcommon.NewTimestampFromTime(time.Now())
value := float64(42)

ed := []exemplarData{{traceID: traceID, value: value}}
ed := []exemplarData{{traceID: traceID, spanID: spanID, value: value}}

// ----- call -------------------------------------------------------------
setLatencyExemplars(ed, timestamp, exemplarSlice)

// ----- verify -----------------------------------------------------------
traceIDValue, exist := exemplarSlice.At(0).FilteredAttributes().Get(traceIDKey)
traceIDValue := exemplarSlice.At(0).TraceID()
spanIDValue := exemplarSlice.At(0).SpanID()

assert.NotEmpty(t, exemplarSlice)
assert.True(t, exist)
assert.Equal(t, traceIDValue.AsString(), traceID.HexString())
assert.Equal(t, traceIDValue, traceID)
assert.Equal(t, spanIDValue, spanID)
assert.Equal(t, exemplarSlice.At(0).Timestamp(), timestamp)
assert.Equal(t, exemplarSlice.At(0).DoubleValue(), value)
}
Expand All @@ -866,18 +870,19 @@ func TestProcessorUpdateLatencyExemplars(t *testing.T) {
cfg := factory.CreateDefaultConfig().(*Config)
traces := buildSampleTrace()
traceID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
spanID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SpanID()
key := metricKey("metricKey")
next := new(consumertest.TracesSink)
p, err := newProcessor(zaptest.NewLogger(t), cfg, next)
value := float64(42)

// ----- call -------------------------------------------------------------
p.updateLatencyExemplars(key, value, traceID)
p.updateLatencyExemplars(key, value, traceID, spanID)

// ----- verify -----------------------------------------------------------
assert.NoError(t, err)
assert.NotEmpty(t, p.latencyExemplarsData[key])
assert.Equal(t, p.latencyExemplarsData[key][0], exemplarData{traceID: traceID, value: value})
assert.Equal(t, p.latencyExemplarsData[key][0], exemplarData{traceID: traceID, spanID: spanID, value: value})
}

func TestProcessorResetExemplarData(t *testing.T) {
Expand Down

0 comments on commit 8f8a634

Please sign in to comment.