Skip to content

Commit

Permalink
[connectors/spanmetrics] Use dedicated state for calls sum metrics. (
Browse files Browse the repository at this point in the history
…open-telemetry#18894)

[spanmetricsconnector] Use dedicated field for counter metrics.
  • Loading branch information
kovrus committed Feb 27, 2023
1 parent 94e4025 commit 945073c
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 140 deletions.
214 changes: 117 additions & 97 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ type connectorImp struct {
// The starting time of the data points.
startTimestamp pcommon.Timestamp

// Histogram.
// Metrics
sums map[metricKey]*sum
histograms map[metricKey]*histogram
latencyBounds []float64

Expand All @@ -83,6 +84,22 @@ type connectorImp struct {
shutdownOnce sync.Once
}

// histogram accumulates latency observations for a single metric key:
// a running sum and count plus per-bucket counts over latencyBounds.
type histogram struct {
	attributes pcommon.Map          // resolved dimension attributes for this key
	exemplars  pmetric.ExemplarSlice // per-batch exemplars; reset after each export

	bucketCounts []uint64 // one counter per bound, plus the overflow bucket
	count        uint64   // total number of observations
	sum          float64  // sum of all observed latencies (ms)

	latencyBounds []float64 // explicit bucket upper bounds, shared with the connector
}

// sum accumulates the call count for a single metric key; it backs the
// monotonic calls sum metric.
type sum struct {
	attributes pcommon.Map // resolved dimension attributes for this key
	count      uint64      // number of spans aggregated under this key
}

type dimension struct {
name string
value *pcommon.Value
Expand All @@ -103,32 +120,6 @@ func newDimensions(cfgDims []Dimension) []dimension {
return dims
}

// NOTE(review): this appears to be a duplicate, pre-refactor definition of the
// histogram type declared earlier in this file (same fields, different order) —
// likely a diff/merge artifact; confirm and remove one of the two.
type histogram struct {
	attributes pcommon.Map           // resolved dimension attributes for this key
	exemplars  pmetric.ExemplarSlice // per-batch exemplars; reset after each export

	count        uint64    // total number of observations
	sum          float64   // sum of all observed latencies (ms)
	bucketCounts []uint64  // one counter per bound, plus the overflow bucket

	latencyBounds []float64 // explicit bucket upper bounds
}

// observe records a single latency measurement in the histogram and, when the
// span carries a non-empty trace ID, attaches an exemplar pointing back at it.
func (h *histogram) observe(latencyMs float64, traceID pcommon.TraceID, spanID pcommon.SpanID) {
	h.sum += latencyMs
	h.count++

	// SearchFloat64s returns the index of the first bound >= latencyMs, which
	// is exactly the bucket this measurement belongs to (last index = overflow).
	h.bucketCounts[sort.SearchFloat64s(h.latencyBounds, latencyMs)]++

	if traceID.IsEmpty() {
		return
	}
	exemplar := h.exemplars.AppendEmpty()
	exemplar.SetTraceID(traceID)
	exemplar.SetSpanID(spanID)
	exemplar.SetDoubleValue(latencyMs)
}

func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Ticker) (*connectorImp, error) {
logger.Info("Building spanmetrics")
pConfig := config.(*Config)
Expand All @@ -148,6 +139,7 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
config: *pConfig,
startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
latencyBounds: bounds,
sums: make(map[metricKey]*sum),
histograms: make(map[metricKey]*histogram),
dimensions: newDimensions(pConfig.Dimensions),
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
Expand Down Expand Up @@ -223,18 +215,7 @@ func (p *connectorImp) exportMetrics(ctx context.Context) {
p.lock.Lock()

m := p.buildMetrics()

// Exemplars are only relevant to this batch of traces, so must be cleared within the lock,
// regardless of error while building metrics, before the next batch of spans is received.
p.resetExemplars()

// If delta metrics, reset accumulated data
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
p.histograms = make(map[metricKey]*histogram)
p.metricKeyToDimensions.Purge()
} else {
p.metricKeyToDimensions.RemoveEvictedItems()
}
p.resetState()

// This component no longer needs to read the metrics once built, so it is safe to unlock.
p.lock.Unlock()
Expand All @@ -252,53 +233,72 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
ilm := m.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("spanmetricsconnector")

p.collectCallMetrics(ilm)
p.collectLatencyMetrics(ilm)
p.buildCallsSumMetrics(ilm)
p.buildLatencyHistogramMetrics(ilm)

return m
}

// collectLatencyMetrics collects the raw latency metrics, writing the data
// into the given instrumentation library metrics.
func (p *connectorImp) collectLatencyMetrics(ilm pmetric.ScopeMetrics) {
mLatency := ilm.Metrics().AppendEmpty()
mLatency.SetName(buildMetricName(p.config.Namespace, metricNameLatency))
mLatency.SetUnit("ms")
mLatency.SetEmptyHistogram().SetAggregationTemporality(p.config.GetAggregationTemporality())
dps := mLatency.Histogram().DataPoints()
// buildLatencyHistogramMetrics collects the raw latency metrics and builds
// a histogram scope metric.
func (p *connectorImp) buildLatencyHistogramMetrics(ilm pmetric.ScopeMetrics) {
m := ilm.Metrics().AppendEmpty()
m.SetName(buildMetricName(p.config.Namespace, metricNameLatency))
m.SetUnit("ms")
m.SetEmptyHistogram().SetAggregationTemporality(p.config.GetAggregationTemporality())

dps := m.Histogram().DataPoints()
dps.EnsureCapacity(len(p.histograms))
timestamp := pcommon.NewTimestampFromTime(time.Now())
for _, hist := range p.histograms {
dpLatency := dps.AppendEmpty()
dpLatency.SetStartTimestamp(p.startTimestamp)
dpLatency.SetTimestamp(timestamp)
dpLatency.ExplicitBounds().FromRaw(p.latencyBounds)
dpLatency.BucketCounts().FromRaw(hist.bucketCounts)
dpLatency.SetCount(hist.count)
dpLatency.SetSum(hist.sum)
for i := 0; i < dpLatency.Exemplars().Len(); i++ {
dpLatency.Exemplars().At(i).SetTimestamp(timestamp)
dp := dps.AppendEmpty()
dp.SetStartTimestamp(p.startTimestamp)
dp.SetTimestamp(timestamp)
dp.ExplicitBounds().FromRaw(p.latencyBounds)
dp.BucketCounts().FromRaw(hist.bucketCounts)
dp.SetCount(hist.count)
dp.SetSum(hist.sum)
for i := 0; i < dp.Exemplars().Len(); i++ {
dp.Exemplars().At(i).SetTimestamp(timestamp)
}
hist.attributes.CopyTo(dpLatency.Attributes())
hist.attributes.CopyTo(dp.Attributes())
}
}

// collectCallMetrics collects the raw call count metrics, writing the data
// into the given instrumentation library metrics.
func (p *connectorImp) collectCallMetrics(ilm pmetric.ScopeMetrics) {
mCalls := ilm.Metrics().AppendEmpty()
mCalls.SetName(buildMetricName(p.config.Namespace, metricNameCalls))
mCalls.SetEmptySum().SetIsMonotonic(true)
mCalls.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality())
dps := mCalls.Sum().DataPoints()
dps.EnsureCapacity(len(p.histograms))
// buildCallsSumMetrics collects the raw call count metrics and builds
// a sum scope metric.
func (p *connectorImp) buildCallsSumMetrics(ilm pmetric.ScopeMetrics) {
m := ilm.Metrics().AppendEmpty()
m.SetName(buildMetricName(p.config.Namespace, metricNameCalls))
m.SetEmptySum().SetIsMonotonic(true)
m.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality())

dps := m.Sum().DataPoints()
dps.EnsureCapacity(len(p.sums))
timestamp := pcommon.NewTimestampFromTime(time.Now())
for _, hist := range p.histograms {
dpCalls := dps.AppendEmpty()
dpCalls.SetStartTimestamp(p.startTimestamp)
dpCalls.SetTimestamp(timestamp)
dpCalls.SetIntValue(int64(hist.count))
hist.attributes.CopyTo(dpCalls.Attributes())
for _, c := range p.sums {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(p.startTimestamp)
dp.SetTimestamp(timestamp)
dp.SetIntValue(int64(c.count))
c.attributes.CopyTo(dp.Attributes())
}
}

// resetState clears per-batch state after an export. Exemplars are always
// dropped; accumulated metric state is additionally discarded when the
// connector emits delta temporality.
func (p *connectorImp) resetState() {
	// Exemplars only describe the spans of the batch just exported, so they
	// must be cleared (under the lock) before the next batch of spans arrives,
	// regardless of any error while building metrics.
	for _, h := range p.histograms {
		h.exemplars = pmetric.NewExemplarSlice()
	}

	if p.config.GetAggregationTemporality() != pmetric.AggregationTemporalityDelta {
		// Cumulative temporality: keep the accumulated data points and only
		// drop dimension-cache entries that the LRU has already evicted.
		p.metricKeyToDimensions.RemoveEvictedItems()
		return
	}

	// Delta temporality: everything accumulated has been exported; start fresh.
	p.histograms = make(map[metricKey]*histogram)
	p.sums = make(map[metricKey]*sum)
	p.metricKeyToDimensions.Purge()
}

Expand Down Expand Up @@ -328,46 +328,63 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
if endTime > startTime {
latencyMs = float64(endTime-startTime) / float64(time.Millisecond.Nanoseconds())
}
// Always reset the buffer before re-using.
p.keyBuf.Reset()
buildKey(p.keyBuf, serviceName, span, p.dimensions, resourceAttr)
key := metricKey(p.keyBuf.String())
key := p.buildKey(serviceName, span, p.dimensions, resourceAttr)

attributes, ok := p.metricKeyToDimensions.Get(key)
if !ok {
attributes = p.buildAttributes(serviceName, span, resourceAttr)
p.metricKeyToDimensions.Add(key, attributes)
}

h := p.getOrCreateHistogram(key, attributes)
h.observe(latencyMs, span.TraceID(), span.SpanID())
p.aggregateLatencies(key, attributes, span, latencyMs)
p.aggregateCalls(key, attributes)
}
}
}
}

func (p *connectorImp) getOrCreateHistogram(k metricKey, attr pcommon.Map) *histogram {
h, ok := p.histograms[k]
func (p *connectorImp) aggregateLatencies(
key metricKey,
attributes pcommon.Map,
span ptrace.Span,
latency float64,
) {
h, ok := p.histograms[key]
if !ok {
h = &histogram{
attributes: attr,
attributes: attributes,
bucketCounts: make([]uint64, len(p.latencyBounds)+1),
latencyBounds: p.latencyBounds,
exemplars: pmetric.NewExemplarSlice(),
}
p.histograms[k] = h
p.histograms[key] = h
}

return h
h.sum += latency
h.count++

// Binary search to find the latencyMs bucket index.
index := sort.SearchFloat64s(h.latencyBounds, latency)
h.bucketCounts[index]++

if !span.TraceID().IsEmpty() {
e := h.exemplars.AppendEmpty()
e.SetTraceID(span.TraceID())
e.SetSpanID(span.SpanID())
e.SetDoubleValue(latency)
}
}

// resetExemplars resets the entire exemplars map so the next trace will recreate all
// the data structure. An exemplar is a punctual value that exists at specific moment in time
// and should be not considered like a metrics that persist over time.
func (p *connectorImp) resetExemplars() {
for _, histo := range p.histograms {
histo.exemplars = pmetric.NewExemplarSlice()
func (p *connectorImp) aggregateCalls(key metricKey, attributes pcommon.Map) {
c, ok := p.sums[key]
if !ok {
c = &sum{
attributes: attributes,
}
p.sums[key] = c
}

c.count++
}

func (p *connectorImp) buildAttributes(serviceName string, span ptrace.Span, resourceAttrs pcommon.Map) pcommon.Map {
Expand Down Expand Up @@ -397,17 +414,20 @@ func concatDimensionValue(dest *bytes.Buffer, value string, prefixSep bool) {
// or resource attributes. If the dimension exists in both, the span's attributes, being the most specific, takes precedence.
//
// The metric key is a simple concatenation of dimension values, delimited by a null character.
func buildKey(dest *bytes.Buffer, serviceName string, span ptrace.Span, optionalDims []dimension, resourceAttrs pcommon.Map) {
concatDimensionValue(dest, serviceName, false)
concatDimensionValue(dest, span.Name(), true)
concatDimensionValue(dest, traceutil.SpanKindStr(span.Kind()), true)
concatDimensionValue(dest, traceutil.StatusCodeStr(span.Status().Code()), true)
func (p *connectorImp) buildKey(serviceName string, span ptrace.Span, optionalDims []dimension, resourceAttrs pcommon.Map) metricKey {
p.keyBuf.Reset()
concatDimensionValue(p.keyBuf, serviceName, false)
concatDimensionValue(p.keyBuf, span.Name(), true)
concatDimensionValue(p.keyBuf, traceutil.SpanKindStr(span.Kind()), true)
concatDimensionValue(p.keyBuf, traceutil.StatusCodeStr(span.Status().Code()), true)

for _, d := range optionalDims {
if v, ok := getDimensionValue(d, span.Attributes(), resourceAttrs); ok {
concatDimensionValue(dest, v.AsString(), true)
concatDimensionValue(p.keyBuf, v.AsString(), true)
}
}

return metricKey(p.keyBuf.String())
}

// getDimensionValue gets the dimension value for the given configured dimension.
Expand Down
Loading

0 comments on commit 945073c

Please sign in to comment.