[spanmetrics] Do not drop metric data points on attributes cache miss. (open-telemetry#18725)

* [spanmetrics] Do not drop metric data points on attributes cache miss.

Components:
- spanmetrics processor
- spanmetrics connector
kovrus committed Feb 20, 2023
1 parent 3103c41 commit 104bc96
Showing 5 changed files with 165 additions and 153 deletions.
16 changes: 16 additions & 0 deletions .chloggen/spanmetrics-cache-miss.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetrics

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Do not drop metric data points on attributes cache misses.

# One or more tracking issues related to the change
issues: [18725]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
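
The core of the fix, visible in the connector.go diff below: attributes are attached to each histogram when it is first created, so exporting a data point no longer depends on a later lookup in the evictable metricKeyToDimensions cache. The following is a minimal, self-contained Go sketch of that pattern, not the connector's actual code; getOrCreateHistogram and observe mirror names from the diff, while attributes, metricKey, and aggregator are simplified hypothetical stand-ins for pcommon.Map, the connector's key type, and connectorImp.

package main

import (
	"fmt"
	"sort"
)

// attributes is a simplified stand-in for pcommon.Map.
type attributes map[string]string

type metricKey string

// histogram carries its own attributes, so building the data point never
// depends on a separate (and evictable) key-to-dimensions cache lookup.
type histogram struct {
	attributes   attributes
	count        uint64
	sum          float64
	bucketCounts []uint64
	bounds       []float64
}

// observe records a latency measurement and increments the matching bucket.
func (h *histogram) observe(latencyMs float64) {
	h.sum += latencyMs
	h.count++
	h.bucketCounts[sort.SearchFloat64s(h.bounds, latencyMs)]++
}

// aggregator is a simplified stand-in for the connector's histogram map.
type aggregator struct {
	bounds     []float64
	histograms map[metricKey]*histogram
}

// getOrCreateHistogram attaches the attributes at creation time; a later
// cache eviction therefore cannot cause the data point to be dropped.
func (a *aggregator) getOrCreateHistogram(k metricKey, attr attributes) *histogram {
	h, ok := a.histograms[k]
	if !ok {
		h = &histogram{
			attributes:   attr,
			bounds:       a.bounds,
			bucketCounts: make([]uint64, len(a.bounds)+1),
		}
		a.histograms[k] = h
	}
	return h
}

func main() {
	agg := &aggregator{
		bounds:     []float64{10, 100, 1000},
		histograms: map[metricKey]*histogram{},
	}
	h := agg.getOrCreateHistogram("svc|span", attributes{"service.name": "svc"})
	h.observe(42)
	fmt.Println(h.count, h.sum, h.bucketCounts, h.attributes)
}
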
132 changes: 64 additions & 68 deletions connector/spanmetricsconnector/connector.go
@@ -54,7 +54,7 @@ var defaultLatencyHistogramBucketsMs = []float64{
2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10_000, 15_000,
}

type exemplarData struct {
type exemplar struct {
traceID pcommon.TraceID
spanID pcommon.SpanID
value float64
@@ -76,7 +76,7 @@ type connectorImp struct {
startTimestamp pcommon.Timestamp

// Histogram.
histograms map[metricKey]*histogramData
histograms map[metricKey]*histogram
latencyBounds []float64

keyBuf *bytes.Buffer
@@ -112,11 +112,25 @@ func newDimensions(cfgDims []Dimension) []dimension {
return dims
}

type histogramData struct {
count uint64
sum float64
bucketCounts []uint64
exemplarsData []exemplarData
type histogram struct {
attributes pcommon.Map

count uint64
sum float64
bucketCounts []uint64
exemplars []exemplar

latencyBounds []float64
}

// observe records a measurement and adds an exemplar.
func (h *histogram) observe(latencyMs float64, traceID pcommon.TraceID, spanID pcommon.SpanID) {
h.sum += latencyMs
h.count++
// Binary search to find the latencyMs bucket index.
index := sort.SearchFloat64s(h.latencyBounds, latencyMs)
h.bucketCounts[index]++
h.exemplars = append(h.exemplars, exemplar{traceID: traceID, spanID: spanID, value: latencyMs})
}

func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Ticker) (*connectorImp, error) {
@@ -148,7 +162,7 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
config: *pConfig,
startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
latencyBounds: bounds,
histograms: make(map[metricKey]*histogramData),
histograms: make(map[metricKey]*histogram),
dimensions: newDimensions(pConfig.Dimensions),
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
metricKeyToDimensions: metricKeyToDimensionsCache,
@@ -201,7 +215,7 @@ func validateDimensions(dimensions []Dimension, skipSanitizeLabel bool) error {
}

// Start implements the component.Component interface.
func (p *connectorImp) Start(ctx context.Context, host component.Host) error {
func (p *connectorImp) Start(ctx context.Context, _ component.Host) error {
p.logger.Info("Starting spanmetricsconnector")

p.started = true
@@ -241,7 +255,7 @@ func (p *connectorImp) Capabilities() consumer.Capabilities {
// ConsumeTraces implements the consumer.Traces interface.
// It aggregates the trace data to generate metrics, forwarding these metrics to the discovered metrics exporter.
// The original input trace data will be forwarded to the next consumer, unmodified.
func (p *connectorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
func (p *connectorImp) ConsumeTraces(_ context.Context, traces ptrace.Traces) error {
p.lock.Lock()
p.aggregateMetrics(traces)
p.lock.Unlock()
@@ -255,11 +269,11 @@ func (p *connectorImp) exportMetrics(ctx context.Context) {

// Exemplars are only relevant to this batch of traces, so must be cleared within the lock,
// regardless of error while building metrics, before the next batch of spans is received.
p.resetExemplarData()
p.resetExemplars()

// If delta metrics, reset accumulated data
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
p.histograms = make(map[metricKey]*histogramData)
p.histograms = make(map[metricKey]*histogram)
p.metricKeyToDimensions.Purge()
} else {
p.metricKeyToDimensions.RemoveEvictedItems()
@@ -297,21 +311,16 @@ func (p *connectorImp) collectLatencyMetrics(ilm pmetric.ScopeMetrics) {
dps := mLatency.Histogram().DataPoints()
dps.EnsureCapacity(len(p.histograms))
timestamp := pcommon.NewTimestampFromTime(time.Now())
for key, hist := range p.histograms {
dimensions, ok := p.metricKeyToDimensions.Get(key)
if !ok {
p.logger.Warn("Metric key not found in cache; consider increasing the dimensions_cache_size", zap.String("key", string(key)))
continue
}
for _, hist := range p.histograms {
dpLatency := dps.AppendEmpty()
dpLatency.SetStartTimestamp(p.startTimestamp)
dpLatency.SetTimestamp(timestamp)
dpLatency.ExplicitBounds().FromRaw(p.latencyBounds)
dpLatency.BucketCounts().FromRaw(hist.bucketCounts)
dpLatency.SetCount(hist.count)
dpLatency.SetSum(hist.sum)
setExemplars(hist.exemplarsData, timestamp, dpLatency.Exemplars())
dimensions.CopyTo(dpLatency.Attributes())
setExemplars(hist.exemplars, timestamp, dpLatency.Exemplars())
hist.attributes.CopyTo(dpLatency.Attributes())
}
}

@@ -325,18 +334,12 @@ func (p *connectorImp) collectCallMetrics(ilm pmetric.ScopeMetrics) {
dps := mCalls.Sum().DataPoints()
dps.EnsureCapacity(len(p.histograms))
timestamp := pcommon.NewTimestampFromTime(time.Now())
for key, hist := range p.histograms {
dimensions, ok := p.metricKeyToDimensions.Get(key)
if !ok {
p.logger.Warn("Metric key not found in cache; consider increasing the dimensions_cache_size", zap.String("key", string(key)))
continue
}

for _, hist := range p.histograms {
dpCalls := dps.AppendEmpty()
dpCalls.SetStartTimestamp(p.startTimestamp)
dpCalls.SetTimestamp(timestamp)
dpCalls.SetIntValue(int64(hist.count))
dimensions.CopyTo(dpCalls.Attributes())
hist.attributes.CopyTo(dpCalls.Attributes())
}
}

@@ -360,63 +363,67 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
// Protect against end timestamps before start timestamps. Assume 0 duration.
latencyInMilliseconds := float64(0)
latencyMs := float64(0)
startTime := span.StartTimestamp()
endTime := span.EndTimestamp()
if endTime > startTime {
latencyInMilliseconds = float64(endTime-startTime) / float64(time.Millisecond.Nanoseconds())
latencyMs = float64(endTime-startTime) / float64(time.Millisecond.Nanoseconds())
}
// Always reset the buffer before re-using.
p.keyBuf.Reset()
buildKey(p.keyBuf, serviceName, span, p.dimensions, resourceAttr)
key := metricKey(p.keyBuf.String())
p.cache(serviceName, span, key, resourceAttr)
p.updateHistogram(key, latencyInMilliseconds, span.TraceID(), span.SpanID())

attributes, ok := p.metricKeyToDimensions.Get(key)
if !ok {
attributes = p.buildAttributes(serviceName, span, resourceAttr)
p.metricKeyToDimensions.Add(key, attributes)
}

h := p.getOrCreateHistogram(key, attributes)
h.observe(latencyMs, span.TraceID(), span.SpanID())
}
}
}
}

// updateHistogram adds the histogram sample to the histogram defined by the metric key.
func (p *connectorImp) updateHistogram(key metricKey, latency float64, traceID pcommon.TraceID, spanID pcommon.SpanID) {
histo, ok := p.histograms[key]
func (p *connectorImp) getOrCreateHistogram(k metricKey, attr pcommon.Map) *histogram {
h, ok := p.histograms[k]
if !ok {
histo = &histogramData{
bucketCounts: make([]uint64, len(p.latencyBounds)+1),
h = &histogram{
attributes: attr,
bucketCounts: make([]uint64, len(p.latencyBounds)+1),
latencyBounds: p.latencyBounds,
exemplars: []exemplar{},
}
p.histograms[key] = histo
p.histograms[k] = h
}

histo.sum += latency
histo.count++
// Binary search to find the latencyInMilliseconds bucket index.
index := sort.SearchFloat64s(p.latencyBounds, latency)
histo.bucketCounts[index]++
histo.exemplarsData = append(histo.exemplarsData, exemplarData{traceID: traceID, spanID: spanID, value: latency})
return h
}

// resetExemplarData resets the entire exemplars map so the next trace will recreate all
// resetExemplars resets the entire exemplars map so the next trace will recreate all
// the data structures. An exemplar is a point-in-time value that exists at a specific moment
// and should not be treated like a metric that persists over time.
func (p *connectorImp) resetExemplarData() {
func (p *connectorImp) resetExemplars() {
for _, histo := range p.histograms {
histo.exemplarsData = nil
histo.exemplars = nil
}
}

func (p *connectorImp) buildDimensionKVs(serviceName string, span ptrace.Span, resourceAttrs pcommon.Map) pcommon.Map {
dims := pcommon.NewMap()
dims.EnsureCapacity(4 + len(p.dimensions))
dims.PutStr(serviceNameKey, serviceName)
dims.PutStr(spanNameKey, span.Name())
dims.PutStr(spanKindKey, traceutil.SpanKindStr(span.Kind()))
dims.PutStr(statusCodeKey, traceutil.StatusCodeStr(span.Status().Code()))
func (p *connectorImp) buildAttributes(serviceName string, span ptrace.Span, resourceAttrs pcommon.Map) pcommon.Map {
attr := pcommon.NewMap()
attr.EnsureCapacity(4 + len(p.dimensions))
attr.PutStr(serviceNameKey, serviceName)
attr.PutStr(spanNameKey, span.Name())
attr.PutStr(spanKindKey, traceutil.SpanKindStr(span.Kind()))
attr.PutStr(statusCodeKey, traceutil.StatusCodeStr(span.Status().Code()))
for _, d := range p.dimensions {
if v, ok := getDimensionValue(d, span.Attributes(), resourceAttrs); ok {
v.CopyTo(dims.PutEmpty(d.name))
v.CopyTo(attr.PutEmpty(d.name))
}
}
return dims
return attr
}

func concatDimensionValue(dest *bytes.Buffer, value string, prefixSep bool) {
@@ -466,17 +473,6 @@ func getDimensionValue(d dimension, spanAttr pcommon.Map, resourceAttr pcommon.M
return v, ok
}

// cache the dimension key-value map for the metricKey if there is a cache miss.
// This enables a lookup of the dimension key-value map when constructing the metric like so:
//
// LabelsMap().InitFromMap(p.metricKeyToDimensions[key])
func (p *connectorImp) cache(serviceName string, span ptrace.Span, k metricKey, resourceAttrs pcommon.Map) {
// Use Get to ensure any existing key has its recent-ness updated.
if _, has := p.metricKeyToDimensions.Get(k); !has {
p.metricKeyToDimensions.Add(k, p.buildDimensionKVs(serviceName, span, resourceAttrs))
}
}

// copied from prometheus-go-metric-exporter
// sanitize replaces non-alphanumeric characters with underscores in s.
func sanitize(s string, skipSanitizeLabel bool) string {
@@ -513,7 +509,7 @@ func sanitizeRune(r rune) rune {
}

// setExemplars sets the histogram exemplars.
func setExemplars(exemplarsData []exemplarData, timestamp pcommon.Timestamp, exemplars pmetric.ExemplarSlice) {
func setExemplars(exemplarsData []exemplar, timestamp pcommon.Timestamp, exemplars pmetric.ExemplarSlice) {
es := pmetric.NewExemplarSlice()
es.EnsureCapacity(len(exemplarsData))

26 changes: 14 additions & 12 deletions connector/spanmetricsconnector/connector_test.go
@@ -500,7 +500,7 @@ func TestSetExemplars(t *testing.T) {
timestamp := pcommon.NewTimestampFromTime(time.Now())
value := float64(42)

ed := []exemplarData{{traceID: traceID, spanID: spanID, value: value}}
ed := []exemplar{{traceID: traceID, spanID: spanID, value: value}}

// ----- call -------------------------------------------------------------
setExemplars(ed, timestamp, exemplarSlice)
@@ -530,19 +530,20 @@ func TestConnectorUpdateExemplars(t *testing.T) {
value := float64(42)

// ----- call -------------------------------------------------------------
c.updateHistogram(key, value, traceID, spanID)
h := c.getOrCreateHistogram(key, pcommon.NewMap())
h.observe(value, traceID, spanID)

// ----- verify -----------------------------------------------------------
assert.NoError(t, err)
assert.NotEmpty(t, c.histograms[key].exemplarsData)
assert.Equal(t, c.histograms[key].exemplarsData[0], exemplarData{traceID: traceID, spanID: spanID, value: value})
assert.NotEmpty(t, c.histograms[key].exemplars)
assert.Equal(t, c.histograms[key].exemplars[0], exemplar{traceID: traceID, spanID: spanID, value: value})

// ----- call -------------------------------------------------------------
c.resetExemplarData()
c.resetExemplars()

// ----- verify -----------------------------------------------------------
assert.NoError(t, err)
assert.Empty(t, c.histograms[key].exemplarsData)
assert.Empty(t, c.histograms[key].exemplars)
}

func TestStart(t *testing.T) {
@@ -836,7 +837,7 @@ func newConnectorImp(mcon consumer.Metrics, defaultNullValue *pcommon.Value, tem
metricsConsumer: mcon,

startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
histograms: make(map[metricKey]*histogramData),
histograms: make(map[metricKey]*histogram),
latencyBounds: defaultLatencyHistogramBucketsMs,
dimensions: []dimension{
// Set nil defaults to force a lookup for the attribute in the span.
@@ -889,19 +890,20 @@ func TestUpdateExemplars(t *testing.T) {
value := float64(42)

// ----- call -------------------------------------------------------------
c.updateHistogram(key, value, traceID, spanID)
h := c.getOrCreateHistogram(key, pcommon.NewMap())
h.observe(value, traceID, spanID)

// ----- verify -----------------------------------------------------------
assert.NoError(t, err)
assert.NotEmpty(t, c.histograms[key].exemplarsData)
assert.Equal(t, c.histograms[key].exemplarsData[0], exemplarData{traceID: traceID, spanID: spanID, value: value})
assert.NotEmpty(t, c.histograms[key].exemplars)
assert.Equal(t, c.histograms[key].exemplars[0], exemplar{traceID: traceID, spanID: spanID, value: value})

// ----- call -------------------------------------------------------------
c.resetExemplarData()
c.resetExemplars()

// ----- verify -----------------------------------------------------------
assert.NoError(t, err)
assert.Empty(t, c.histograms[key].exemplarsData)
assert.Empty(t, c.histograms[key].exemplars)
}

func TestConnectorConsumeTracesEvictedCacheKey(t *testing.T) {
