[connector/spanmetrics] Produce delta temporality span metrics with timestamps representing an uninterrupted series #31780

Merged
27 changes: 27 additions & 0 deletions .chloggen/spanmetrics_uninterrupted_delta_timestamps.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetrics

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Produce delta temporality span metrics with StartTimeUnixNano and TimeUnixNano values representing an uninterrupted series

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31671, 30688]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This allows producing delta span metrics instead of the more memory-intensive cumulative metrics, specifically when a downstream component can convert the delta metrics to cumulative.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
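To make the changelog entry concrete, here is a minimal sketch of what an uninterrupted delta series looks like and how a downstream converter could fold it back into a cumulative counter. It uses plain Go structs rather than the collector's pdata API, and the reset-on-mismatched-start check is an assumption about how such a converter might behave, not code from this PR.

```go
package main

import (
	"fmt"
	"time"
)

// deltaPoint is a simplified stand-in for a delta-temporality data point:
// it covers the window (Start, End] and carries the delta observed in that window.
type deltaPoint struct {
	Start, End time.Time
	Value      int64
}

// accumulate folds delta points into a running cumulative value.
// An uninterrupted series chains timestamps: (T1,T2), (T2,T3), (T3,T4), ...
// If a point's Start does not equal the previous End, the series was interrupted
// (for example, the connector evicted its cached timestamp) and the counter restarts.
func accumulate(points []deltaPoint) int64 {
	var cumulative int64
	var lastEnd time.Time
	for _, p := range points {
		if !lastEnd.IsZero() && !p.Start.Equal(lastEnd) {
			cumulative = 0 // series reset detected
		}
		cumulative += p.Value
		lastEnd = p.End
	}
	return cumulative
}

func main() {
	t1 := time.Now()
	t2, t3 := t1.Add(time.Minute), t1.Add(2*time.Minute)
	// Uninterrupted: each point's Start equals the previous point's End.
	pts := []deltaPoint{{t1, t2, 5}, {t2, t3, 3}}
	fmt.Println(accumulate(pts)) // 8
}
```

Chaining each data point's StartTimeUnixNano to the previous point's TimeUnixNano is what lets a delta-to-cumulative converter tell an ordinary consecutive flush apart from a genuine reset.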
3 changes: 2 additions & 1 deletion connector/spanmetricsconnector/README.md
@@ -114,8 +114,9 @@ The following settings can be optionally configured:
- `namespace`: Defines the namespace of the generated metrics. If `namespace` is provided, the generated metric name will be prefixed with `namespace.`.
- `metrics_flush_interval` (default: `60s`): Defines the flush interval of the generated metrics.
- `metrics_expiration` (default: `0`): Defines the expiration time as `time.Duration`, after which, if no new spans are received, metrics will no longer be exported. Setting to `0` means the metrics will never expire (default behavior).
- `metric_timestamp_cache_size` (default: `1000`): Only relevant for delta temporality span metrics. Controls the size of the cache used to keep track of each metric's TimeUnixNano from the last time it was flushed. When a metric is evicted from the cache, its next data point will indicate a "reset" in the series. Downstream components converting from delta to cumulative, like `prometheusexporter`, may handle these resets by setting cumulative counters back to 0 (a sketch illustrating this appears below).
- `exemplars`: Use to configure how to attach exemplars to metrics.
- `enabled` (default: `false`): enabling will add spans as Exemplars to all metrics. Exemplars are only kept for one flush interval.
- `events`: Use to configure the events metric.
- `enabled`: (default: `false`): enabling will add the events metric.
- `dimensions`: (mandatory if `enabled`) the list of the span's event attributes to add as dimensions to the events metric, which will be included _on top of_ the common and configured `dimensions` for span and resource attributes.
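As a rough illustration of the new `metric_timestamp_cache_size` setting described above, the sketch below uses the same `hashicorp/golang-lru/v2/simplelru` package the connector imports, but with invented string keys and a deliberately tiny cache; it is not the connector's code, only a demonstration of why eviction shows up as a series reset.

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/simplelru"
)

func main() {
	processStart := time.Now() // fallback start time for a new (or evicted) series

	// A tiny cache standing in for metric_timestamp_cache_size: 2.
	// Values hold each metric's timestamp from its last flush.
	cache, _ := simplelru.NewLRU[string, time.Time](2, func(key string, _ time.Time) {
		fmt.Println("evicted:", key) // the next data point for this key will look like a series reset
	})

	// startTimeFor mimics the documented behaviour: use the cached last-flush
	// timestamp if present, otherwise fall back to the series start.
	startTimeFor := func(key string) time.Time {
		if last, ok := cache.Get(key); ok {
			return last
		}
		return processStart
	}

	flushAt := processStart.Add(time.Minute)
	fmt.Println(startTimeFor("svc-a|calls").Equal(processStart)) // true: first flush starts the series
	cache.Add("svc-a|calls", flushAt)

	// Tracking more keys than the cache holds evicts the oldest entry,
	// so its next data point starts over from processStart (a "reset").
	cache.Add("svc-b|calls", flushAt)
	cache.Add("svc-c|calls", flushAt)                            // evicts "svc-a|calls"
	fmt.Println(startTimeFor("svc-a|calls").Equal(processStart)) // true again: downstream sees a reset
}
```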
19 changes: 19 additions & 0 deletions connector/spanmetricsconnector/config.go
@@ -23,6 +23,8 @@ var defaultHistogramBucketsMs = []float64{
2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10_000, 15_000,
}

var defaultDeltaTimestampCacheSize = 1000

// Dimension defines the dimension name and optional default value if the Dimension is missing from a span attribute.
type Dimension struct {
Name string `mapstructure:"name"`
@@ -71,6 +73,9 @@ type Config struct {
// Default value (0) means that the metrics will never expire.
MetricsExpiration time.Duration `mapstructure:"metrics_expiration"`

// TimestampCacheSize controls the size of the cache used to keep track of each delta metric's TimeUnixNano from the last time it was flushed
TimestampCacheSize *int `mapstructure:"metric_timestamp_cache_size"`

// Namespace is the namespace of the metrics emitted by the connector.
Namespace string `mapstructure:"namespace"`

@@ -139,6 +144,13 @@ func (c Config) Validate() error {
return fmt.Errorf("invalid metrics_expiration: %v, the duration should be positive", c.MetricsExpiration)
}

if c.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta && c.GetDeltaTimestampCacheSize() <= 0 {
return fmt.Errorf(
"invalid delta timestamp cache size: %v, the maximum number of the items in the cache should be positive",
c.GetDeltaTimestampCacheSize(),
)
}

return nil
}

@@ -151,6 +163,13 @@ func (c Config) GetAggregationTemporality() pmetric.AggregationTemporality {
return pmetric.AggregationTemporalityCumulative
}

func (c Config) GetDeltaTimestampCacheSize() int {
if c.TimestampCacheSize != nil {
return *c.TimestampCacheSize
}
return defaultDeltaTimestampCacheSize
}

// validateDimensions checks duplicates for reserved dimensions and additional dimensions.
func validateDimensions(dimensions []Dimension) error {
labelNames := make(map[string]struct{})
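The `*int` config field paired with `GetDeltaTimestampCacheSize` above is a "nil means unset" pattern. The standalone sketch below (a hypothetical `exampleConfig` type, not the connector's actual `Config`) shows why a pointer is used: an omitted setting falls back to the default, while an explicit `0` stays distinguishable so that validation can reject it for delta temporality.

```go
package main

import "fmt"

const defaultCacheSize = 1000 // mirrors defaultDeltaTimestampCacheSize

// exampleConfig stands in for the connector's Config: a nil pointer means
// "not set in YAML", which is different from an explicit 0.
type exampleConfig struct {
	TimestampCacheSize *int
}

func (c exampleConfig) cacheSize() int {
	if c.TimestampCacheSize != nil {
		return *c.TimestampCacheSize
	}
	return defaultCacheSize
}

func main() {
	fmt.Println(exampleConfig{}.cacheSize()) // 1000: unset, use the default

	custom := 123
	fmt.Println(exampleConfig{TimestampCacheSize: &custom}.cacheSize()) // 123

	zero := 0
	fmt.Println(exampleConfig{TimestampCacheSize: &zero}.cacheSize()) // 0: kept as-is, so Validate can reject it
}
```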
39 changes: 36 additions & 3 deletions connector/spanmetricsconnector/config_test.go
@@ -27,10 +27,12 @@ func TestLoadConfig(t *testing.T) {

defaultMethod := "GET"
defaultMaxPerDatapoint := 5
customTimestampCacheSize := 123
tests := []struct {
id component.ID
expected component.Config
errorMessage string
id component.ID
expected component.Config
errorMessage string
extraAssertions func(config *Config)
}{
{
id: component.NewIDWithName(metadata.Type, "default"),
@@ -125,6 +127,34 @@
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
},
},
{
id: component.NewIDWithName(metadata.Type, "custom_delta_timestamp_cache_size"),
expected: &Config{
AggregationTemporality: "AGGREGATION_TEMPORALITY_DELTA",
TimestampCacheSize: &customTimestampCacheSize,
DimensionsCacheSize: defaultDimensionsCacheSize,
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
MetricsFlushInterval: 60 * time.Second,
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
},
},
{
id: component.NewIDWithName(metadata.Type, "default_delta_timestamp_cache_size"),
expected: &Config{
AggregationTemporality: "AGGREGATION_TEMPORALITY_DELTA",
DimensionsCacheSize: defaultDimensionsCacheSize,
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
MetricsFlushInterval: 60 * time.Second,
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
},
extraAssertions: func(config *Config) {
assert.Equal(t, defaultDeltaTimestampCacheSize, config.GetDeltaTimestampCacheSize())
},
},
{
id: component.NewIDWithName(metadata.Type, "invalid_delta_timestamp_cache_size"),
errorMessage: "invalid delta timestamp cache size: 0, the maximum number of the items in the cache should be positive",
},
}

for _, tt := range tests {
@@ -143,6 +173,9 @@
}
assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg)
if tt.extraAssertions != nil {
tt.extraAssertions(cfg.(*Config))
}
})
}
}
52 changes: 46 additions & 6 deletions connector/spanmetricsconnector/connector.go
@@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/lightstep/go-expohisto/structure"
"github.com/tilinna/clock"
"go.opentelemetry.io/collector/component"
@@ -72,6 +73,9 @@ type connectorImp struct {
eDimensions []dimension

events EventsConfig

// Tracks the last TimeUnixNano for delta metrics so that they represent an uninterrupted series. Unused for cumulative span metrics.
lastDeltaTimestamps *simplelru.LRU[metrics.Key, pcommon.Timestamp]
}

type resourceMetrics struct {
@@ -125,6 +129,16 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
resourceMetricsKeyAttributes[attr] = s
}

var lastDeltaTimestamps *simplelru.LRU[metrics.Key, pcommon.Timestamp]
if cfg.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
lastDeltaTimestamps, err = simplelru.NewLRU[metrics.Key, pcommon.Timestamp](cfg.GetDeltaTimestampCacheSize(), func(k metrics.Key, _ pcommon.Timestamp) {
logger.Info("Evicting cached delta timestamp", zap.String("key", string(k)))
})
if err != nil {
return nil, err
}
}

return &connectorImp{
logger: logger,
config: *cfg,
@@ -133,6 +147,7 @@
dimensions: newDimensions(cfg.Dimensions),
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
metricKeyToDimensions: metricKeyToDimensionsCache,
lastDeltaTimestamps: lastDeltaTimestamps,
ticker: ticker,
done: make(chan struct{}),
eDimensions: newDimensions(cfg.Events.Dimensions),
@@ -251,6 +266,7 @@ func (p *connectorImp) exportMetrics(ctx context.Context) {
// buildMetrics collects the computed raw metrics data and builds OTLP metrics.
func (p *connectorImp) buildMetrics() pmetric.Metrics {
m := pmetric.NewMetrics()
timestamp := pcommon.NewTimestampFromTime(time.Now())

p.resourceMetrics.ForEach(func(_ resourceKey, rawMetrics *resourceMetrics) {
rm := m.ResourceMetrics().AppendEmpty()
@@ -259,23 +275,46 @@
sm := rm.ScopeMetrics().AppendEmpty()
sm.Scope().SetName("spanmetricsconnector")

/**
* To represent an uninterrupted stream of metrics as per the spec, the (StartTimestamp, Timestamp)'s of successive data points should be:
* - For cumulative metrics: (T1, T2), (T1, T3), (T1, T4) ...
* - For delta metrics: (T1, T2), (T2, T3), (T3, T4) ...
*/
deltaMetricKeys := make(map[metrics.Key]bool)
startTimeGenerator := func(mk metrics.Key) pcommon.Timestamp {
startTime := rawMetrics.startTimestamp
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
if lastTimestamp, ok := p.lastDeltaTimestamps.Get(mk); ok {
startTime = lastTimestamp
}
// Collect lastDeltaTimestamps keys that need to be updated. Metrics can share the same key, so defer the update.
deltaMetricKeys[mk] = true
Contributor: Can we update lastDeltaTimestamps here directly?

Contributor Author: This anonymous function is first called for the calls metric and then called with the same key for the duration metric. So if we update it here then the duration metric will get the incorrect value from the cache.

That is what I was trying to say in the comment above this line, but feel free to change the wording if it's unclear!

}
return startTime
}

sums := rawMetrics.sums
metric := sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameCalls))
sums.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
sums.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
if !p.config.Histogram.Disable {
histograms := rawMetrics.histograms
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameDuration))
metric.SetUnit(p.config.Histogram.Unit.String())
histograms.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
histograms.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
}

events := rawMetrics.events
if p.events.Enabled {
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameEvents))
events.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
events.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
}

for mk := range deltaMetricKeys {
// For delta metrics, cache the current data point's timestamp, which will be the start timestamp for the next data points in the series
p.lastDeltaTimestamps.Add(mk, timestamp)
}
})

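The interplay between `startTimeGenerator` and the deferred `lastDeltaTimestamps` update can be seen in the simplified sketch below, which swaps the LRU and pdata types for a plain map and `time.Time`; the metric key is invented for illustration. It mirrors the reasoning from the review thread above: the calls and duration metrics look up the same key within one flush, so the cache is only updated once the whole flush has been built.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	seriesStart := time.Now()           // rawMetrics.startTimestamp equivalent
	lastFlush := map[string]time.Time{} // stand-in for the lastDeltaTimestamps LRU
	flushTime := seriesStart.Add(time.Minute)

	touched := map[string]bool{}
	startTimeFor := func(key string) time.Time {
		start := seriesStart
		if last, ok := lastFlush[key]; ok {
			start = last
		}
		// Defer the cache update: the same key is looked up again for the
		// duration metric, which must see the same start time as the calls metric.
		touched[key] = true
		return start
	}

	// Both metrics built from the same key within one flush share a start time.
	callsStart := startTimeFor("svc-a|GET /users")
	durationStart := startTimeFor("svc-a|GET /users")
	fmt.Println(callsStart.Equal(durationStart)) // true

	// Only after the flush is assembled is its timestamp recorded, so the
	// next flush's data points will chain on from flushTime.
	for key := range touched {
		lastFlush[key] = flushTime
	}
	fmt.Println(lastFlush["svc-a|GET /users"].Equal(flushTime)) // true
}
```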
@@ -326,6 +365,7 @@ func (p *connectorImp) resetState() {
// and span metadata such as name, kind, status_code and any additional
// dimensions the user has configured.
func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
startTimestamp := pcommon.NewTimestampFromTime(time.Now())
Contributor Author: Added this just to make testing easier, since before that the startTimestamp was regenerated during each iteration of the for loop on the next line.

for i := 0; i < traces.ResourceSpans().Len(); i++ {
rspans := traces.ResourceSpans().At(i)
resourceAttr := rspans.Resource().Attributes()
Expand All @@ -334,7 +374,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
continue
}

rm := p.getOrCreateResourceMetrics(resourceAttr)
rm := p.getOrCreateResourceMetrics(resourceAttr, startTimestamp)
sums := rm.sums
histograms := rm.histograms
events := rm.events
@@ -431,7 +471,7 @@ func (p *connectorImp) createResourceKey(attr pcommon.Map) resourceKey {
return pdatautil.MapHash(m)
}

func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimestamp pcommon.Timestamp) *resourceMetrics {
key := p.createResourceKey(attr)
v, ok := p.resourceMetrics.Get(key)
if !ok {
@@ -440,7 +480,7 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMet
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
attributes: attr,
startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
startTimestamp: startTimestamp,
}
p.resourceMetrics.Add(key, v)
}